Compare commits

..

2 Commits

Author SHA1 Message Date
d026d7c266 v26.2.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-23 09:42:07 +00:00
3b01144c51 feat(protocol-cache): add sliding TTL re-probing and eviction for backend protocol detection 2026-03-23 09:42:07 +00:00
8 changed files with 204 additions and 37 deletions

View File

@@ -1,5 +1,12 @@
# Changelog # Changelog
## 2026-03-23 - 26.2.0 - feat(protocol-cache)
add sliding TTL re-probing and eviction for backend protocol detection
- extend protocol cache entries and metrics with last accessed and last probed timestamps
- trigger periodic ALPN re-probes for cached H1/H2 entries while keeping active entries alive with a sliding 1-day TTL
- log protocol transitions with reasons and evict cache entries when all protocol fallback attempts fail
## 2026-03-22 - 26.1.0 - feat(rustproxy-http) ## 2026-03-22 - 26.1.0 - feat(rustproxy-http)
add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartproxy", "name": "@push.rocks/smartproxy",
"version": "26.1.0", "version": "26.2.0",
"private": false, "private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.", "description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@@ -1,20 +1,36 @@
//! Bounded, TTL-based protocol detection cache with generic failure suppression. //! Bounded, sliding-TTL protocol detection cache with periodic re-probing and failure suppression.
//! //!
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested //! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
//! domain (host:port + requested_host). This prevents cache oscillation when multiple //! domain (host:port + requested_host). This prevents cache oscillation when multiple
//! frontend domains share the same backend but differ in protocol support. //! frontend domains share the same backend but differ in protocol support.
//! //!
//! ## Sliding TTL
//!
//! Each cache hit refreshes the entry's expiry timer (`last_accessed_at`). Entries
//! expire only after 1 day without any access, so entries in continuous use never age out.
//! For H1/H2 entries, the first request arriving 5 or more minutes after the last probe
//! triggers an inline ALPN re-probe to verify the cached protocol is still correct.
//!
//! ## Upgrade signals //! ## Upgrade signals
//! //!
//! - ALPN (TLS handshake) → detects H2 vs H1 //! - ALPN (TLS handshake) → detects H2 vs H1
//! - Alt-Svc (response header) → advertises H3 //! - Alt-Svc (response header) → advertises H3
//! //!
//! ## Protocol transitions
//!
//! All protocol changes are logged at `info!()` level as a "Protocol transition" event
//! with structured `old`, `new`, and `reason` fields, e.g.
//! old=H1 new=H2 reason="periodic ALPN re-probe".
//!
//! ## Failure suppression //! ## Failure suppression
//! //!
//! When a protocol fails, `record_failure()` prevents upgrade signals from //! When a protocol fails, `record_failure()` prevents upgrade signals from
//! re-introducing it until an escalating cooldown expires (5s → 10s → ... → 300s). //! re-introducing it until an escalating cooldown expires (5s → 10s → ... → 300s).
//! Within-request escalation is allowed via `can_retry()` after a 5s minimum gap. //! Within-request escalation is allowed via `can_retry()` after a 5s minimum gap.
//! //!
//! ## Total failure eviction
//!
//! When all protocols (H3, H2, H1) fail for a backend, the cache entry and its
//! failure-suppression records are removed via `evict()`, forcing a fresh probe
//! on the next request.
//!
//! Cascading: when a lower protocol also fails, higher protocol cooldowns are //! Cascading: when a lower protocol also fails, higher protocol cooldowns are
//! reduced to 5s remaining (not instant clear), preventing tight retry loops. //! reduced to 5s remaining (not instant clear), preventing tight retry loops.
@@ -22,11 +38,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use dashmap::DashMap; use dashmap::DashMap;
use tracing::debug; use tracing::{debug, info};
/// TTL for cached protocol detection results. /// Sliding TTL for cached protocol detection results.
/// After this duration, the next request will re-probe the backend. /// Entries that haven't been accessed for this duration are evicted.
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes /// Each `get()` call refreshes the timer (sliding window).
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(86400); // 1 day
/// Interval between inline ALPN re-probes for H1/H2 entries.
/// When at least this much time has elapsed since a cached entry's `last_probed_at`,
/// the next request triggers an ALPN re-probe to verify the backend still speaks the same protocol.
const PROTOCOL_REPROBE_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes
/// Maximum number of entries in the protocol cache. /// Maximum number of entries in the protocol cache.
const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096; const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096;
@@ -37,7 +59,7 @@ const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Minimum cooldown between retry attempts of a failed protocol. /// Minimum cooldown between retry attempts of a failed protocol.
const PROTOCOL_FAILURE_COOLDOWN: Duration = Duration::from_secs(5); const PROTOCOL_FAILURE_COOLDOWN: Duration = Duration::from_secs(5);
/// Maximum cooldown (escalation ceiling). Matches cache TTL. /// Maximum cooldown (escalation ceiling).
const PROTOCOL_FAILURE_MAX_COOLDOWN: Duration = Duration::from_secs(300); const PROTOCOL_FAILURE_MAX_COOLDOWN: Duration = Duration::from_secs(300);
/// Consecutive failure count at which cooldown reaches maximum. /// Consecutive failure count at which cooldown reaches maximum.
@@ -52,12 +74,26 @@ pub enum DetectedProtocol {
H3, H3,
} }
impl std::fmt::Display for DetectedProtocol {
    /// Writes the short protocol label: `H1`, `H2`, or `H3`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Select the static label first, then emit it with a single write.
        let label = match self {
            DetectedProtocol::H1 => "H1",
            DetectedProtocol::H2 => "H2",
            DetectedProtocol::H3 => "H3",
        };
        f.write_str(label)
    }
}
/// Result of a protocol cache lookup. /// Result of a protocol cache lookup.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct CachedProtocol { pub struct CachedProtocol {
pub protocol: DetectedProtocol, pub protocol: DetectedProtocol,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port). /// For H3: the port advertised by Alt-Svc (may differ from TCP port).
pub h3_port: Option<u16>, pub h3_port: Option<u16>,
/// True if at least `PROTOCOL_REPROBE_INTERVAL` has elapsed since the entry's `last_probed_at`.
/// Caller should perform an inline ALPN re-probe and call `update_probe_result()`.
/// Always `false` for H3 entries (H3 is discovered via Alt-Svc, not ALPN).
pub needs_reprobe: bool,
} }
/// Key for the protocol cache: (host, port, requested_host). /// Key for the protocol cache: (host, port, requested_host).
@@ -70,10 +106,15 @@ pub struct ProtocolCacheKey {
pub requested_host: Option<String>, pub requested_host: Option<String>,
} }
/// A cached protocol detection result with a timestamp. /// A cached protocol detection result with timestamps.
struct CachedEntry { struct CachedEntry {
protocol: DetectedProtocol, protocol: DetectedProtocol,
/// When this protocol was first detected (or last changed).
detected_at: Instant, detected_at: Instant,
/// Last time any request used this entry (sliding-window TTL).
last_accessed_at: Instant,
/// Last time an ALPN re-probe was performed for this entry.
last_probed_at: Instant,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port). /// For H3: the port advertised by Alt-Svc (may differ from TCP port).
h3_port: Option<u16>, h3_port: Option<u16>,
} }
@@ -138,6 +179,8 @@ pub struct ProtocolCacheEntry {
pub protocol: String, pub protocol: String,
pub h3_port: Option<u16>, pub h3_port: Option<u16>,
pub age_secs: u64, pub age_secs: u64,
pub last_accessed_secs: u64,
pub last_probed_secs: u64,
pub h2_suppressed: bool, pub h2_suppressed: bool,
pub h3_suppressed: bool, pub h3_suppressed: bool,
pub h2_cooldown_remaining_secs: Option<u64>, pub h2_cooldown_remaining_secs: Option<u64>,
@@ -154,11 +197,11 @@ fn escalate_cooldown(consecutive: u32) -> Duration {
Duration::from_secs(secs.min(PROTOCOL_FAILURE_MAX_COOLDOWN.as_secs())) Duration::from_secs(secs.min(PROTOCOL_FAILURE_MAX_COOLDOWN.as_secs()))
} }
/// Bounded, TTL-based protocol detection cache with failure suppression. /// Bounded, sliding-TTL protocol detection cache with failure suppression.
/// ///
/// Memory safety guarantees: /// Memory safety guarantees:
/// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly. /// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly.
/// - TTL expiry — stale entries naturally age out on lookup. /// - Sliding TTL expiry — entries age out after 1 day without access.
/// - Background cleanup task — proactively removes expired entries every 60s. /// - Background cleanup task — proactively removes expired entries every 60s.
/// - `clear()` — called on route updates to discard stale detections. /// - `clear()` — called on route updates to discard stale detections.
/// - `Drop` — aborts the background task to prevent dangling tokio tasks. /// - `Drop` — aborts the background task to prevent dangling tokio tasks.
@@ -190,15 +233,25 @@ impl ProtocolCache {
} }
/// Look up the cached protocol for a backend endpoint. /// Look up the cached protocol for a backend endpoint.
///
/// Returns `None` if not cached or expired (caller should probe via ALPN). /// Returns `None` if not cached or expired (caller should probe via ALPN).
/// On hit, refreshes `last_accessed_at` (sliding TTL) and sets `needs_reprobe`
/// if the entry hasn't been probed in over 5 minutes (H1/H2 only).
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> { pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
let entry = self.cache.get(key)?; let mut entry = self.cache.get_mut(key)?;
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL { if entry.last_accessed_at.elapsed() < PROTOCOL_CACHE_TTL {
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", // Refresh sliding TTL
entry.protocol, key.host, key.port, key.requested_host); entry.last_accessed_at = Instant::now();
// H3 is the ceiling — can't ALPN-probe for H3 (discovered via Alt-Svc).
// Only H1/H2 entries trigger periodic re-probing.
let needs_reprobe = entry.protocol != DetectedProtocol::H3
&& entry.last_probed_at.elapsed() >= PROTOCOL_REPROBE_INTERVAL;
Some(CachedProtocol { Some(CachedProtocol {
protocol: entry.protocol, protocol: entry.protocol,
h3_port: entry.h3_port, h3_port: entry.h3_port,
needs_reprobe,
}) })
} else { } else {
// Expired — remove and return None to trigger re-probe // Expired — remove and return None to trigger re-probe
@@ -214,7 +267,7 @@ impl ProtocolCache {
/// **Key semantic**: only suppresses if the protocol being inserted matches /// **Key semantic**: only suppresses if the protocol being inserted matches
/// a suppressed protocol. H1 inserts are NEVER suppressed — downgrades /// a suppressed protocol. H1 inserts are NEVER suppressed — downgrades
/// always succeed. /// always succeed.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) -> bool { pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, reason: &str) -> bool {
if self.is_suppressed(&key, protocol) { if self.is_suppressed(&key, protocol) {
debug!( debug!(
host = %key.host, port = %key.port, domain = ?key.requested_host, host = %key.host, port = %key.port, domain = ?key.requested_host,
@@ -223,13 +276,13 @@ impl ProtocolCache {
); );
return false; return false;
} }
self.insert_internal(key, protocol, None); self.insert_internal(key, protocol, None, reason);
true true
} }
/// Insert an H3 detection result with the Alt-Svc advertised port. /// Insert an H3 detection result with the Alt-Svc advertised port.
/// Returns `false` if H3 is suppressed. /// Returns `false` if H3 is suppressed.
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) -> bool { pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16, reason: &str) -> bool {
if self.is_suppressed(&key, DetectedProtocol::H3) { if self.is_suppressed(&key, DetectedProtocol::H3) {
debug!( debug!(
host = %key.host, port = %key.port, domain = ?key.requested_host, host = %key.host, port = %key.port, domain = ?key.requested_host,
@@ -237,10 +290,54 @@ impl ProtocolCache {
); );
return false; return false;
} }
self.insert_internal(key, DetectedProtocol::H3, Some(h3_port)); self.insert_internal(key, DetectedProtocol::H3, Some(h3_port), reason);
true true
} }
/// Update the cache after an inline ALPN re-probe completes.
///
/// If an entry exists for `key`, its `last_probed_at` and `last_accessed_at`
/// are refreshed. When `probed_protocol` differs from the cached protocol the
/// entry is switched to it, `detected_at` is reset, and the transition is
/// logged at `info!` level together with `reason`. A stored `h3_port` is
/// cleared when the entry moves away from H3.
///
/// If no entry exists (it was evicted between `get()` and the probe
/// completing), the probe result is inserted as a fresh entry.
///
/// Returns `Some(new_protocol)` if the cached protocol changed or a fresh
/// entry was inserted, `None` if the probe confirmed the cached protocol.
pub fn update_probe_result(
    &self,
    key: &ProtocolCacheKey,
    probed_protocol: DetectedProtocol,
    reason: &str,
) -> Option<DetectedProtocol> {
    // One timestamp for every field touched by this update, so the entry's
    // timestamps agree with each other (same approach as `insert_internal`).
    let now = Instant::now();

    // Mutate the entry inside the closure so the map guard is released before
    // any logging happens; only the previous protocol leaves the closure.
    let previous = self.cache.get_mut(key).map(|mut entry| {
        let old_protocol = entry.protocol;
        entry.last_probed_at = now;
        entry.last_accessed_at = now;
        if old_protocol != probed_protocol {
            entry.protocol = probed_protocol;
            entry.detected_at = now;
            // Leaving H3: the Alt-Svc port no longer applies. The new protocol
            // is known to differ here, so checking the old one is sufficient.
            if old_protocol == DetectedProtocol::H3 {
                entry.h3_port = None;
            }
        }
        old_protocol
    });

    match previous {
        Some(old_protocol) if old_protocol != probed_protocol => {
            info!(
                host = %key.host, port = %key.port, domain = ?key.requested_host,
                old = %old_protocol, new = %probed_protocol, reason = %reason,
                "Protocol transition"
            );
            Some(probed_protocol)
        }
        Some(old_protocol) => {
            debug!(
                host = %key.host, port = %key.port, domain = ?key.requested_host,
                protocol = %old_protocol, reason = %reason,
                "Re-probe confirmed — no protocol change"
            );
            None
        }
        None => {
            // Entry was evicted between the get() and the probe completing.
            // Insert as a fresh entry.
            self.insert_internal(key.clone(), probed_protocol, None, reason);
            Some(probed_protocol)
        }
    }
}
/// Record a protocol failure. Future `insert()` calls for this protocol /// Record a protocol failure. Future `insert()` calls for this protocol
/// will be suppressed until the escalating cooldown expires. /// will be suppressed until the escalating cooldown expires.
/// ///
@@ -281,7 +378,7 @@ impl ProtocolCache {
Self::reduce_cooldown_to(entry.h3.as_mut(), PROTOCOL_FAILURE_COOLDOWN); Self::reduce_cooldown_to(entry.h3.as_mut(), PROTOCOL_FAILURE_COOLDOWN);
} }
debug!( info!(
host = %key.host, port = %key.port, domain = ?key.requested_host, host = %key.host, port = %key.port, domain = ?key.requested_host,
protocol = ?protocol, protocol = ?protocol,
consecutive = consecutive, consecutive = consecutive,
@@ -348,6 +445,17 @@ impl ProtocolCache {
} }
} }
/// Evict a cache entry entirely. Called when all protocol probes (H3, H2, H1)
/// have failed for a backend.
pub fn evict(&self, key: &ProtocolCacheKey) {
self.cache.remove(key);
self.failures.remove(key);
info!(
host = %key.host, port = %key.port, domain = ?key.requested_host,
"Cache entry evicted — all protocols failed"
);
}
/// Clear all entries. Called on route updates to discard stale detections. /// Clear all entries. Called on route updates to discard stale detections.
pub fn clear(&self) { pub fn clear(&self) {
self.cache.clear(); self.cache.clear();
@@ -357,7 +465,7 @@ impl ProtocolCache {
/// Snapshot all non-expired cache entries for metrics/UI display. /// Snapshot all non-expired cache entries for metrics/UI display.
pub fn snapshot(&self) -> Vec<ProtocolCacheEntry> { pub fn snapshot(&self) -> Vec<ProtocolCacheEntry> {
self.cache.iter() self.cache.iter()
.filter(|entry| entry.value().detected_at.elapsed() < PROTOCOL_CACHE_TTL) .filter(|entry| entry.value().last_accessed_at.elapsed() < PROTOCOL_CACHE_TTL)
.map(|entry| { .map(|entry| {
let key = entry.key(); let key = entry.key();
let val = entry.value(); let val = entry.value();
@@ -381,6 +489,8 @@ impl ProtocolCache {
}, },
h3_port: val.h3_port, h3_port: val.h3_port,
age_secs: val.detected_at.elapsed().as_secs(), age_secs: val.detected_at.elapsed().as_secs(),
last_accessed_secs: val.last_accessed_at.elapsed().as_secs(),
last_probed_secs: val.last_probed_at.elapsed().as_secs(),
h2_suppressed: h2_sup, h2_suppressed: h2_sup,
h3_suppressed: h3_sup, h3_suppressed: h3_sup,
h2_cooldown_remaining_secs: h2_cd, h2_cooldown_remaining_secs: h2_cd,
@@ -395,19 +505,37 @@ impl ProtocolCache {
// --- Internal helpers --- // --- Internal helpers ---
/// Insert a protocol detection result with an optional H3 port. /// Insert a protocol detection result with an optional H3 port.
/// Logs protocol transitions when overwriting an existing entry.
/// No suppression check — callers must check before calling. /// No suppression check — callers must check before calling.
fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) { fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>, reason: &str) {
// Check for existing entry to log protocol transitions
if let Some(existing) = self.cache.get(&key) {
if existing.protocol != protocol {
info!(
host = %key.host, port = %key.port, domain = ?key.requested_host,
old = %existing.protocol, new = %protocol, reason = %reason,
"Protocol transition"
);
}
drop(existing);
}
// Evict oldest entry if at capacity
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) { if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
let oldest = self.cache.iter() let oldest = self.cache.iter()
.min_by_key(|entry| entry.value().detected_at) .min_by_key(|entry| entry.value().last_accessed_at)
.map(|entry| entry.key().clone()); .map(|entry| entry.key().clone());
if let Some(oldest_key) = oldest { if let Some(oldest_key) = oldest {
self.cache.remove(&oldest_key); self.cache.remove(&oldest_key);
} }
} }
let now = Instant::now();
self.cache.insert(key, CachedEntry { self.cache.insert(key, CachedEntry {
protocol, protocol,
detected_at: Instant::now(), detected_at: now,
last_accessed_at: now,
last_probed_at: now,
h3_port, h3_port,
}); });
} }
@@ -453,9 +581,9 @@ impl ProtocolCache {
loop { loop {
interval.tick().await; interval.tick().await;
// Clean expired cache entries // Clean expired cache entries (sliding TTL based on last_accessed_at)
let expired: Vec<ProtocolCacheKey> = cache.iter() let expired: Vec<ProtocolCacheKey> = cache.iter()
.filter(|entry| entry.value().detected_at.elapsed() >= PROTOCOL_CACHE_TTL) .filter(|entry| entry.value().last_accessed_at.elapsed() >= PROTOCOL_CACHE_TTL)
.map(|entry| entry.key().clone()) .map(|entry| entry.key().clone())
.collect(); .collect();

View File

@@ -711,6 +711,9 @@ impl HttpProxyService {
let cached_h3_port = self.protocol_cache.get(&protocol_cache_key) let cached_h3_port = self.protocol_cache.get(&protocol_cache_key)
.and_then(|c| c.h3_port); .and_then(|c| c.h3_port);
// Track whether this ALPN probe is a periodic re-probe (vs first-time detection)
let mut is_reprobe = false;
let protocol_decision = match backend_protocol_mode { let protocol_decision = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1, rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2, rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
@@ -721,6 +724,12 @@ impl HttpProxyService {
ProtocolDecision::H1 ProtocolDecision::H1
} else { } else {
match self.protocol_cache.get(&protocol_cache_key) { match self.protocol_cache.get(&protocol_cache_key) {
Some(cached) if cached.needs_reprobe => {
// Entry exists but 5+ minutes since last probe — force ALPN re-probe
// (only fires for H1/H2; H3 entries have needs_reprobe=false)
is_reprobe = true;
ProtocolDecision::AlpnProbe
}
Some(cached) => match cached.protocol { Some(cached) => match cached.protocol {
crate::protocol_cache::DetectedProtocol::H3 => { crate::protocol_cache::DetectedProtocol::H3 => {
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
@@ -893,7 +902,7 @@ impl HttpProxyService {
let alpn = tls.get_ref().1.alpn_protocol(); let alpn = tls.get_ref().1.alpn_protocol();
let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false); let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false);
// Cache the result // Cache the result (or update existing entry for re-probes)
let cache_key = crate::protocol_cache::ProtocolCacheKey { let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(), host: upstream.host.clone(),
port: upstream.port, port: upstream.port,
@@ -904,13 +913,18 @@ impl HttpProxyService {
} else { } else {
crate::protocol_cache::DetectedProtocol::H1 crate::protocol_cache::DetectedProtocol::H1
}; };
self.protocol_cache.insert(cache_key, detected); if is_reprobe {
self.protocol_cache.update_probe_result(&cache_key, detected, "periodic ALPN re-probe");
} else {
self.protocol_cache.insert(cache_key, detected, "initial ALPN detection");
}
info!( info!(
backend = %upstream_key, backend = %upstream_key,
domain = %domain_str, domain = %domain_str,
protocol = if is_h2 { "h2" } else { "h1" }, protocol = if is_h2 { "h2" } else { "h1" },
connect_time_ms = %connect_start.elapsed().as_millis(), connect_time_ms = %connect_start.elapsed().as_millis(),
reprobe = is_reprobe,
"Backend protocol detected via ALPN" "Backend protocol detected via ALPN"
); );
@@ -938,11 +952,11 @@ impl HttpProxyService {
if let Some(h3_port) = cached_h3_port { if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3"); debug!(backend = %upstream_key, domain = %domain_str, "TLS connect failed — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await { match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => { Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS failed, H3 succeeded");
let h3_pool_key = crate::connection_pool::PoolKey { let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true, host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3, protocol: crate::connection_pool::PoolProtocol::H3,
@@ -961,6 +975,8 @@ impl HttpProxyService {
} }
} }
} }
// All protocols failed — evict cache entry
self.protocol_cache.evict(&protocol_cache_key);
} }
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable")); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
} }
@@ -979,11 +995,11 @@ impl HttpProxyService {
if let Some(h3_port) = cached_h3_port { if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) { if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3"); debug!(backend = %upstream_key, domain = %domain_str, "TLS connect timeout — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await { match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => { Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS timeout, H3 succeeded");
let h3_pool_key = crate::connection_pool::PoolKey { let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true, host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3, protocol: crate::connection_pool::PoolProtocol::H3,
@@ -1002,6 +1018,8 @@ impl HttpProxyService {
} }
} }
} }
// All protocols failed — evict cache entry
self.protocol_cache.evict(&protocol_cache_key);
} }
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout")); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
} }
@@ -1040,7 +1058,7 @@ impl HttpProxyService {
match self.connect_quic_backend(&upstream.host, h3_port).await { match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => { Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP failed, H3 succeeded");
let h3_pool_key = crate::connection_pool::PoolKey { let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true, host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3, protocol: crate::connection_pool::PoolProtocol::H3,
@@ -1059,6 +1077,8 @@ impl HttpProxyService {
} }
} }
} }
// All protocols failed — evict cache entry
self.protocol_cache.evict(&protocol_cache_key);
} }
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable")); return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
} }
@@ -1081,7 +1101,7 @@ impl HttpProxyService {
match self.connect_quic_backend(&upstream.host, h3_port).await { match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => { Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3); self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port); self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP timeout, H3 succeeded");
let h3_pool_key = crate::connection_pool::PoolKey { let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true, host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3, protocol: crate::connection_pool::PoolProtocol::H3,
@@ -1100,6 +1120,8 @@ impl HttpProxyService {
} }
} }
} }
// All protocols failed — evict cache entry
self.protocol_cache.evict(&protocol_cache_key);
} }
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout")); return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
} }
@@ -1574,7 +1596,7 @@ impl HttpProxyService {
cache_key.clone(), cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2, crate::protocol_cache::DetectedProtocol::H2,
); );
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake timeout — downgrade");
match self.reconnect_backend(upstream, domain, backend_key).await { match self.reconnect_backend(upstream, domain, backend_key).await {
Some(fallback_backend) => { Some(fallback_backend) => {
@@ -1593,6 +1615,8 @@ impl HttpProxyService {
result result
} }
None => { None => {
// H2 failed and H1 reconnect also failed — evict cache
self.protocol_cache.evict(&cache_key);
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback")) Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback"))
} }
} }
@@ -1717,7 +1741,7 @@ impl HttpProxyService {
cache_key.clone(), cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2, crate::protocol_cache::DetectedProtocol::H2,
); );
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1); self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake error — downgrade");
// Reconnect for H1 (the original io was consumed by the failed h2 handshake) // Reconnect for H1 (the original io was consumed by the failed h2 handshake)
match self.reconnect_backend(upstream, domain, backend_key).await { match self.reconnect_backend(upstream, domain, backend_key).await {
@@ -1738,6 +1762,8 @@ impl HttpProxyService {
result result
} }
None => { None => {
// H2 failed and H1 reconnect also failed — evict cache
self.protocol_cache.evict(&cache_key);
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback")) Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback"))
} }
} }
@@ -1954,7 +1980,7 @@ impl HttpProxyService {
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) { if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) { if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
debug!(h3_port, "Backend advertises H3 via Alt-Svc"); debug!(h3_port, "Backend advertises H3 via Alt-Svc");
self.protocol_cache.insert_h3(cache_key.clone(), h3_port); self.protocol_cache.insert_h3(cache_key.clone(), h3_port, "Alt-Svc response header");
} }
} }
} }

View File

@@ -89,6 +89,8 @@ pub struct ProtocolCacheEntryMetric {
pub protocol: String, pub protocol: String,
pub h3_port: Option<u16>, pub h3_port: Option<u16>,
pub age_secs: u64, pub age_secs: u64,
pub last_accessed_secs: u64,
pub last_probed_secs: u64,
pub h2_suppressed: bool, pub h2_suppressed: bool,
pub h3_suppressed: bool, pub h3_suppressed: bool,
pub h2_cooldown_remaining_secs: Option<u64>, pub h2_cooldown_remaining_secs: Option<u64>,

View File

@@ -950,6 +950,8 @@ impl RustProxy {
protocol: e.protocol, protocol: e.protocol,
h3_port: e.h3_port, h3_port: e.h3_port,
age_secs: e.age_secs, age_secs: e.age_secs,
last_accessed_secs: e.last_accessed_secs,
last_probed_secs: e.last_probed_secs,
h2_suppressed: e.h2_suppressed, h2_suppressed: e.h2_suppressed,
h3_suppressed: e.h3_suppressed, h3_suppressed: e.h3_suppressed,
h2_cooldown_remaining_secs: e.h2_cooldown_remaining_secs, h2_cooldown_remaining_secs: e.h2_cooldown_remaining_secs,

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartproxy', name: '@push.rocks/smartproxy',
version: '26.1.0', version: '26.2.0',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
} }

View File

@@ -126,6 +126,8 @@ export interface IProtocolCacheEntry {
protocol: string; protocol: string;
h3Port: number | null; h3Port: number | null;
ageSecs: number; ageSecs: number;
lastAccessedSecs: number;
lastProbedSecs: number;
h2Suppressed: boolean; h2Suppressed: boolean;
h3Suppressed: boolean; h3Suppressed: boolean;
h2CooldownRemainingSecs: number | null; h2CooldownRemainingSecs: number | null;