feat(protocol-cache): add sliding TTL re-probing and eviction for backend protocol detection
This commit is contained in:
@@ -711,6 +711,9 @@ impl HttpProxyService {
|
||||
let cached_h3_port = self.protocol_cache.get(&protocol_cache_key)
|
||||
.and_then(|c| c.h3_port);
|
||||
|
||||
// Track whether this ALPN probe is a periodic re-probe (vs first-time detection)
|
||||
let mut is_reprobe = false;
|
||||
|
||||
let protocol_decision = match backend_protocol_mode {
|
||||
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
||||
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
||||
@@ -721,6 +724,12 @@ impl HttpProxyService {
|
||||
ProtocolDecision::H1
|
||||
} else {
|
||||
match self.protocol_cache.get(&protocol_cache_key) {
|
||||
Some(cached) if cached.needs_reprobe => {
|
||||
// Entry exists but 5+ minutes since last probe — force ALPN re-probe
|
||||
// (only fires for H1/H2; H3 entries have needs_reprobe=false)
|
||||
is_reprobe = true;
|
||||
ProtocolDecision::AlpnProbe
|
||||
}
|
||||
Some(cached) => match cached.protocol {
|
||||
crate::protocol_cache::DetectedProtocol::H3 => {
|
||||
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||
@@ -893,7 +902,7 @@ impl HttpProxyService {
|
||||
let alpn = tls.get_ref().1.alpn_protocol();
|
||||
let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false);
|
||||
|
||||
// Cache the result
|
||||
// Cache the result (or update existing entry for re-probes)
|
||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||
host: upstream.host.clone(),
|
||||
port: upstream.port,
|
||||
@@ -904,13 +913,18 @@ impl HttpProxyService {
|
||||
} else {
|
||||
crate::protocol_cache::DetectedProtocol::H1
|
||||
};
|
||||
self.protocol_cache.insert(cache_key, detected);
|
||||
if is_reprobe {
|
||||
self.protocol_cache.update_probe_result(&cache_key, detected, "periodic ALPN re-probe");
|
||||
} else {
|
||||
self.protocol_cache.insert(cache_key, detected, "initial ALPN detection");
|
||||
}
|
||||
|
||||
info!(
|
||||
backend = %upstream_key,
|
||||
domain = %domain_str,
|
||||
protocol = if is_h2 { "h2" } else { "h1" },
|
||||
connect_time_ms = %connect_start.elapsed().as_millis(),
|
||||
reprobe = is_reprobe,
|
||||
"Backend protocol detected via ALPN"
|
||||
);
|
||||
|
||||
@@ -938,11 +952,11 @@ impl HttpProxyService {
|
||||
if let Some(h3_port) = cached_h3_port {
|
||||
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
|
||||
debug!(backend = %upstream_key, domain = %domain_str, "TLS connect failed — escalating to H3");
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS failed, H3 succeeded");
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
@@ -961,6 +975,8 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
}
|
||||
// All protocols failed — evict cache entry
|
||||
self.protocol_cache.evict(&protocol_cache_key);
|
||||
}
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||
}
|
||||
@@ -979,11 +995,11 @@ impl HttpProxyService {
|
||||
if let Some(h3_port) = cached_h3_port {
|
||||
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
|
||||
debug!(backend = %upstream_key, domain = %domain_str, "TLS connect timeout — escalating to H3");
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS timeout, H3 succeeded");
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
@@ -1002,6 +1018,8 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
}
|
||||
// All protocols failed — evict cache entry
|
||||
self.protocol_cache.evict(&protocol_cache_key);
|
||||
}
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||
}
|
||||
@@ -1040,7 +1058,7 @@ impl HttpProxyService {
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP failed, H3 succeeded");
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
@@ -1059,6 +1077,8 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
}
|
||||
// All protocols failed — evict cache entry
|
||||
self.protocol_cache.evict(&protocol_cache_key);
|
||||
}
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
@@ -1081,7 +1101,7 @@ impl HttpProxyService {
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
|
||||
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP timeout, H3 succeeded");
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
@@ -1100,6 +1120,8 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
}
|
||||
// All protocols failed — evict cache entry
|
||||
self.protocol_cache.evict(&protocol_cache_key);
|
||||
}
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
@@ -1574,7 +1596,7 @@ impl HttpProxyService {
|
||||
cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H2,
|
||||
);
|
||||
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
||||
self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake timeout — downgrade");
|
||||
|
||||
match self.reconnect_backend(upstream, domain, backend_key).await {
|
||||
Some(fallback_backend) => {
|
||||
@@ -1593,6 +1615,8 @@ impl HttpProxyService {
|
||||
result
|
||||
}
|
||||
None => {
|
||||
// H2 failed and H1 reconnect also failed — evict cache
|
||||
self.protocol_cache.evict(&cache_key);
|
||||
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback"))
|
||||
}
|
||||
}
|
||||
@@ -1717,7 +1741,7 @@ impl HttpProxyService {
|
||||
cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H2,
|
||||
);
|
||||
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
||||
self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake error — downgrade");
|
||||
|
||||
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
|
||||
match self.reconnect_backend(upstream, domain, backend_key).await {
|
||||
@@ -1738,6 +1762,8 @@ impl HttpProxyService {
|
||||
result
|
||||
}
|
||||
None => {
|
||||
// H2 failed and H1 reconnect also failed — evict cache
|
||||
self.protocol_cache.evict(&cache_key);
|
||||
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback"))
|
||||
}
|
||||
}
|
||||
@@ -1954,7 +1980,7 @@ impl HttpProxyService {
|
||||
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
|
||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port, "Alt-Svc response header");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user