//! Backend connection pool for HTTP/1.1 and HTTP/2. //! //! Reuses idle keep-alive connections to avoid per-request TCP+TLS handshakes. //! HTTP/2 connections are multiplexed (clone the sender for each request). use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use bytes::Bytes; use dashmap::DashMap; use http_body_util::combinators::BoxBody; use hyper::client::conn::{http1, http2}; /// Maximum idle connections per backend key. const MAX_IDLE_PER_KEY: usize = 16; /// Default idle timeout — connections not used within this window are evicted. const IDLE_TIMEOUT: Duration = Duration::from_secs(90); /// Background eviction interval. const EVICTION_INTERVAL: Duration = Duration::from_secs(30); /// Maximum age for pooled HTTP/2 connections before proactive eviction. /// Prevents staleness from backends that close idle connections (e.g. nginx GOAWAY). /// 120s is well within typical server GOAWAY windows (nginx: ~60s idle, envoy: ~60s). const MAX_H2_AGE: Duration = Duration::from_secs(120); /// Identifies a unique backend endpoint. #[derive(Clone, Debug, Hash, Eq, PartialEq)] pub struct PoolKey { pub host: String, pub port: u16, pub use_tls: bool, pub h2: bool, } /// An idle HTTP/1.1 sender with a timestamp for eviction. struct IdleH1 { sender: http1::SendRequest>, idle_since: Instant, } /// A pooled HTTP/2 sender (multiplexed, Clone-able) with a generation tag. struct PooledH2 { sender: http2::SendRequest>, created_at: Instant, /// Unique generation ID. Connection drivers use this to only remove their OWN /// entry, preventing phantom eviction when multiple connections share the same key. generation: u64, } /// Backend connection pool. pub struct ConnectionPool { /// HTTP/1.1 idle connections indexed by backend key. h1_pool: Arc>>, /// HTTP/2 multiplexed connections indexed by backend key. h2_pool: Arc>, /// Monotonic generation counter for H2 pool entries. h2_generation: AtomicU64, /// Handle for the background eviction task. eviction_handle: Option>, } impl ConnectionPool { /// Create a new pool and start the background eviction task. pub fn new() -> Self { let h1_pool: Arc>> = Arc::new(DashMap::new()); let h2_pool: Arc> = Arc::new(DashMap::new()); let h1_clone = Arc::clone(&h1_pool); let h2_clone = Arc::clone(&h2_pool); let eviction_handle = tokio::spawn(async move { Self::eviction_loop(h1_clone, h2_clone).await; }); Self { h1_pool, h2_pool, h2_generation: AtomicU64::new(0), eviction_handle: Some(eviction_handle), } } /// Try to check out an idle HTTP/1.1 sender for the given key. /// Returns `None` if no usable idle connection exists. pub fn checkout_h1(&self, key: &PoolKey) -> Option>> { let mut entry = self.h1_pool.get_mut(key)?; let idles = entry.value_mut(); while let Some(idle) = idles.pop() { // Check if the connection is still alive and ready if idle.idle_since.elapsed() < IDLE_TIMEOUT && idle.sender.is_ready() && !idle.sender.is_closed() { // H1 pool hit — no logging on hot path return Some(idle.sender); } // Stale or closed — drop it } // Clean up empty entry if idles.is_empty() { drop(entry); self.h1_pool.remove(key); } None } /// Return an HTTP/1.1 sender to the pool after the response body has been prepared. /// The caller should NOT call this if the sender is closed or not ready. pub fn checkin_h1(&self, key: PoolKey, sender: http1::SendRequest>) { if sender.is_closed() || !sender.is_ready() { return; // Don't pool broken connections } let mut entry = self.h1_pool.entry(key).or_insert_with(Vec::new); if entry.value().len() < MAX_IDLE_PER_KEY { entry.value_mut().push(IdleH1 { sender, idle_since: Instant::now(), }); } // If at capacity, just drop the sender } /// Try to get a cloned HTTP/2 sender for the given key. /// HTTP/2 senders are Clone-able (multiplexed), so we clone rather than remove. pub fn checkout_h2(&self, key: &PoolKey) -> Option<(http2::SendRequest>, Duration)> { let entry = self.h2_pool.get(key)?; let pooled = entry.value(); let age = pooled.created_at.elapsed(); if pooled.sender.is_closed() || age >= MAX_H2_AGE { drop(entry); self.h2_pool.remove(key); return None; } if pooled.sender.is_ready() { return Some((pooled.sender.clone(), age)); } None } /// Remove a dead HTTP/2 sender from the pool (unconditional). /// Called when `send_request` fails to prevent subsequent requests from reusing the stale sender. pub fn remove_h2(&self, key: &PoolKey) { self.h2_pool.remove(key); } /// Remove an HTTP/2 sender ONLY if the current entry has the expected generation. /// This prevents phantom eviction: when multiple connections share the same key, /// an old connection's driver won't accidentally remove a newer connection's entry. pub fn remove_h2_if_generation(&self, key: &PoolKey, expected_gen: u64) { if let Some(entry) = self.h2_pool.get(key) { if entry.value().generation == expected_gen { drop(entry); // release DashMap ref before remove self.h2_pool.remove(key); } // else: a newer connection replaced ours — don't touch it } } /// Register an HTTP/2 sender in the pool. Returns the generation ID for this entry. /// The caller should pass this generation to the connection driver so it can use /// `remove_h2_if_generation` instead of `remove_h2` to avoid phantom eviction. pub fn register_h2(&self, key: PoolKey, sender: http2::SendRequest>) -> u64 { let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed); if sender.is_closed() { return gen; } self.h2_pool.insert(key, PooledH2 { sender, created_at: Instant::now(), generation: gen, }); gen } /// Background eviction loop — runs every EVICTION_INTERVAL to remove stale connections. async fn eviction_loop( h1_pool: Arc>>, h2_pool: Arc>, ) { let mut interval = tokio::time::interval(EVICTION_INTERVAL); loop { interval.tick().await; // Evict stale H1 connections let mut empty_keys = Vec::new(); for mut entry in h1_pool.iter_mut() { entry.value_mut().retain(|idle| { idle.idle_since.elapsed() < IDLE_TIMEOUT && !idle.sender.is_closed() }); if entry.value().is_empty() { empty_keys.push(entry.key().clone()); } } for key in empty_keys { h1_pool.remove(&key); } // Evict dead or aged-out H2 connections let mut dead_h2 = Vec::new(); for entry in h2_pool.iter() { if entry.value().sender.is_closed() || entry.value().created_at.elapsed() >= MAX_H2_AGE { dead_h2.push(entry.key().clone()); } } for key in dead_h2 { h2_pool.remove(&key); } } } } impl Drop for ConnectionPool { fn drop(&mut self) { if let Some(handle) = self.eviction_handle.take() { handle.abort(); } } }