diff --git a/changelog.md b/changelog.md index 39d9f38..8a4bcf0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-22 - 26.1.0 - feat(rustproxy-http) +add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure + +- introduces escalating cooldowns for failed H2/H3 protocol detection to prevent repeated upgrades to unstable backends +- adds within-request escalation to cached HTTP/3 when TCP or TLS backend connections fail in auto-detect mode +- exposes detected protocol cache entries and suppression state through Rust metrics and the TypeScript metrics adapter + ## 2026-03-21 - 26.0.0 - BREAKING CHANGE(ts-api,rustproxy) remove deprecated TypeScript protocol and utility exports while hardening QUIC, HTTP/3, WebSocket, and rate limiter cleanup paths diff --git a/rust/crates/rustproxy-http/src/protocol_cache.rs b/rust/crates/rustproxy-http/src/protocol_cache.rs index 97cb44c..1446d62 100644 --- a/rust/crates/rustproxy-http/src/protocol_cache.rs +++ b/rust/crates/rustproxy-http/src/protocol_cache.rs @@ -1,11 +1,22 @@ -//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection. +//! Bounded, TTL-based protocol detection cache with generic failure suppression. //! //! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested //! domain (host:port + requested_host). This prevents cache oscillation when multiple //! frontend domains share the same backend but differ in protocol support. //! -//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are -//! parsed and cached, including the advertised H3 port (which may differ from TCP). +//! ## Upgrade signals +//! +//! - ALPN (TLS handshake) → detects H2 vs H1 +//! - Alt-Svc (response header) → advertises H3 +//! +//! ## Failure suppression +//! +//! When a protocol fails, `record_failure()` prevents upgrade signals from +//! re-introducing it until an escalating cooldown expires (5s → 10s → ... → 300s). +//! Within-request escalation is allowed via `can_retry()` after a 5s minimum gap. +//! +//! Cascading: when a lower protocol also fails, higher protocol cooldowns are +//! reduced to 5s remaining (not instant clear), preventing tight retry loops. use std::sync::Arc; use std::time::{Duration, Instant}; @@ -18,14 +29,23 @@ use tracing::debug; const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes /// Maximum number of entries in the protocol cache. -/// Prevents unbounded growth when backends come and go. const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096; -/// Background cleanup interval for the protocol cache. +/// Background cleanup interval. const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); +/// Minimum cooldown between retry attempts of a failed protocol. +const PROTOCOL_FAILURE_COOLDOWN: Duration = Duration::from_secs(5); + +/// Maximum cooldown (escalation ceiling). Matches cache TTL. +const PROTOCOL_FAILURE_MAX_COOLDOWN: Duration = Duration::from_secs(300); + +/// Consecutive failure count at which cooldown reaches maximum. +/// 5s × 2^5 = 160s, 5s × 2^6 = 320s → capped at 300s. +const PROTOCOL_FAILURE_ESCALATION_CAP: u32 = 6; + /// Detected backend protocol. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum DetectedProtocol { H1, H2, @@ -58,7 +78,83 @@ struct CachedEntry { h3_port: Option, } -/// Bounded, TTL-based protocol detection cache. +/// Failure record for a single protocol level. +#[derive(Debug, Clone)] +struct FailureRecord { + /// When the failure was last recorded. + failed_at: Instant, + /// Current cooldown duration. Escalates on consecutive failures. + cooldown: Duration, + /// Number of consecutive failures (for escalation). + consecutive_failures: u32, +} + +/// Per-key failure state. Tracks failures at each upgradeable protocol level. +/// H1 is never tracked (it's the protocol floor — nothing to fall back to). +#[derive(Debug, Clone, Default)] +struct FailureState { + h2: Option, + h3: Option, +} + +impl FailureState { + fn is_empty(&self) -> bool { + self.h2.is_none() && self.h3.is_none() + } + + fn all_expired(&self) -> bool { + let h2_expired = self.h2.as_ref() + .map(|r| r.failed_at.elapsed() >= r.cooldown) + .unwrap_or(true); + let h3_expired = self.h3.as_ref() + .map(|r| r.failed_at.elapsed() >= r.cooldown) + .unwrap_or(true); + h2_expired && h3_expired + } + + fn get(&self, protocol: DetectedProtocol) -> Option<&FailureRecord> { + match protocol { + DetectedProtocol::H2 => self.h2.as_ref(), + DetectedProtocol::H3 => self.h3.as_ref(), + DetectedProtocol::H1 => None, + } + } + + fn get_mut(&mut self, protocol: DetectedProtocol) -> &mut Option { + match protocol { + DetectedProtocol::H2 => &mut self.h2, + DetectedProtocol::H3 => &mut self.h3, + DetectedProtocol::H1 => unreachable!("H1 failures are never recorded"), + } + } +} + +/// Snapshot of a single protocol cache entry, suitable for metrics/UI display. +#[derive(Debug, Clone)] +pub struct ProtocolCacheEntry { + pub host: String, + pub port: u16, + pub domain: Option, + pub protocol: String, + pub h3_port: Option, + pub age_secs: u64, + pub h2_suppressed: bool, + pub h3_suppressed: bool, + pub h2_cooldown_remaining_secs: Option, + pub h3_cooldown_remaining_secs: Option, + pub h2_consecutive_failures: Option, + pub h3_consecutive_failures: Option, +} + +/// Exponential backoff: PROTOCOL_FAILURE_COOLDOWN × 2^(n-1), capped at MAX. +fn escalate_cooldown(consecutive: u32) -> Duration { + let base = PROTOCOL_FAILURE_COOLDOWN.as_secs(); + let exp = consecutive.saturating_sub(1).min(63) as u64; + let secs = base.saturating_mul(1u64.checked_shl(exp as u32).unwrap_or(u64::MAX)); + Duration::from_secs(secs.min(PROTOCOL_FAILURE_MAX_COOLDOWN.as_secs())) +} + +/// Bounded, TTL-based protocol detection cache with failure suppression. /// /// Memory safety guarantees: /// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly. @@ -68,6 +164,10 @@ struct CachedEntry { /// - `Drop` — aborts the background task to prevent dangling tokio tasks. pub struct ProtocolCache { cache: Arc>, + /// Generic protocol failure suppression map. Tracks per-protocol failure + /// records (H2, H3) for each cache key. Used to prevent upgrade signals + /// (ALPN, Alt-Svc) from re-introducing failed protocols. + failures: Arc>, cleanup_handle: Option>, } @@ -75,13 +175,16 @@ impl ProtocolCache { /// Create a new protocol cache and start the background cleanup task. pub fn new() -> Self { let cache: Arc> = Arc::new(DashMap::new()); + let failures: Arc> = Arc::new(DashMap::new()); let cache_clone = Arc::clone(&cache); + let failures_clone = Arc::clone(&failures); let cleanup_handle = tokio::spawn(async move { - Self::cleanup_loop(cache_clone).await; + Self::cleanup_loop(cache_clone, failures_clone).await; }); Self { cache, + failures, cleanup_handle: Some(cleanup_handle), } } @@ -91,7 +194,8 @@ impl ProtocolCache { pub fn get(&self, key: &ProtocolCacheKey) -> Option { let entry = self.cache.get(key)?; if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL { - debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host); + debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", + entry.protocol, key.host, key.port, key.requested_host); Some(CachedProtocol { protocol: entry.protocol, h3_port: entry.h3_port, @@ -105,20 +209,195 @@ impl ProtocolCache { } /// Insert a detected protocol into the cache. - /// If the cache is at capacity, evict the oldest entry first. - pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) { - self.insert_with_h3_port(key, protocol, None); + /// Returns `false` if suppressed due to active failure suppression. + /// + /// **Key semantic**: only suppresses if the protocol being inserted matches + /// a suppressed protocol. H1 inserts are NEVER suppressed — downgrades + /// always succeed. + pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) -> bool { + if self.is_suppressed(&key, protocol) { + debug!( + host = %key.host, port = %key.port, domain = ?key.requested_host, + protocol = ?protocol, + "Protocol cache insert suppressed — recent failure" + ); + return false; + } + self.insert_internal(key, protocol, None); + true } /// Insert an H3 detection result with the Alt-Svc advertised port. - pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) { - self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port)); + /// Returns `false` if H3 is suppressed. + pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) -> bool { + if self.is_suppressed(&key, DetectedProtocol::H3) { + debug!( + host = %key.host, port = %key.port, domain = ?key.requested_host, + "H3 upgrade suppressed — recent failure" + ); + return false; + } + self.insert_internal(key, DetectedProtocol::H3, Some(h3_port)); + true } + /// Record a protocol failure. Future `insert()` calls for this protocol + /// will be suppressed until the escalating cooldown expires. + /// + /// Cooldown escalation: 5s → 10s → 20s → 40s → 80s → 160s → 300s. + /// Consecutive counter resets if the previous failure is older than 2× its cooldown. + /// + /// Cascading: when H2 fails, H3 cooldown is reduced to 5s remaining. + /// H1 failures are ignored (H1 is the protocol floor). + pub fn record_failure(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) { + if protocol == DetectedProtocol::H1 { + return; // H1 is the floor — nothing to suppress + } + + let mut entry = self.failures.entry(key.clone()).or_default(); + + let record = entry.get_mut(protocol); + let (consecutive, new_cooldown) = match record { + Some(existing) if existing.failed_at.elapsed() < existing.cooldown.saturating_mul(2) => { + // Still within the "recent" window — escalate + let c = existing.consecutive_failures.saturating_add(1) + .min(PROTOCOL_FAILURE_ESCALATION_CAP); + (c, escalate_cooldown(c)) + } + _ => { + // First failure or old failure that expired long ago — reset + (1, PROTOCOL_FAILURE_COOLDOWN) + } + }; + + *record = Some(FailureRecord { + failed_at: Instant::now(), + cooldown: new_cooldown, + consecutive_failures: consecutive, + }); + + // Cascading: when H2 fails, reduce H3 cooldown to 5s remaining + if protocol == DetectedProtocol::H2 { + Self::reduce_cooldown_to(entry.h3.as_mut(), PROTOCOL_FAILURE_COOLDOWN); + } + + debug!( + host = %key.host, port = %key.port, domain = ?key.requested_host, + protocol = ?protocol, + consecutive = consecutive, + cooldown_secs = new_cooldown.as_secs(), + "Protocol failure recorded — suppressing for {:?}", new_cooldown + ); + } + + /// Check whether a protocol is currently suppressed for the given key. + /// Returns `true` if the protocol failed within its cooldown period. + /// H1 is never suppressed. + pub fn is_suppressed(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool { + if protocol == DetectedProtocol::H1 { + return false; + } + self.failures.get(key) + .and_then(|entry| entry.get(protocol).map(|r| r.failed_at.elapsed() < r.cooldown)) + .unwrap_or(false) + } + + /// Check whether a protocol can be retried (for within-request escalation). + /// Returns `true` if there's no failure record OR if ≥5s have passed since + /// the last attempt. More permissive than `is_suppressed`. + pub fn can_retry(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool { + if protocol == DetectedProtocol::H1 { + return true; + } + match self.failures.get(key) { + Some(entry) => match entry.get(protocol) { + Some(r) => r.failed_at.elapsed() >= PROTOCOL_FAILURE_COOLDOWN, + None => true, // no failure record + }, + None => true, + } + } + + /// Record a retry attempt WITHOUT escalating the cooldown. + /// Resets the `failed_at` timestamp to prevent rapid retries (5s gate). + /// Called before an escalation attempt. If the attempt fails, + /// `record_failure` should be called afterward with proper escalation. + pub fn record_retry_attempt(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) { + if protocol == DetectedProtocol::H1 { + return; + } + if let Some(mut entry) = self.failures.get_mut(key) { + if let Some(ref mut r) = entry.get_mut(protocol) { + r.failed_at = Instant::now(); + } + } + } + + /// Clear the failure record for a protocol (it recovered). + /// Called when an escalation retry succeeds. + pub fn clear_failure(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) { + if protocol == DetectedProtocol::H1 { + return; + } + if let Some(mut entry) = self.failures.get_mut(key) { + *entry.get_mut(protocol) = None; + if entry.is_empty() { + drop(entry); + self.failures.remove(key); + } + } + } + + /// Clear all entries. Called on route updates to discard stale detections. + pub fn clear(&self) { + self.cache.clear(); + self.failures.clear(); + } + + /// Snapshot all non-expired cache entries for metrics/UI display. + pub fn snapshot(&self) -> Vec { + self.cache.iter() + .filter(|entry| entry.value().detected_at.elapsed() < PROTOCOL_CACHE_TTL) + .map(|entry| { + let key = entry.key(); + let val = entry.value(); + let failure_info = self.failures.get(key); + + let (h2_sup, h2_cd, h2_cons) = Self::suppression_info( + failure_info.as_deref().and_then(|f| f.h2.as_ref()), + ); + let (h3_sup, h3_cd, h3_cons) = Self::suppression_info( + failure_info.as_deref().and_then(|f| f.h3.as_ref()), + ); + + ProtocolCacheEntry { + host: key.host.clone(), + port: key.port, + domain: key.requested_host.clone(), + protocol: match val.protocol { + DetectedProtocol::H1 => "h1".to_string(), + DetectedProtocol::H2 => "h2".to_string(), + DetectedProtocol::H3 => "h3".to_string(), + }, + h3_port: val.h3_port, + age_secs: val.detected_at.elapsed().as_secs(), + h2_suppressed: h2_sup, + h3_suppressed: h3_sup, + h2_cooldown_remaining_secs: h2_cd, + h3_cooldown_remaining_secs: h3_cd, + h2_consecutive_failures: h2_cons, + h3_consecutive_failures: h3_cons, + } + }) + .collect() + } + + // --- Internal helpers --- + /// Insert a protocol detection result with an optional H3 port. - fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option) { + /// No suppression check — callers must check before calling. + fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option) { if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) { - // Evict the oldest entry to stay within bounds let oldest = self.cache.iter() .min_by_key(|entry| entry.value().detected_at) .map(|entry| entry.key().clone()); @@ -133,17 +412,48 @@ impl ProtocolCache { }); } - /// Clear all entries. Called on route updates to discard stale detections. - pub fn clear(&self) { - self.cache.clear(); + /// Reduce a failure record's remaining cooldown to `target`, if it currently + /// has MORE than `target` remaining. Never increases cooldown. + fn reduce_cooldown_to(record: Option<&mut FailureRecord>, target: Duration) { + if let Some(r) = record { + let elapsed = r.failed_at.elapsed(); + if elapsed < r.cooldown { + let remaining = r.cooldown - elapsed; + if remaining > target { + // Shrink cooldown so it expires in `target` from now + r.cooldown = elapsed + target; + } + } + } } - /// Background cleanup loop — removes expired entries every `PROTOCOL_CACHE_CLEANUP_INTERVAL`. - async fn cleanup_loop(cache: Arc>) { + /// Extract suppression info from a failure record for metrics. + fn suppression_info(record: Option<&FailureRecord>) -> (bool, Option, Option) { + match record { + Some(r) => { + let elapsed = r.failed_at.elapsed(); + let suppressed = elapsed < r.cooldown; + let remaining = if suppressed { + Some((r.cooldown - elapsed).as_secs()) + } else { + None + }; + (suppressed, remaining, Some(r.consecutive_failures)) + } + None => (false, None, None), + } + } + + /// Background cleanup loop. + async fn cleanup_loop( + cache: Arc>, + failures: Arc>, + ) { let mut interval = tokio::time::interval(PROTOCOL_CACHE_CLEANUP_INTERVAL); loop { interval.tick().await; + // Clean expired cache entries let expired: Vec = cache.iter() .filter(|entry| entry.value().detected_at.elapsed() >= PROTOCOL_CACHE_TTL) .map(|entry| entry.key().clone()) @@ -155,6 +465,31 @@ impl ProtocolCache { cache.remove(&key); } } + + // Clean fully-expired failure entries + let expired_failures: Vec = failures.iter() + .filter(|entry| entry.value().all_expired()) + .map(|entry| entry.key().clone()) + .collect(); + + if !expired_failures.is_empty() { + debug!("Protocol cache cleanup: removing {} expired failure entries", expired_failures.len()); + for key in expired_failures { + failures.remove(&key); + } + } + + // Safety net: cap failures map at 2× max entries + if failures.len() > PROTOCOL_CACHE_MAX_ENTRIES * 2 { + let oldest: Vec = failures.iter() + .filter(|e| e.value().all_expired()) + .map(|e| e.key().clone()) + .take(failures.len() - PROTOCOL_CACHE_MAX_ENTRIES) + .collect(); + for key in oldest { + failures.remove(&key); + } + } } } } diff --git a/rust/crates/rustproxy-http/src/proxy_service.rs b/rust/crates/rustproxy-http/src/proxy_service.rs index 9733fde..75f448c 100644 --- a/rust/crates/rustproxy-http/src/proxy_service.rs +++ b/rust/crates/rustproxy-http/src/proxy_service.rs @@ -311,6 +311,11 @@ impl HttpProxyService { self.protocol_cache.clear(); } + /// Snapshot the protocol cache for metrics/UI display. + pub fn protocol_cache_snapshot(&self) -> Vec { + self.protocol_cache.snapshot() + } + /// Handle an incoming HTTP connection on a plain TCP stream. pub async fn handle_connection( self: Arc, @@ -701,6 +706,11 @@ impl HttpProxyService { port: upstream.port, requested_host: host.clone(), }; + // Save cached H3 port for within-request escalation (may be needed later + // if TCP connect fails and we escalate to H3 as a last resort) + let cached_h3_port = self.protocol_cache.get(&protocol_cache_key) + .and_then(|c| c.h3_port); + let protocol_decision = match backend_protocol_mode { rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1, rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2, @@ -713,17 +723,32 @@ impl HttpProxyService { match self.protocol_cache.get(&protocol_cache_key) { Some(cached) => match cached.protocol { crate::protocol_cache::DetectedProtocol::H3 => { - if let Some(h3_port) = cached.h3_port { + if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { + // H3 cached but suppressed — fall back to ALPN probe + ProtocolDecision::AlpnProbe + } else if let Some(h3_port) = cached.h3_port { ProtocolDecision::H3 { port: h3_port } } else { - // H3 cached but no port — fall back to ALPN probe ProtocolDecision::AlpnProbe } } - crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2, + crate::protocol_cache::DetectedProtocol::H2 => { + if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) { + ProtocolDecision::H1 + } else { + ProtocolDecision::H2 + } + } crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1, }, - None => ProtocolDecision::AlpnProbe, + None => { + // Cache miss — skip ALPN probe if H2 is suppressed + if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) { + ProtocolDecision::H1 + } else { + ProtocolDecision::AlpnProbe + } + } } } } @@ -776,8 +801,16 @@ impl HttpProxyService { return result; } Err(e) => { - warn!(backend = %upstream_key, error = %e, + warn!(backend = %upstream_key, domain = %domain_str, error = %e, "H3 backend connect failed, falling back to H2/H1"); + // Record failure with escalating cooldown — prevents Alt-Svc + // from re-upgrading to H3 during cooldown period + if is_auto_detect_mode { + self.protocol_cache.record_failure( + protocol_cache_key.clone(), + crate::protocol_cache::DetectedProtocol::H3, + ); + } // Suppress Alt-Svc caching for the fallback to prevent re-caching H3 // from our own injected Alt-Svc header or a stale backend Alt-Svc conn_activity.alt_svc_cache_key = None; @@ -899,6 +932,36 @@ impl HttpProxyService { ); self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); + + // --- Within-request escalation: try H3 via QUIC if retryable --- + if is_auto_detect_mode { + if let Some(h3_port) = cached_h3_port { + if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { + self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3"); + match self.connect_quic_backend(&upstream.host, h3_port).await { + Ok(quic_conn) => { + self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); + let h3_pool_key = crate::connection_pool::PoolKey { + host: upstream.host.clone(), port: h3_port, use_tls: true, + protocol: crate::connection_pool::PoolProtocol::H3, + }; + let result = self.forward_h3( + quic_conn, parts, body, upstream_headers, &upstream_path, + route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, + ).await; + self.upstream_selector.connection_ended(&upstream_key); + return result; + } + Err(e3) => { + debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed"); + self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3); + } + } + } + } + } return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable")); } Err(_) => { @@ -910,6 +973,36 @@ impl HttpProxyService { ); self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); + + // --- Within-request escalation: try H3 via QUIC if retryable --- + if is_auto_detect_mode { + if let Some(h3_port) = cached_h3_port { + if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { + self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3"); + match self.connect_quic_backend(&upstream.host, h3_port).await { + Ok(quic_conn) => { + self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); + let h3_pool_key = crate::connection_pool::PoolKey { + host: upstream.host.clone(), port: h3_port, use_tls: true, + protocol: crate::connection_pool::PoolProtocol::H3, + }; + let result = self.forward_h3( + quic_conn, parts, body, upstream_headers, &upstream_path, + route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, + ).await; + self.upstream_selector.connection_ended(&upstream_key); + return result; + } + Err(e3) => { + debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed"); + self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3); + } + } + } + } + } return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout")); } } @@ -937,6 +1030,36 @@ impl HttpProxyService { ); self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); + + // --- Within-request escalation: try H3 via QUIC if retryable --- + if is_auto_detect_mode { + if let Some(h3_port) = cached_h3_port { + if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { + self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3"); + match self.connect_quic_backend(&upstream.host, h3_port).await { + Ok(quic_conn) => { + self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); + let h3_pool_key = crate::connection_pool::PoolKey { + host: upstream.host.clone(), port: h3_port, use_tls: true, + protocol: crate::connection_pool::PoolProtocol::H3, + }; + let result = self.forward_h3( + quic_conn, parts, body, upstream_headers, &upstream_path, + route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, + ).await; + self.upstream_selector.connection_ended(&upstream_key); + return result; + } + Err(e3) => { + debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed"); + self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3); + } + } + } + } + } return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); } Err(_) => { @@ -948,6 +1071,36 @@ impl HttpProxyService { ); self.metrics.backend_connect_error(&upstream_key); self.upstream_selector.connection_ended(&upstream_key); + + // --- Within-request escalation: try H3 via QUIC if retryable --- + if is_auto_detect_mode { + if let Some(h3_port) = cached_h3_port { + if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { + self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3"); + match self.connect_quic_backend(&upstream.host, h3_port).await { + Ok(quic_conn) => { + self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); + self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); + let h3_pool_key = crate::connection_pool::PoolKey { + host: upstream.host.clone(), port: h3_port, use_tls: true, + protocol: crate::connection_pool::PoolProtocol::H3, + }; + let result = self.forward_h3( + quic_conn, parts, body, upstream_headers, &upstream_path, + route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key, + ).await; + self.upstream_selector.connection_ended(&upstream_key); + return result; + } + Err(e3) => { + debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed"); + self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3); + } + } + } + } + } return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); } } @@ -1416,6 +1569,11 @@ impl HttpProxyService { port: upstream.port, requested_host: requested_host.clone(), }; + // Record H2 failure (escalating cooldown) before downgrading cache to H1 + self.protocol_cache.record_failure( + cache_key.clone(), + crate::protocol_cache::DetectedProtocol::H2, + ); self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); match self.reconnect_backend(upstream, domain, backend_key).await { @@ -1549,12 +1707,16 @@ impl HttpProxyService { self.metrics.backend_h2_failure(backend_key); self.metrics.backend_handshake_error(backend_key); - // Update cache to H1 so subsequent requests skip H2 + // Record H2 failure (escalating cooldown) and downgrade cache to H1 let cache_key = crate::protocol_cache::ProtocolCacheKey { host: upstream.host.clone(), port: upstream.port, requested_host: requested_host.clone(), }; + self.protocol_cache.record_failure( + cache_key.clone(), + crate::protocol_cache::DetectedProtocol::H2, + ); self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); // Reconnect for H1 (the original io was consumed by the failed h2 handshake) @@ -2569,7 +2731,7 @@ impl HttpProxyService { let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?; let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await - .map_err(|_| "QUIC connect timeout (3s)")??; + .map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??; debug!("QUIC backend connection established to {}:{}", host, port); Ok(connection) diff --git a/rust/crates/rustproxy-metrics/src/collector.rs b/rust/crates/rustproxy-metrics/src/collector.rs index 43f229c..19fd8e7 100644 --- a/rust/crates/rustproxy-metrics/src/collector.rs +++ b/rust/crates/rustproxy-metrics/src/collector.rs @@ -31,6 +31,8 @@ pub struct Metrics { pub total_udp_sessions: u64, pub total_datagrams_in: u64, pub total_datagrams_out: u64, + // Protocol detection cache snapshot (populated by RustProxy from HttpProxyService) + pub detected_protocols: Vec, } /// Per-route metrics. @@ -76,6 +78,25 @@ pub struct BackendMetrics { pub h2_failures: u64, } +/// Protocol cache entry for metrics/UI display. +/// Populated from the HTTP proxy service's protocol detection cache. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ProtocolCacheEntryMetric { + pub host: String, + pub port: u16, + pub domain: Option, + pub protocol: String, + pub h3_port: Option, + pub age_secs: u64, + pub h2_suppressed: bool, + pub h3_suppressed: bool, + pub h2_cooldown_remaining_secs: Option, + pub h3_cooldown_remaining_secs: Option, + pub h2_consecutive_failures: Option, + pub h3_consecutive_failures: Option, +} + /// Statistics snapshot. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -824,6 +845,7 @@ impl MetricsCollector { total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed), total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed), total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed), + detected_protocols: vec![], } } } diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index 93ad9e7..b5fc0b3 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -937,8 +937,29 @@ impl RustProxy { } /// Get current metrics snapshot. + /// Includes protocol cache entries from the HTTP proxy service. pub fn get_metrics(&self) -> Metrics { - self.metrics.snapshot() + let mut metrics = self.metrics.snapshot(); + if let Some(ref lm) = self.listener_manager { + let entries = lm.http_proxy().protocol_cache_snapshot(); + metrics.detected_protocols = entries.into_iter().map(|e| { + rustproxy_metrics::ProtocolCacheEntryMetric { + host: e.host, + port: e.port, + domain: e.domain, + protocol: e.protocol, + h3_port: e.h3_port, + age_secs: e.age_secs, + h2_suppressed: e.h2_suppressed, + h3_suppressed: e.h3_suppressed, + h2_cooldown_remaining_secs: e.h2_cooldown_remaining_secs, + h3_cooldown_remaining_secs: e.h3_cooldown_remaining_secs, + h2_consecutive_failures: e.h2_consecutive_failures, + h3_consecutive_failures: e.h3_consecutive_failures, + } + }).collect(); + } + metrics } /// Add a listening port at runtime. diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 0b27d39..c2b5dbb 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '26.0.0', + version: '26.1.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' } diff --git a/ts/proxies/smart-proxy/models/metrics-types.ts b/ts/proxies/smart-proxy/models/metrics-types.ts index 0fcb890..1df4d68 100644 --- a/ts/proxies/smart-proxy/models/metrics-types.ts +++ b/ts/proxies/smart-proxy/models/metrics-types.ts @@ -72,6 +72,7 @@ export interface IMetrics { byBackend(): Map; protocols(): Map; topByErrors(limit?: number): Array<{ backend: string; errors: number }>; + detectedProtocols(): IProtocolCacheEntry[]; }; // UDP metrics @@ -113,6 +114,26 @@ export interface IMetricsConfig { prometheusPrefix: string; // Default: smartproxy_ } +/** + * Protocol cache entry from the Rust proxy's auto-detection cache. + * Shows which protocol (h1/h2/h3) is detected for each backend+domain pair, + * including failure suppression state with escalating cooldowns. + */ +export interface IProtocolCacheEntry { + host: string; + port: number; + domain: string | null; + protocol: string; + h3Port: number | null; + ageSecs: number; + h2Suppressed: boolean; + h3Suppressed: boolean; + h2CooldownRemainingSecs: number | null; + h3CooldownRemainingSecs: number | null; + h2ConsecutiveFailures: number | null; + h3ConsecutiveFailures: number | null; +} + /** * Per-backend metrics */ diff --git a/ts/proxies/smart-proxy/rust-metrics-adapter.ts b/ts/proxies/smart-proxy/rust-metrics-adapter.ts index cdbffb9..f493a4d 100644 --- a/ts/proxies/smart-proxy/rust-metrics-adapter.ts +++ b/ts/proxies/smart-proxy/rust-metrics-adapter.ts @@ -1,4 +1,4 @@ -import type { IMetrics, IBackendMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; +import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js'; import type { RustProxyBridge } from './rust-proxy-bridge.js'; /** @@ -216,6 +216,9 @@ export class RustMetricsAdapter implements IMetrics { result.sort((a, b) => b.errors - a.errors); return result.slice(0, limit); }, + detectedProtocols: (): IProtocolCacheEntry[] => { + return this.cache?.detectedProtocols ?? []; + }, }; public udp = {