From 77abe0804d50ac919f5bf622ceb661cf542cd646 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 16 Mar 2026 14:30:43 +0000 Subject: [PATCH] fix(rustproxy-http): prevent stale HTTP/2 connection drivers from evicting newer pooled connections --- changelog.md | 6 +++ .../rustproxy-http/src/connection_pool.rs | 37 +++++++++++++---- .../rustproxy-http/src/proxy_service.rs | 41 ++++++++++++++----- ts/00_commitinfo_data.ts | 2 +- 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/changelog.md b/changelog.md index 2bc5971..91a10ef 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,11 @@ # Changelog +## 2026-03-16 - 25.11.17 - fix(rustproxy-http) +prevent stale HTTP/2 connection drivers from evicting newer pooled connections + +- add generation IDs to pooled HTTP/2 senders so pool removal only affects the matching connection +- update HTTP/2 proxy and retry paths to register generation-tagged connections and skip eviction before registration completes + ## 2026-03-16 - 25.11.16 - fix(repo) no changes to commit diff --git a/rust/crates/rustproxy-http/src/connection_pool.rs b/rust/crates/rustproxy-http/src/connection_pool.rs index c956f55..efea6c4 100644 --- a/rust/crates/rustproxy-http/src/connection_pool.rs +++ b/rust/crates/rustproxy-http/src/connection_pool.rs @@ -4,13 +4,13 @@ //! HTTP/2 connections are multiplexed (clone the sender for each request). use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use bytes::Bytes; use dashmap::DashMap; use http_body_util::combinators::BoxBody; use hyper::client::conn::{http1, http2}; -// No per-request logging in the pool — only log on actual failures (in proxy_service.rs) /// Maximum idle connections per backend key. const MAX_IDLE_PER_KEY: usize = 16; @@ -38,10 +38,13 @@ struct IdleH1 { idle_since: Instant, } -/// A pooled HTTP/2 sender (multiplexed, Clone-able). +/// A pooled HTTP/2 sender (multiplexed, Clone-able) with a generation tag. struct PooledH2 { sender: http2::SendRequest>, created_at: Instant, + /// Unique generation ID. Connection drivers use this to only remove their OWN + /// entry, preventing phantom eviction when multiple connections share the same key. + generation: u64, } /// Backend connection pool. @@ -50,6 +53,8 @@ pub struct ConnectionPool { h1_pool: Arc>>, /// HTTP/2 multiplexed connections indexed by backend key. h2_pool: Arc>, + /// Monotonic generation counter for H2 pool entries. + h2_generation: AtomicU64, /// Handle for the background eviction task. eviction_handle: Option>, } @@ -69,6 +74,7 @@ impl ConnectionPool { Self { h1_pool, h2_pool, + h2_generation: AtomicU64::new(0), eviction_handle: Some(eviction_handle), } } @@ -132,22 +138,39 @@ impl ConnectionPool { None } - /// Remove a dead HTTP/2 sender from the pool. + /// Remove a dead HTTP/2 sender from the pool (unconditional). /// Called when `send_request` fails to prevent subsequent requests from reusing the stale sender. pub fn remove_h2(&self, key: &PoolKey) { self.h2_pool.remove(key); } - /// Register an HTTP/2 sender in the pool. Since h2 is multiplexed, - /// only one sender per key is stored (it's Clone-able). - pub fn register_h2(&self, key: PoolKey, sender: http2::SendRequest>) { + /// Remove an HTTP/2 sender ONLY if the current entry has the expected generation. + /// This prevents phantom eviction: when multiple connections share the same key, + /// an old connection's driver won't accidentally remove a newer connection's entry. + pub fn remove_h2_if_generation(&self, key: &PoolKey, expected_gen: u64) { + if let Some(entry) = self.h2_pool.get(key) { + if entry.value().generation == expected_gen { + drop(entry); // release DashMap ref before remove + self.h2_pool.remove(key); + } + // else: a newer connection replaced ours — don't touch it + } + } + + /// Register an HTTP/2 sender in the pool. Returns the generation ID for this entry. + /// The caller should pass this generation to the connection driver so it can use + /// `remove_h2_if_generation` instead of `remove_h2` to avoid phantom eviction. + pub fn register_h2(&self, key: PoolKey, sender: http2::SendRequest>) -> u64 { + let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed); if sender.is_closed() { - return; + return gen; } self.h2_pool.insert(key, PooledH2 { sender, created_at: Instant::now(), + generation: gen, }); + gen } /// Background eviction loop — runs every EVICTION_INTERVAL to remove stale connections. diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 6e58ffe..aa44040 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -1019,16 +1019,24 @@ impl HttpProxyService { } }; - // Spawn the H2 connection driver; proactively evict from pool on exit - // so the next request gets a fresh connection instead of a dead sender. + // Shared generation ID: driver reads it after registration sets it. + // Uses u64::MAX as sentinel for "not yet registered" (driver waits/skips eviction). + let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX)); + + // Spawn the H2 connection driver; evict from pool on exit using generation-tagged + // removal to prevent phantom eviction when multiple connections share the same key. { let pool = Arc::clone(&self.connection_pool); let key = pool_key.clone(); + let gen = Arc::clone(&gen_holder); tokio::spawn(async move { if let Err(e) = conn.await { warn!("HTTP/2 upstream connection error: {} ({:?})", e, e); } - pool.remove_h2(&key); + let g = gen.load(std::sync::atomic::Ordering::Relaxed); + if g != u64::MAX { + pool.remove_h2_if_generation(&key, g); + } }); } @@ -1036,7 +1044,8 @@ impl HttpProxyService { let sender_for_pool = sender.clone(); let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity).await; if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) { - self.connection_pool.register_h2(pool_key.clone(), sender_for_pool); + let g = self.connection_pool.register_h2(pool_key.clone(), sender_for_pool); + gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); } result } @@ -1171,15 +1180,20 @@ impl HttpProxyService { } }; - // Spawn the H2 connection driver; proactively evict from pool on exit. + // Spawn the H2 connection driver with generation-tagged eviction. + let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX)); { let pool = Arc::clone(&self.connection_pool); let key = pool_key.clone(); + let gen = Arc::clone(&gen_holder); tokio::spawn(async move { if let Err(e) = conn.await { warn!("H2 retry: upstream connection error: {} ({:?})", e, e); } - pool.remove_h2(&key); + let g = gen.load(std::sync::atomic::Ordering::Relaxed); + if g != u64::MAX { + pool.remove_h2_if_generation(&key, g); + } }); } @@ -1207,7 +1221,8 @@ impl HttpProxyService { match sender.send_request(upstream_req).await { Ok(resp) => { // Register in pool only after request succeeds - self.connection_pool.register_h2(pool_key.clone(), sender); + let g = self.connection_pool.register_h2(pool_key.clone(), sender); + gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); let result = self.build_streaming_response(resp, route, route_id, source_ip, conn_activity).await; // Close the fresh backend connection (opened above) self.metrics.backend_connection_closed(&backend_key); @@ -1300,15 +1315,20 @@ impl HttpProxyService { } } Ok(Ok((mut sender, conn))) => { - // Spawn the H2 connection driver; proactively evict from pool on exit. + // Spawn the H2 connection driver with generation-tagged eviction. + let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX)); { let pool = Arc::clone(&self.connection_pool); let key = pool_key.clone(); + let gen = Arc::clone(&gen_holder); tokio::spawn(async move { if let Err(e) = conn.await { warn!("HTTP/2 upstream connection error: {} ({:?})", e, e); } - pool.remove_h2(&key); + let g = gen.load(std::sync::atomic::Ordering::Relaxed); + if g != u64::MAX { + pool.remove_h2_if_generation(&key, g); + } }); } @@ -1350,7 +1370,8 @@ impl HttpProxyService { match sender.send_request(upstream_req).await { Ok(upstream_response) => { // H2 works! Register sender in pool for multiplexed reuse - self.connection_pool.register_h2(pool_key.clone(), sender); + let g = self.connection_pool.register_h2(pool_key.clone(), sender); + gen_holder.store(g, std::sync::atomic::Ordering::Relaxed); self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await } Err(e) => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 42896ec..cecc1f9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.11.16', + version: '25.11.17', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }