//! Backend connection pool for HTTP/1.1 and HTTP/2. //! //! Reuses idle keep-alive connections to avoid per-request TCP+TLS handshakes. //! HTTP/2 connections are multiplexed (clone the sender for each request). use std::sync::Arc; use std::time::{Duration, Instant}; use bytes::Bytes; use dashmap::DashMap; use http_body_util::combinators::BoxBody; use hyper::client::conn::{http1, http2}; use tracing::debug; /// Maximum idle connections per backend key. const MAX_IDLE_PER_KEY: usize = 16; /// Default idle timeout — connections not used within this window are evicted. const IDLE_TIMEOUT: Duration = Duration::from_secs(90); /// Background eviction interval. const EVICTION_INTERVAL: Duration = Duration::from_secs(30); /// Maximum age for pooled HTTP/2 connections before proactive eviction. /// Prevents staleness from backends that close idle connections (e.g. nginx GOAWAY). /// 120s is well within typical server GOAWAY windows (nginx: ~60s idle, envoy: ~60s). const MAX_H2_AGE: Duration = Duration::from_secs(120); /// Identifies a unique backend endpoint. #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct PoolKey { pub host: String, pub port: u16, pub use_tls: bool, pub h2: bool, } /// An idle HTTP/1.1 sender with a timestamp for eviction. struct IdleH1 { sender: http1::SendRequest>, idle_since: Instant, } /// A pooled HTTP/2 sender (multiplexed, Clone-able). struct PooledH2 { sender: http2::SendRequest>, created_at: Instant, } /// Backend connection pool. pub struct ConnectionPool { /// HTTP/1.1 idle connections indexed by backend key. h1_pool: Arc>>, /// HTTP/2 multiplexed connections indexed by backend key. h2_pool: Arc>, /// Handle for the background eviction task. eviction_handle: Option>, } impl ConnectionPool { /// Create a new pool and start the background eviction task. pub fn new() -> Self { let h1_pool: Arc>> = Arc::new(DashMap::new()); let h2_pool: Arc> = Arc::new(DashMap::new()); let h1_clone = Arc::clone(&h1_pool); let h2_clone = Arc::clone(&h2_pool); let eviction_handle = tokio::spawn(async move { Self::eviction_loop(h1_clone, h2_clone).await; }); Self { h1_pool, h2_pool, eviction_handle: Some(eviction_handle), } } /// Try to check out an idle HTTP/1.1 sender for the given key. /// Returns `None` if no usable idle connection exists. pub fn checkout_h1(&self, key: &PoolKey) -> Option>> { let mut entry = self.h1_pool.get_mut(key)?; let idles = entry.value_mut(); while let Some(idle) = idles.pop() { // Check if the connection is still alive and ready if idle.idle_since.elapsed() < IDLE_TIMEOUT && idle.sender.is_ready() && !idle.sender.is_closed() { debug!("Pool hit (h1): {}:{}", key.host, key.port); return Some(idle.sender); } // Stale or closed — drop it } // Clean up empty entry if idles.is_empty() { drop(entry); self.h1_pool.remove(key); } None } /// Return an HTTP/1.1 sender to the pool after the response body has been prepared. /// The caller should NOT call this if the sender is closed or not ready. pub fn checkin_h1(&self, key: PoolKey, sender: http1::SendRequest>) { if sender.is_closed() || !sender.is_ready() { return; // Don't pool broken connections } let mut entry = self.h1_pool.entry(key).or_insert_with(Vec::new); if entry.value().len() < MAX_IDLE_PER_KEY { entry.value_mut().push(IdleH1 { sender, idle_since: Instant::now(), }); } // If at capacity, just drop the sender } /// Try to get a cloned HTTP/2 sender for the given key. /// HTTP/2 senders are Clone-able (multiplexed), so we clone rather than remove. pub fn checkout_h2(&self, key: &PoolKey) -> Option>> { let entry = self.h2_pool.get(key)?; let pooled = entry.value(); // Check if the h2 connection is still alive and not too old if pooled.sender.is_closed() || pooled.created_at.elapsed() >= MAX_H2_AGE { drop(entry); self.h2_pool.remove(key); return None; } if pooled.sender.is_ready() { debug!("Pool hit (h2): {}:{}", key.host, key.port); return Some(pooled.sender.clone()); } None } /// Remove a dead HTTP/2 sender from the pool. /// Called when `send_request` fails to prevent subsequent requests from reusing the stale sender. pub fn remove_h2(&self, key: &PoolKey) { self.h2_pool.remove(key); } /// Register an HTTP/2 sender in the pool. Since h2 is multiplexed, /// only one sender per key is stored (it's Clone-able). pub fn register_h2(&self, key: PoolKey, sender: http2::SendRequest>) { if sender.is_closed() { return; } self.h2_pool.insert(key, PooledH2 { sender, created_at: Instant::now(), }); } /// Background eviction loop — runs every EVICTION_INTERVAL to remove stale connections. async fn eviction_loop( h1_pool: Arc>>, h2_pool: Arc>, ) { let mut interval = tokio::time::interval(EVICTION_INTERVAL); loop { interval.tick().await; // Evict stale H1 connections let mut empty_keys = Vec::new(); for mut entry in h1_pool.iter_mut() { entry.value_mut().retain(|idle| { idle.idle_since.elapsed() < IDLE_TIMEOUT && !idle.sender.is_closed() }); if entry.value().is_empty() { empty_keys.push(entry.key().clone()); } } for key in empty_keys { h1_pool.remove(&key); } // Evict dead or aged-out H2 connections let mut dead_h2 = Vec::new(); for entry in h2_pool.iter() { if entry.value().sender.is_closed() || entry.value().created_at.elapsed() >= MAX_H2_AGE { dead_h2.push(entry.key().clone()); } } for key in dead_h2 { h2_pool.remove(&key); } } } } impl Drop for ConnectionPool { fn drop(&mut self) { if let Some(handle) = self.eviction_handle.take() { handle.abort(); } } }