feat(rustproxy-http): add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure

Commit: f04875885f
Parent: d12812bb8d
Date: 2026-03-22 10:20:00 +00:00
8 changed files with 602 additions and 31 deletions

View File

@@ -311,6 +311,11 @@ impl HttpProxyService {
self.protocol_cache.clear();
}
/// Snapshot the protocol cache for metrics/UI display.
///
/// Delegates to `ProtocolCache::snapshot()`. The returned `Vec` is owned
/// by the caller — no borrow of the live cache is held, so the snapshot
/// can be serialized or rendered without blocking concurrent cache use.
/// NOTE(review): whether entries are copied atomically or per-entry
/// depends on `snapshot()`'s implementation in `protocol_cache` — confirm
/// there if point-in-time consistency matters for the metrics consumer.
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
self.protocol_cache.snapshot()
}
/// Handle an incoming HTTP connection on a plain TCP stream.
pub async fn handle_connection(
self: Arc<Self>,
@@ -701,6 +706,11 @@ impl HttpProxyService {
port: upstream.port,
requested_host: host.clone(),
};
// Save cached H3 port for within-request escalation (may be needed later
// if TCP connect fails and we escalate to H3 as a last resort)
let cached_h3_port = self.protocol_cache.get(&protocol_cache_key)
.and_then(|c| c.h3_port);
let protocol_decision = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
@@ -713,17 +723,32 @@ impl HttpProxyService {
match self.protocol_cache.get(&protocol_cache_key) {
Some(cached) => match cached.protocol {
crate::protocol_cache::DetectedProtocol::H3 => {
if let Some(h3_port) = cached.h3_port {
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
// H3 cached but suppressed — fall back to ALPN probe
ProtocolDecision::AlpnProbe
} else if let Some(h3_port) = cached.h3_port {
ProtocolDecision::H3 { port: h3_port }
} else {
// H3 cached but no port — fall back to ALPN probe
ProtocolDecision::AlpnProbe
}
}
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
crate::protocol_cache::DetectedProtocol::H2 => {
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
ProtocolDecision::H1
} else {
ProtocolDecision::H2
}
}
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
},
None => ProtocolDecision::AlpnProbe,
None => {
// Cache miss — skip ALPN probe if H2 is suppressed
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
ProtocolDecision::H1
} else {
ProtocolDecision::AlpnProbe
}
}
}
}
}
@@ -776,8 +801,16 @@ impl HttpProxyService {
return result;
}
Err(e) => {
warn!(backend = %upstream_key, error = %e,
warn!(backend = %upstream_key, domain = %domain_str, error = %e,
"H3 backend connect failed, falling back to H2/H1");
// Record failure with escalating cooldown — prevents Alt-Svc
// from re-upgrading to H3 during cooldown period
if is_auto_detect_mode {
self.protocol_cache.record_failure(
protocol_cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H3,
);
}
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
// from our own injected Alt-Svc header or a stale backend Alt-Svc
conn_activity.alt_svc_cache_key = None;
@@ -899,6 +932,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
}
Err(_) => {
@@ -910,6 +973,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
}
}
@@ -937,6 +1030,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
}
Err(_) => {
@@ -948,6 +1071,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
}
}
@@ -1416,6 +1569,11 @@ impl HttpProxyService {
port: upstream.port,
requested_host: requested_host.clone(),
};
// Record H2 failure (escalating cooldown) before downgrading cache to H1
self.protocol_cache.record_failure(
cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2,
);
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
match self.reconnect_backend(upstream, domain, backend_key).await {
@@ -1549,12 +1707,16 @@ impl HttpProxyService {
self.metrics.backend_h2_failure(backend_key);
self.metrics.backend_handshake_error(backend_key);
// Update cache to H1 so subsequent requests skip H2
// Record H2 failure (escalating cooldown) and downgrade cache to H1
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: requested_host.clone(),
};
self.protocol_cache.record_failure(
cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2,
);
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
@@ -2569,7 +2731,7 @@ impl HttpProxyService {
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| "QUIC connect timeout (3s)")??;
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)