fix(proxy-service): handle HTTP/3 backend forwarding failures with protocol fallback and pool cleanup
This commit is contained in:
@@ -280,6 +280,11 @@ impl ConnectionPool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove a QUIC/HTTP/3 connection from the pool unconditionally.
|
||||
pub fn remove_h3(&self, key: &PoolKey) {
|
||||
self.h3_pool.remove(key);
|
||||
}
|
||||
|
||||
/// Background eviction loop — runs every EVICTION_INTERVAL to remove stale connections.
|
||||
async fn eviction_loop(
|
||||
h1_pool: Arc<DashMap<PoolKey, Vec<IdleH1>>>,
|
||||
|
||||
@@ -92,6 +92,23 @@ enum ProtocolDecision {
|
||||
AlpnProbe,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct H3ForwardError {
|
||||
status: StatusCode,
|
||||
message: &'static str,
|
||||
retryable: bool,
|
||||
}
|
||||
|
||||
impl H3ForwardError {
|
||||
fn new(status: StatusCode, message: &'static str, retryable: bool) -> Self {
|
||||
Self {
|
||||
status,
|
||||
message,
|
||||
retryable,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard that decrements the active request counter on drop.
|
||||
/// Ensures the counter is correct even if the request handler panics.
|
||||
struct ActiveRequestGuard {
|
||||
@@ -972,6 +989,11 @@ impl HttpProxyService {
|
||||
use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
let h3_retry_state = if body.is_end_stream() {
|
||||
Some((parts.method.clone(), upstream_headers.clone()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Try H3 pool checkout first
|
||||
if let Some((pooled_sr, quic_conn, _age)) =
|
||||
@@ -990,13 +1012,53 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
if is_auto_detect_mode {
|
||||
Some(protocol_cache_key.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return Ok(response);
|
||||
}
|
||||
Err(error) => {
|
||||
if is_auto_detect_mode {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
}
|
||||
if is_auto_detect_mode && error.retryable {
|
||||
if let Some((method, headers)) = h3_retry_state {
|
||||
let fallback = self
|
||||
.retry_h3_failure_as_h1(
|
||||
method,
|
||||
headers,
|
||||
&upstream_path,
|
||||
&upstream,
|
||||
route_match.route,
|
||||
route_id,
|
||||
&ip_str,
|
||||
&protocol_cache_key,
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return Ok(error_response(error.status, error.message));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try fresh QUIC connection
|
||||
@@ -1019,13 +1081,54 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
if is_auto_detect_mode {
|
||||
Some(protocol_cache_key.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
match result {
|
||||
Ok(response) => {
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return Ok(response);
|
||||
}
|
||||
Err(error) => {
|
||||
self.metrics.backend_connection_closed(&upstream_key);
|
||||
if is_auto_detect_mode {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
}
|
||||
if is_auto_detect_mode && error.retryable {
|
||||
if let Some((method, headers)) = h3_retry_state {
|
||||
let fallback = self
|
||||
.retry_h3_failure_as_h1(
|
||||
method,
|
||||
headers,
|
||||
&upstream_path,
|
||||
&upstream,
|
||||
route_match.route,
|
||||
route_id,
|
||||
&ip_str,
|
||||
&protocol_cache_key,
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return fallback;
|
||||
}
|
||||
}
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return Ok(error_response(error.status, error.message));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(backend = %upstream_key, domain = %domain_str, error = %e,
|
||||
@@ -1236,13 +1339,23 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
Some(protocol_cache_key.clone()),
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
return match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
Ok(error_response(error.status, error.message))
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(e3) => {
|
||||
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||
@@ -1313,13 +1426,23 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
Some(protocol_cache_key.clone()),
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
return match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
Ok(error_response(error.status, error.message))
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(e3) => {
|
||||
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||
@@ -1410,13 +1533,23 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
Some(protocol_cache_key.clone()),
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
return match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
Ok(error_response(error.status, error.message))
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(e3) => {
|
||||
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||
@@ -1487,13 +1620,23 @@ impl HttpProxyService {
|
||||
route_id,
|
||||
&ip_str,
|
||||
&h3_pool_key,
|
||||
Some(protocol_cache_key.clone()),
|
||||
domain_str,
|
||||
&conn_activity,
|
||||
&upstream_key,
|
||||
)
|
||||
.await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
return match result {
|
||||
Ok(response) => Ok(response),
|
||||
Err(error) => {
|
||||
self.protocol_cache.record_failure(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
Ok(error_response(error.status, error.message))
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(e3) => {
|
||||
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||
@@ -2402,6 +2545,58 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retry a bodyless request over HTTP/1.1 after H3 failed post-connect.
|
||||
/// The original body has already been handed to the H3 path, so only empty
|
||||
/// requests can be retried safely without duplicating client data.
|
||||
async fn retry_h3_failure_as_h1(
|
||||
&self,
|
||||
method: hyper::Method,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
upstream_path: &str,
|
||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
protocol_cache_key: &crate::protocol_cache::ProtocolCacheKey,
|
||||
domain: &str,
|
||||
conn_activity: &ConnActivity,
|
||||
backend_key: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
warn!(backend = %backend_key, domain = %domain,
|
||||
"H3 forwarding failed, retrying bodyless request as HTTP/1.1");
|
||||
self.protocol_cache.insert(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H1,
|
||||
"H3 forwarding failure — downgrade to H1",
|
||||
);
|
||||
|
||||
match self.reconnect_backend(upstream, domain, backend_key).await {
|
||||
Some(fallback_backend) => {
|
||||
let fallback_io = TokioIo::new(fallback_backend);
|
||||
let result = self
|
||||
.forward_h1_empty_body(
|
||||
fallback_io,
|
||||
method,
|
||||
upstream_headers,
|
||||
upstream_path,
|
||||
route,
|
||||
route_id,
|
||||
source_ip,
|
||||
domain,
|
||||
conn_activity,
|
||||
backend_key,
|
||||
)
|
||||
.await;
|
||||
self.metrics.backend_connection_closed(backend_key);
|
||||
result
|
||||
}
|
||||
None => Ok(error_response(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"Backend unavailable after H3 fallback",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Forward a request with an empty body via HTTP/1.1.
|
||||
/// Used when retrying after a failed H2 or H3 attempt where the original body was consumed.
|
||||
async fn forward_h1_empty_body(
|
||||
@@ -3552,10 +3747,11 @@ impl HttpProxyService {
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
pool_key: &crate::connection_pool::PoolKey,
|
||||
protocol_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||
domain: &str,
|
||||
conn_activity: &ConnActivity,
|
||||
backend_key: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, H3ForwardError> {
|
||||
// Obtain the h3 SendRequest handle: skip handshake + driver on pool hit.
|
||||
let (mut send_request, gen_holder) = if let Some(sr) = pooled_sender {
|
||||
// Pool hit — reuse existing h3 session, no SETTINGS round-trip
|
||||
@@ -3572,9 +3768,11 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
||||
self.metrics.backend_handshake_error(backend_key);
|
||||
return Ok(error_response(
|
||||
self.connection_pool.remove_h3(pool_key);
|
||||
return Err(H3ForwardError::new(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"H3 handshake failed",
|
||||
true,
|
||||
));
|
||||
}
|
||||
};
|
||||
@@ -3623,7 +3821,12 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
|
||||
self.connection_pool.remove_h3(pool_key);
|
||||
return Err(H3ForwardError::new(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"H3 request failed",
|
||||
true,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3646,9 +3849,11 @@ impl HttpProxyService {
|
||||
);
|
||||
if let Err(e) = stream.send_data(data).await {
|
||||
error!(backend = %backend_key, error = %e, "H3 send_data failed");
|
||||
return Ok(error_response(
|
||||
self.connection_pool.remove_h3(pool_key);
|
||||
return Err(H3ForwardError::new(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"H3 body send failed",
|
||||
false,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -3669,9 +3874,11 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(
|
||||
self.connection_pool.remove_h3(pool_key);
|
||||
return Err(H3ForwardError::new(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"H3 response failed",
|
||||
true,
|
||||
));
|
||||
}
|
||||
};
|
||||
@@ -3693,17 +3900,34 @@ impl HttpProxyService {
|
||||
}
|
||||
|
||||
// Stream response body back via unfold — correctly preserves waker across polls
|
||||
let body_stream = futures::stream::unfold(stream, |mut s| async move {
|
||||
match s.recv_data().await {
|
||||
Ok(Some(mut buf)) => {
|
||||
use bytes::Buf;
|
||||
let data = buf.copy_to_bytes(buf.remaining());
|
||||
Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s))
|
||||
}
|
||||
Ok(None) => None,
|
||||
Err(e) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
None
|
||||
let h3_failure_cache = Arc::clone(&self.protocol_cache);
|
||||
let h3_failure_cache_key = protocol_cache_key.clone();
|
||||
let h3_failure_pool = Arc::clone(&self.connection_pool);
|
||||
let h3_failure_pool_key = pool_key.clone();
|
||||
let body_stream = futures::stream::unfold(stream, move |mut s| {
|
||||
let h3_failure_cache = Arc::clone(&h3_failure_cache);
|
||||
let h3_failure_cache_key = h3_failure_cache_key.clone();
|
||||
let h3_failure_pool = Arc::clone(&h3_failure_pool);
|
||||
let h3_failure_pool_key = h3_failure_pool_key.clone();
|
||||
async move {
|
||||
match s.recv_data().await {
|
||||
Ok(Some(mut buf)) => {
|
||||
use bytes::Buf;
|
||||
let data = buf.copy_to_bytes(buf.remaining());
|
||||
Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s))
|
||||
}
|
||||
Ok(None) => None,
|
||||
Err(e) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
if let Some(cache_key) = h3_failure_cache_key {
|
||||
h3_failure_cache.record_failure(
|
||||
cache_key,
|
||||
crate::protocol_cache::DetectedProtocol::H3,
|
||||
);
|
||||
}
|
||||
h3_failure_pool.remove_h3(&h3_failure_pool_key);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user