feat: add TCP keepalive options and connection pooling for improved performance

- Added `socket2` dependency for socket options.
- Introduced `keep_alive`, `keep_alive_initial_delay_ms`, and `max_connections` fields in `ConnectionConfig`.
- Implemented TCP keepalive settings in `TcpListenerManager` for both client and backend connections.
- Created a new `ConnectionPool` for managing idle HTTP/1.1 and HTTP/2 connections to reduce overhead.
- Enhanced TLS configuration to support ALPN for HTTP/2.
- Added performance tests for connection pooling, stability, and concurrent connections.
This commit is contained in:
2026-02-20 18:16:09 +00:00
parent 0f6752b9a7
commit 9521f2e044
14 changed files with 1058 additions and 101 deletions

View File

@@ -87,21 +87,20 @@ impl tokio::io::AsyncWrite for BackendStream {
}
}
/// Connect to a backend over TLS. Uses InsecureVerifier for internal backends
/// with self-signed certs (same pattern as tls_handler::connect_tls).
/// Connect to a backend over TLS using the shared backend TLS config
/// (from tls_handler). Session resumption is automatic.
async fn connect_tls_backend(
backend_tls_config: &Arc<rustls::ClientConfig>,
host: &str,
port: u16,
) -> Result<tokio_rustls::client::TlsStream<TcpStream>, Box<dyn std::error::Error + Send + Sync>> {
let _ = rustls::crypto::ring::default_provider().install_default();
let config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
.with_no_client_auth();
let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
let connector = tokio_rustls::TlsConnector::from(Arc::clone(backend_tls_config));
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
stream.set_nodelay(true)?;
// Apply keepalive with 60s default
let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(
&socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60))
);
let server_name = rustls::pki_types::ServerName::try_from(host.to_string())?;
let tls_stream = connector.connect(server_name, stream).await?;
@@ -109,56 +108,6 @@ async fn connect_tls_backend(
Ok(tls_stream)
}
/// Insecure certificate verifier for backend TLS connections.
/// Internal backends may use self-signed certs.
#[derive(Debug)]
struct InsecureBackendVerifier;
impl rustls::client::danger::ServerCertVerifier for InsecureBackendVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
]
}
}
/// HTTP proxy service that processes HTTP traffic.
pub struct HttpProxyService {
route_manager: Arc<RouteManager>,
@@ -172,6 +121,10 @@ pub struct HttpProxyService {
request_counter: AtomicU64,
/// Cache of compiled URL rewrite regexes (keyed by pattern string).
regex_cache: DashMap<String, Regex>,
/// Shared backend TLS config for session resumption across connections.
backend_tls_config: Arc<rustls::ClientConfig>,
/// Backend connection pool for reusing keep-alive connections.
connection_pool: Arc<crate::connection_pool::ConnectionPool>,
}
impl HttpProxyService {
@@ -184,6 +137,8 @@ impl HttpProxyService {
route_rate_limiters: Arc::new(DashMap::new()),
request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
}
}
@@ -201,9 +156,17 @@ impl HttpProxyService {
route_rate_limiters: Arc::new(DashMap::new()),
request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
}
}
/// Set the shared backend TLS config (enables session resumption).
/// Call this after construction to inject the shared config from tls_handler.
pub fn set_backend_tls_config(&mut self, config: Arc<rustls::ClientConfig>) {
self.backend_tls_config = config;
}
/// Handle an incoming HTTP connection on a plain TCP stream.
pub async fn handle_connection(
self: Arc<Self>,
@@ -217,8 +180,10 @@ impl HttpProxyService {
/// Handle an incoming HTTP connection on any IO type (plain TCP or TLS-terminated).
///
/// Uses HTTP/1.1 with upgrade support. Responds to graceful shutdown via the
/// cancel token — in-flight requests complete, but no new requests are accepted.
/// Uses `hyper_util::server::conn::auto::Builder` to auto-detect h1 vs h2
/// based on ALPN negotiation (TLS) or connection preface (h2c).
/// Supports HTTP/1.1 upgrades (WebSocket) and HTTP/2 CONNECT.
/// Responds to graceful shutdown via the cancel token.
pub async fn handle_io<I>(
self: Arc<Self>,
stream: I,
@@ -241,24 +206,23 @@ impl HttpProxyService {
}
});
// Use http1::Builder with upgrades for WebSocket support
let mut conn = hyper::server::conn::http1::Builder::new()
.keep_alive(true)
.serve_connection(io, service)
.with_upgrades();
// Auto-detect h1 vs h2 based on ALPN / connection preface.
// serve_connection_with_upgrades supports h1 Upgrade (WebSocket) and h2 CONNECT.
let builder = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(io, service);
// Pin on the heap — auto::UpgradeableConnection is !Unpin
let mut conn = Box::pin(conn);
// Use select to support graceful shutdown via cancellation token
let conn_pin = std::pin::Pin::new(&mut conn);
tokio::select! {
result = conn_pin => {
result = conn.as_mut() => {
if let Err(e) = result {
debug!("HTTP connection error from {}: {}", peer_addr, e);
}
}
_ = cancel.cancelled() => {
// Graceful shutdown: let in-flight request finish, stop accepting new ones
let conn_pin = std::pin::Pin::new(&mut conn);
conn_pin.graceful_shutdown();
conn.as_mut().graceful_shutdown();
if let Err(e) = conn.await {
debug!("HTTP connection error during shutdown from {}: {}", peer_addr, e);
}
@@ -478,11 +442,32 @@ impl HttpProxyService {
}
}
// Connect to upstream with timeout (TLS if upstream.use_tls is set)
// --- Connection pooling: try reusing an existing connection first ---
let pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(),
port: upstream.port,
use_tls: upstream.use_tls,
h2: use_h2,
};
// Try pooled connection first (H2 only — H2 senders are Clone and multiplexed,
// so checkout doesn't consume request parts. For H1, we try pool inside forward_h1.)
if use_h2 {
if let Some(sender) = self.connection_pool.checkout_h2(&pool_key) {
let result = self.forward_h2_pooled(
sender, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &pool_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
}
// Fresh connection path
let backend = if upstream.use_tls {
match tokio::time::timeout(
self.connect_timeout,
connect_tls_backend(&upstream.host, upstream.port),
connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port),
).await {
Ok(Ok(tls)) => BackendStream::Tls(tls),
Ok(Err(e)) => {
@@ -503,6 +488,9 @@ impl HttpProxyService {
).await {
Ok(Ok(s)) => {
s.set_nodelay(true).ok();
let _ = socket2::SockRef::from(&s).set_tcp_keepalive(
&socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60))
);
BackendStream::Plain(s)
}
Ok(Err(e)) => {
@@ -521,17 +509,16 @@ impl HttpProxyService {
let io = TokioIo::new(backend);
let result = if use_h2 {
// HTTP/2 backend
self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await
self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &pool_key).await
} else {
// HTTP/1.1 backend (default)
self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await
self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &pool_key).await
};
self.upstream_selector.connection_ended(&upstream_key);
result
}
/// Forward request to backend via HTTP/1.1 with body streaming.
/// Tries a pooled connection first; if unavailable, uses the fresh IO connection.
async fn forward_h1(
&self,
io: TokioIo<BackendStream>,
@@ -543,8 +530,21 @@ impl HttpProxyService {
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let (mut sender, conn) = match hyper::client::conn::http1::handshake(io).await {
// Try pooled H1 connection first — avoids TCP+TLS handshake
if let Some(pooled_sender) = self.connection_pool.checkout_h1(pool_key) {
return self.forward_h1_with_sender(
pooled_sender, parts, body, upstream_headers, upstream_path,
route, route_id, source_ip, pool_key,
).await;
}
// Fresh connection: explicitly type the handshake with BoxBody for uniform pool type
let (sender, conn): (
hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
hyper::client::conn::http1::Connection<TokioIo<BackendStream>, BoxBody<Bytes, hyper::Error>>,
) = match hyper::client::conn::http1::handshake(io).await {
Ok(h) => h,
Err(e) => {
error!("Upstream handshake failed: {}", e);
@@ -558,6 +558,22 @@ impl HttpProxyService {
}
});
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key).await
}
/// Common H1 forwarding logic used by both fresh and pooled paths.
async fn forward_h1_with_sender(
&self,
mut sender: hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut upstream_req = Request::builder()
.method(parts.method)
.uri(upstream_path)
@@ -567,7 +583,7 @@ impl HttpProxyService {
*headers = upstream_headers;
}
// Wrap the request body in CountingBody to track bytes_in
// Wrap the request body in CountingBody then box it for the uniform pool type
let counting_req_body = CountingBody::new(
body,
Arc::clone(&self.metrics),
@@ -575,9 +591,9 @@ impl HttpProxyService {
Some(source_ip.to_string()),
Direction::In,
);
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
// Stream the request body through to upstream
let upstream_req = upstream_req.body(counting_req_body).unwrap();
let upstream_req = upstream_req.body(boxed_body).unwrap();
let upstream_response = match sender.send_request(upstream_req).await {
Ok(resp) => resp,
@@ -587,10 +603,14 @@ impl HttpProxyService {
}
};
// Return sender to pool (body streams lazily, sender is reusable once response head is received)
self.connection_pool.checkin_h1(pool_key.clone(), sender);
self.build_streaming_response(upstream_response, route, route_id, source_ip).await
}
/// Forward request to backend via HTTP/2 with body streaming.
/// Forward request to backend via HTTP/2 with body streaming (fresh connection).
/// Registers the h2 sender in the pool for future multiplexed requests.
async fn forward_h2(
&self,
io: TokioIo<BackendStream>,
@@ -602,9 +622,14 @@ impl HttpProxyService {
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let exec = hyper_util::rt::TokioExecutor::new();
let (mut sender, conn) = match hyper::client::conn::http2::handshake(exec, io).await {
// Explicitly type the handshake with BoxBody for uniform pool type
let (sender, conn): (
hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
hyper::client::conn::http2::Connection<TokioIo<BackendStream>, BoxBody<Bytes, hyper::Error>, hyper_util::rt::TokioExecutor>,
) = match hyper::client::conn::http2::handshake(exec, io).await {
Ok(h) => h,
Err(e) => {
error!("HTTP/2 upstream handshake failed: {}", e);
@@ -618,6 +643,40 @@ impl HttpProxyService {
}
});
// Register for multiplexed reuse
self.connection_pool.register_h2(pool_key.clone(), sender.clone());
self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip).await
}
/// Forward request using an existing (pooled) HTTP/2 sender.
async fn forward_h2_pooled(
&self,
sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
_pool_key: &crate::connection_pool::PoolKey,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip).await
}
/// Common H2 forwarding logic used by both fresh and pooled paths.
async fn forward_h2_with_sender(
&self,
mut sender: hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let mut upstream_req = Request::builder()
.method(parts.method)
.uri(upstream_path);
@@ -626,7 +685,7 @@ impl HttpProxyService {
*headers = upstream_headers;
}
// Wrap the request body in CountingBody to track bytes_in
// Wrap the request body in CountingBody then box it for the uniform pool type
let counting_req_body = CountingBody::new(
body,
Arc::clone(&self.metrics),
@@ -634,9 +693,9 @@ impl HttpProxyService {
Some(source_ip.to_string()),
Direction::In,
);
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
// Stream the request body through to upstream
let upstream_req = upstream_req.body(counting_req_body).unwrap();
let upstream_req = upstream_req.body(boxed_body).unwrap();
let upstream_response = match sender.send_request(upstream_req).await {
Ok(resp) => resp,
@@ -723,7 +782,7 @@ impl HttpProxyService {
let mut upstream_stream: BackendStream = if upstream.use_tls {
match tokio::time::timeout(
self.connect_timeout,
connect_tls_backend(&upstream.host, upstream.port),
connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port),
).await {
Ok(Ok(tls)) => BackendStream::Tls(tls),
Ok(Err(e)) => {
@@ -744,6 +803,9 @@ impl HttpProxyService {
).await {
Ok(Ok(s)) => {
s.set_nodelay(true).ok();
let _ = socket2::SockRef::from(&s).set_tcp_keepalive(
&socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60))
);
BackendStream::Plain(s)
}
Ok(Err(e)) => {
@@ -1221,6 +1283,70 @@ fn guess_content_type(path: &std::path::Path) -> &'static str {
}
}
impl HttpProxyService {
/// Build a default backend TLS config with InsecureVerifier.
/// Used as fallback when no shared config is injected from tls_handler.
fn default_backend_tls_config() -> Arc<rustls::ClientConfig> {
let _ = rustls::crypto::ring::default_provider().install_default();
let config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
.with_no_client_auth();
Arc::new(config)
}
}
/// Insecure certificate verifier for backend TLS connections (fallback only).
/// The production path uses the shared config from tls_handler which has the same
/// behavior but with session resumption across all outbound connections.
#[derive(Debug)]
struct InsecureBackendVerifier;
impl rustls::client::danger::ServerCertVerifier for InsecureBackendVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
]
}
}
impl Default for HttpProxyService {
fn default() -> Self {
Self {
@@ -1231,6 +1357,8 @@ impl Default for HttpProxyService {
route_rate_limiters: Arc::new(DashMap::new()),
request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
}
}
}