From 8e76c42cea72966ab07883da0a272c2db05551ee Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 16 Mar 2026 12:29:15 +0000 Subject: [PATCH] fix(rustproxy-http): validate pooled HTTP/2 connections asynchronously before reuse and evict stale senders --- changelog.md | 7 +++ .../rustproxy-http/src/connection_pool.rs | 17 +++++--- .../rustproxy-http/src/proxy_service.rs | 43 ++++++++++++++----- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/changelog.md b/changelog.md index 52c3024..3f388bd 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-16 - 25.11.10 - fix(rustproxy-http) +validate pooled HTTP/2 connections asynchronously before reuse and evict stale senders + +- Add an async ready() check with a 500ms timeout before reusing pooled HTTP/2 senders to catch GOAWAY/RST states before forwarding requests +- Return connection age from the HTTP/2 pool checkout path and log warnings for older pooled connections +- Evict pooled HTTP/2 senders when they are closed, exceed max age, fail readiness validation, or time out during readiness checks + ## 2026-03-16 - 25.11.9 - fix(rustproxy-routing) reduce hot-path allocations in routing, metrics, and proxy protocol handling diff --git a/rust/crates/rustproxy-http/src/connection_pool.rs b/rust/crates/rustproxy-http/src/connection_pool.rs index 95c3f3a..537d32d 100644 --- a/rust/crates/rustproxy-http/src/connection_pool.rs +++ b/rust/crates/rustproxy-http/src/connection_pool.rs @@ -10,7 +10,7 @@ use bytes::Bytes; use dashmap::DashMap; use http_body_util::combinators::BoxBody; use hyper::client::conn::{http1, http2}; -use tracing::debug; +use tracing::{debug, warn}; /// Maximum idle connections per backend key. const MAX_IDLE_PER_KEY: usize = 16; @@ -115,20 +115,27 @@ impl ConnectionPool { /// Try to get a cloned HTTP/2 sender for the given key. /// HTTP/2 senders are Clone-able (multiplexed), so we clone rather than remove. - pub fn checkout_h2(&self, key: &PoolKey) -> Option>> { + pub fn checkout_h2(&self, key: &PoolKey) -> Option<(http2::SendRequest>, Duration)> { let entry = self.h2_pool.get(key)?; let pooled = entry.value(); + let age = pooled.created_at.elapsed(); // Check if the h2 connection is still alive and not too old - if pooled.sender.is_closed() || pooled.created_at.elapsed() >= MAX_H2_AGE { + if pooled.sender.is_closed() || age >= MAX_H2_AGE { + let reason = if pooled.sender.is_closed() { "closed" } else { "max_age" }; + debug!("Pool evict (h2): {}:{} (reason={}, age={:.1}s)", key.host, key.port, reason, age.as_secs_f64()); drop(entry); self.h2_pool.remove(key); return None; } if pooled.sender.is_ready() { - debug!("Pool hit (h2): {}:{}", key.host, key.port); - return Some(pooled.sender.clone()); + if age > Duration::from_secs(30) { + warn!("Pool hit (h2): {}:{} — connection age {:.1}s (>30s, may be stale)", key.host, key.port, age.as_secs_f64()); + } else { + debug!("Pool hit (h2): {}:{} (age={:.1}s)", key.host, key.port, age.as_secs_f64()); + } + return Some((pooled.sender.clone(), age)); } None } diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 2d0bac0..656afc9 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -659,17 +659,40 @@ impl HttpProxyService { h2: use_h2, }; - // H2 pool checkout (H2 senders are Clone and multiplexed) + // H2 pool checkout with async readiness validation. + // checkout_h2 does synchronous is_closed()/is_ready() checks, but these + // reflect cached state — the H2 connection driver (a separate tokio task) + // may not have processed a pending GOAWAY/RST yet. The ready().await + // forces the runtime to yield, giving the driver a chance to detect failures. if use_h2 { - if let Some(sender) = self.connection_pool.checkout_h2(&pool_key) { - self.metrics.backend_pool_hit(&upstream_key); - self.metrics.set_backend_protocol(&upstream_key, "h2"); - let result = self.forward_h2_pooled( - sender, parts, body, upstream_headers, &upstream_path, - route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, - ).await; - self.upstream_selector.connection_ended(&upstream_key); - return result; + if let Some((mut sender, age)) = self.connection_pool.checkout_h2(&pool_key) { + match tokio::time::timeout( + std::time::Duration::from_millis(500), + sender.ready(), + ).await { + Ok(Ok(())) => { + self.metrics.backend_pool_hit(&upstream_key); + self.metrics.set_backend_protocol(&upstream_key, "h2"); + let result = self.forward_h2_pooled( + sender, parts, body, upstream_headers, &upstream_path, + route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, + ).await; + self.upstream_selector.connection_ended(&upstream_key); + return result; + } + Ok(Err(e)) => { + warn!(backend = %upstream_key, age_secs = age.as_secs(), + "Pooled H2 sender failed ready check (GOAWAY/RST): {}, evicting", e); + self.connection_pool.remove_h2(&pool_key); + // Fall through to fresh connection + } + Err(_) => { + warn!(backend = %upstream_key, age_secs = age.as_secs(), + "Pooled H2 sender ready check timed out (500ms), evicting"); + self.connection_pool.remove_h2(&pool_key); + // Fall through to fresh connection + } + } } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 1e78964..6a07a5e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.9', + version: '25.11.10', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }