fix(rustproxy-http): prevent stale HTTP/2 connection drivers from evicting newer pooled connections
This commit is contained in:
@@ -1019,16 +1019,24 @@ impl HttpProxyService {
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the H2 connection driver; proactively evict from pool on exit
|
||||
// so the next request gets a fresh connection instead of a dead sender.
|
||||
// Shared generation ID: driver reads it after registration sets it.
|
||||
// Uses u64::MAX as sentinel for "not yet registered" (driver waits/skips eviction).
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
|
||||
// Spawn the H2 connection driver; evict from pool on exit using generation-tagged
|
||||
// removal to prevent phantom eviction when multiple connections share the same key.
|
||||
{
|
||||
let pool = Arc::clone(&self.connection_pool);
|
||||
let key = pool_key.clone();
|
||||
let gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
||||
}
|
||||
pool.remove_h2(&key);
|
||||
let g = gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if g != u64::MAX {
|
||||
pool.remove_h2_if_generation(&key, g);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1036,7 +1044,8 @@ impl HttpProxyService {
|
||||
let sender_for_pool = sender.clone();
|
||||
let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity).await;
|
||||
if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) {
|
||||
self.connection_pool.register_h2(pool_key.clone(), sender_for_pool);
|
||||
let g = self.connection_pool.register_h2(pool_key.clone(), sender_for_pool);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
result
|
||||
}
|
||||
@@ -1171,15 +1180,20 @@ impl HttpProxyService {
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the H2 connection driver; proactively evict from pool on exit.
|
||||
// Spawn the H2 connection driver with generation-tagged eviction.
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
{
|
||||
let pool = Arc::clone(&self.connection_pool);
|
||||
let key = pool_key.clone();
|
||||
let gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
warn!("H2 retry: upstream connection error: {} ({:?})", e, e);
|
||||
}
|
||||
pool.remove_h2(&key);
|
||||
let g = gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if g != u64::MAX {
|
||||
pool.remove_h2_if_generation(&key, g);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1207,7 +1221,8 @@ impl HttpProxyService {
|
||||
match sender.send_request(upstream_req).await {
|
||||
Ok(resp) => {
|
||||
// Register in pool only after request succeeds
|
||||
self.connection_pool.register_h2(pool_key.clone(), sender);
|
||||
let g = self.connection_pool.register_h2(pool_key.clone(), sender);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
let result = self.build_streaming_response(resp, route, route_id, source_ip, conn_activity).await;
|
||||
// Close the fresh backend connection (opened above)
|
||||
self.metrics.backend_connection_closed(&backend_key);
|
||||
@@ -1300,15 +1315,20 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
Ok(Ok((mut sender, conn))) => {
|
||||
// Spawn the H2 connection driver; proactively evict from pool on exit.
|
||||
// Spawn the H2 connection driver with generation-tagged eviction.
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
{
|
||||
let pool = Arc::clone(&self.connection_pool);
|
||||
let key = pool_key.clone();
|
||||
let gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
||||
}
|
||||
pool.remove_h2(&key);
|
||||
let g = gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if g != u64::MAX {
|
||||
pool.remove_h2_if_generation(&key, g);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1350,7 +1370,8 @@ impl HttpProxyService {
|
||||
match sender.send_request(upstream_req).await {
|
||||
Ok(upstream_response) => {
|
||||
// H2 works! Register sender in pool for multiplexed reuse
|
||||
self.connection_pool.register_h2(pool_key.clone(), sender);
|
||||
let g = self.connection_pool.register_h2(pool_key.clone(), sender);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
Reference in New Issue
Block a user