Compare commits

..

2 Commits

Author SHA1 Message Date
56f5697e1b v26.1.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-22 10:20:00 +00:00
f04875885f feat(rustproxy-http): add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure 2026-03-22 10:20:00 +00:00
9 changed files with 603 additions and 32 deletions

View File

@@ -1,5 +1,12 @@
# Changelog
## 2026-03-22 - 26.1.0 - feat(rustproxy-http)
add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure
- introduces escalating cooldowns for failed H2/H3 protocol detection to prevent repeated upgrades to unstable backends
- adds within-request escalation to cached HTTP/3 when TCP or TLS backend connections fail in auto-detect mode
- exposes detected protocol cache entries and suppression state through Rust metrics and the TypeScript metrics adapter
## 2026-03-21 - 26.0.0 - BREAKING CHANGE(ts-api,rustproxy)
remove deprecated TypeScript protocol and utility exports while hardening QUIC, HTTP/3, WebSocket, and rate limiter cleanup paths

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "26.0.0",
"version": "26.1.0",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",

View File

@@ -1,11 +1,22 @@
//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
//! Bounded, TTL-based protocol detection cache with generic failure suppression.
//!
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
//! frontend domains share the same backend but differ in protocol support.
//!
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
//! ## Upgrade signals
//!
//! - ALPN (TLS handshake) → detects H2 vs H1
//! - Alt-Svc (response header) → advertises H3
//!
//! ## Failure suppression
//!
//! When a protocol fails, `record_failure()` prevents upgrade signals from
//! re-introducing it until an escalating cooldown expires (5s → 10s → ... → 300s).
//! Within-request escalation is allowed via `can_retry()` after a 5s minimum gap.
//!
//! Cascading: when a lower protocol also fails, higher protocol cooldowns are
//! reduced to 5s remaining (not instant clear), preventing tight retry loops.
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -18,14 +29,23 @@ use tracing::debug;
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
/// Maximum number of entries in the protocol cache.
/// Prevents unbounded growth when backends come and go.
const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096;
/// Background cleanup interval for the protocol cache.
/// Background cleanup interval.
const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Minimum cooldown between retry attempts of a failed protocol.
const PROTOCOL_FAILURE_COOLDOWN: Duration = Duration::from_secs(5);
/// Maximum cooldown (escalation ceiling). Matches cache TTL.
const PROTOCOL_FAILURE_MAX_COOLDOWN: Duration = Duration::from_secs(300);
/// Consecutive failure count at which cooldown reaches maximum.
/// 5s × 2^5 = 160s, 5s × 2^6 = 320s → capped at 300s.
const PROTOCOL_FAILURE_ESCALATION_CAP: u32 = 6;
/// Detected backend protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DetectedProtocol {
H1,
H2,
@@ -58,7 +78,83 @@ struct CachedEntry {
h3_port: Option<u16>,
}
/// Bounded, TTL-based protocol detection cache.
/// Failure record for a single protocol level.
#[derive(Debug, Clone)]
struct FailureRecord {
/// When the failure was last recorded.
failed_at: Instant,
/// Current cooldown duration. Escalates on consecutive failures.
cooldown: Duration,
/// Number of consecutive failures (for escalation).
consecutive_failures: u32,
}
/// Per-key failure state. Tracks failures at each upgradeable protocol level.
/// H1 is never tracked (it's the protocol floor — nothing to fall back to).
#[derive(Debug, Clone, Default)]
struct FailureState {
h2: Option<FailureRecord>,
h3: Option<FailureRecord>,
}
impl FailureState {
fn is_empty(&self) -> bool {
self.h2.is_none() && self.h3.is_none()
}
fn all_expired(&self) -> bool {
let h2_expired = self.h2.as_ref()
.map(|r| r.failed_at.elapsed() >= r.cooldown)
.unwrap_or(true);
let h3_expired = self.h3.as_ref()
.map(|r| r.failed_at.elapsed() >= r.cooldown)
.unwrap_or(true);
h2_expired && h3_expired
}
fn get(&self, protocol: DetectedProtocol) -> Option<&FailureRecord> {
match protocol {
DetectedProtocol::H2 => self.h2.as_ref(),
DetectedProtocol::H3 => self.h3.as_ref(),
DetectedProtocol::H1 => None,
}
}
fn get_mut(&mut self, protocol: DetectedProtocol) -> &mut Option<FailureRecord> {
match protocol {
DetectedProtocol::H2 => &mut self.h2,
DetectedProtocol::H3 => &mut self.h3,
DetectedProtocol::H1 => unreachable!("H1 failures are never recorded"),
}
}
}
/// Snapshot of a single protocol cache entry, suitable for metrics/UI display.
#[derive(Debug, Clone)]
pub struct ProtocolCacheEntry {
pub host: String,
pub port: u16,
pub domain: Option<String>,
pub protocol: String,
pub h3_port: Option<u16>,
pub age_secs: u64,
pub h2_suppressed: bool,
pub h3_suppressed: bool,
pub h2_cooldown_remaining_secs: Option<u64>,
pub h3_cooldown_remaining_secs: Option<u64>,
pub h2_consecutive_failures: Option<u32>,
pub h3_consecutive_failures: Option<u32>,
}
/// Exponential backoff: PROTOCOL_FAILURE_COOLDOWN × 2^(n-1), capped at MAX.
fn escalate_cooldown(consecutive: u32) -> Duration {
let base = PROTOCOL_FAILURE_COOLDOWN.as_secs();
let exp = consecutive.saturating_sub(1).min(63) as u64;
let secs = base.saturating_mul(1u64.checked_shl(exp as u32).unwrap_or(u64::MAX));
Duration::from_secs(secs.min(PROTOCOL_FAILURE_MAX_COOLDOWN.as_secs()))
}
/// Bounded, TTL-based protocol detection cache with failure suppression.
///
/// Memory safety guarantees:
/// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly.
@@ -68,6 +164,10 @@ struct CachedEntry {
/// - `Drop` — aborts the background task to prevent dangling tokio tasks.
pub struct ProtocolCache {
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
/// Generic protocol failure suppression map. Tracks per-protocol failure
/// records (H2, H3) for each cache key. Used to prevent upgrade signals
/// (ALPN, Alt-Svc) from re-introducing failed protocols.
failures: Arc<DashMap<ProtocolCacheKey, FailureState>>,
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
@@ -75,13 +175,16 @@ impl ProtocolCache {
/// Create a new protocol cache and start the background cleanup task.
pub fn new() -> Self {
let cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>> = Arc::new(DashMap::new());
let failures: Arc<DashMap<ProtocolCacheKey, FailureState>> = Arc::new(DashMap::new());
let cache_clone = Arc::clone(&cache);
let failures_clone = Arc::clone(&failures);
let cleanup_handle = tokio::spawn(async move {
Self::cleanup_loop(cache_clone).await;
Self::cleanup_loop(cache_clone, failures_clone).await;
});
Self {
cache,
failures,
cleanup_handle: Some(cleanup_handle),
}
}
@@ -91,7 +194,8 @@ impl ProtocolCache {
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
let entry = self.cache.get(key)?;
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})",
entry.protocol, key.host, key.port, key.requested_host);
Some(CachedProtocol {
protocol: entry.protocol,
h3_port: entry.h3_port,
@@ -105,20 +209,195 @@ impl ProtocolCache {
}
/// Insert a detected protocol into the cache.
/// If the cache is at capacity, evict the oldest entry first.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
self.insert_with_h3_port(key, protocol, None);
/// Returns `false` if suppressed due to active failure suppression.
///
/// **Key semantic**: only suppresses if the protocol being inserted matches
/// a suppressed protocol. H1 inserts are NEVER suppressed — downgrades
/// always succeed.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) -> bool {
if self.is_suppressed(&key, protocol) {
debug!(
host = %key.host, port = %key.port, domain = ?key.requested_host,
protocol = ?protocol,
"Protocol cache insert suppressed — recent failure"
);
return false;
}
self.insert_internal(key, protocol, None);
true
}
/// Insert an H3 detection result with the Alt-Svc advertised port.
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
/// Returns `false` if H3 is suppressed.
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) -> bool {
if self.is_suppressed(&key, DetectedProtocol::H3) {
debug!(
host = %key.host, port = %key.port, domain = ?key.requested_host,
"H3 upgrade suppressed — recent failure"
);
return false;
}
self.insert_internal(key, DetectedProtocol::H3, Some(h3_port));
true
}
/// Record a protocol failure. Future `insert()` calls for this protocol
/// will be suppressed until the escalating cooldown expires.
///
/// Cooldown escalation: 5s → 10s → 20s → 40s → 80s → 160s → 300s.
/// Consecutive counter resets if the previous failure is older than 2× its cooldown.
///
/// Cascading: when H2 fails, H3 cooldown is reduced to 5s remaining.
/// H1 failures are ignored (H1 is the protocol floor).
pub fn record_failure(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
if protocol == DetectedProtocol::H1 {
return; // H1 is the floor — nothing to suppress
}
let mut entry = self.failures.entry(key.clone()).or_default();
let record = entry.get_mut(protocol);
let (consecutive, new_cooldown) = match record {
Some(existing) if existing.failed_at.elapsed() < existing.cooldown.saturating_mul(2) => {
// Still within the "recent" window — escalate
let c = existing.consecutive_failures.saturating_add(1)
.min(PROTOCOL_FAILURE_ESCALATION_CAP);
(c, escalate_cooldown(c))
}
_ => {
// First failure or old failure that expired long ago — reset
(1, PROTOCOL_FAILURE_COOLDOWN)
}
};
*record = Some(FailureRecord {
failed_at: Instant::now(),
cooldown: new_cooldown,
consecutive_failures: consecutive,
});
// Cascading: when H2 fails, reduce H3 cooldown to 5s remaining
if protocol == DetectedProtocol::H2 {
Self::reduce_cooldown_to(entry.h3.as_mut(), PROTOCOL_FAILURE_COOLDOWN);
}
debug!(
host = %key.host, port = %key.port, domain = ?key.requested_host,
protocol = ?protocol,
consecutive = consecutive,
cooldown_secs = new_cooldown.as_secs(),
"Protocol failure recorded — suppressing for {:?}", new_cooldown
);
}
/// Check whether a protocol is currently suppressed for the given key.
/// Returns `true` if the protocol failed within its cooldown period.
/// H1 is never suppressed.
pub fn is_suppressed(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool {
if protocol == DetectedProtocol::H1 {
return false;
}
self.failures.get(key)
.and_then(|entry| entry.get(protocol).map(|r| r.failed_at.elapsed() < r.cooldown))
.unwrap_or(false)
}
/// Check whether a protocol can be retried (for within-request escalation).
/// Returns `true` if there's no failure record OR if ≥5s have passed since
/// the last attempt. More permissive than `is_suppressed`.
pub fn can_retry(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool {
if protocol == DetectedProtocol::H1 {
return true;
}
match self.failures.get(key) {
Some(entry) => match entry.get(protocol) {
Some(r) => r.failed_at.elapsed() >= PROTOCOL_FAILURE_COOLDOWN,
None => true, // no failure record
},
None => true,
}
}
/// Record a retry attempt WITHOUT escalating the cooldown.
/// Resets the `failed_at` timestamp to prevent rapid retries (5s gate).
/// Called before an escalation attempt. If the attempt fails,
/// `record_failure` should be called afterward with proper escalation.
pub fn record_retry_attempt(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) {
if protocol == DetectedProtocol::H1 {
return;
}
if let Some(mut entry) = self.failures.get_mut(key) {
if let Some(ref mut r) = entry.get_mut(protocol) {
r.failed_at = Instant::now();
}
}
}
/// Clear the failure record for a protocol (it recovered).
/// Called when an escalation retry succeeds.
pub fn clear_failure(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) {
if protocol == DetectedProtocol::H1 {
return;
}
if let Some(mut entry) = self.failures.get_mut(key) {
*entry.get_mut(protocol) = None;
if entry.is_empty() {
drop(entry);
self.failures.remove(key);
}
}
}
/// Clear all entries. Called on route updates to discard stale detections.
pub fn clear(&self) {
self.cache.clear();
self.failures.clear();
}
/// Snapshot all non-expired cache entries for metrics/UI display.
pub fn snapshot(&self) -> Vec<ProtocolCacheEntry> {
self.cache.iter()
.filter(|entry| entry.value().detected_at.elapsed() < PROTOCOL_CACHE_TTL)
.map(|entry| {
let key = entry.key();
let val = entry.value();
let failure_info = self.failures.get(key);
let (h2_sup, h2_cd, h2_cons) = Self::suppression_info(
failure_info.as_deref().and_then(|f| f.h2.as_ref()),
);
let (h3_sup, h3_cd, h3_cons) = Self::suppression_info(
failure_info.as_deref().and_then(|f| f.h3.as_ref()),
);
ProtocolCacheEntry {
host: key.host.clone(),
port: key.port,
domain: key.requested_host.clone(),
protocol: match val.protocol {
DetectedProtocol::H1 => "h1".to_string(),
DetectedProtocol::H2 => "h2".to_string(),
DetectedProtocol::H3 => "h3".to_string(),
},
h3_port: val.h3_port,
age_secs: val.detected_at.elapsed().as_secs(),
h2_suppressed: h2_sup,
h3_suppressed: h3_sup,
h2_cooldown_remaining_secs: h2_cd,
h3_cooldown_remaining_secs: h3_cd,
h2_consecutive_failures: h2_cons,
h3_consecutive_failures: h3_cons,
}
})
.collect()
}
// --- Internal helpers ---
/// Insert a protocol detection result with an optional H3 port.
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
/// No suppression check — callers must check before calling.
fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
// Evict the oldest entry to stay within bounds
let oldest = self.cache.iter()
.min_by_key(|entry| entry.value().detected_at)
.map(|entry| entry.key().clone());
@@ -133,17 +412,48 @@ impl ProtocolCache {
});
}
/// Clear all entries. Called on route updates to discard stale detections.
pub fn clear(&self) {
self.cache.clear();
/// Reduce a failure record's remaining cooldown to `target`, if it currently
/// has MORE than `target` remaining. Never increases cooldown.
fn reduce_cooldown_to(record: Option<&mut FailureRecord>, target: Duration) {
if let Some(r) = record {
let elapsed = r.failed_at.elapsed();
if elapsed < r.cooldown {
let remaining = r.cooldown - elapsed;
if remaining > target {
// Shrink cooldown so it expires in `target` from now
r.cooldown = elapsed + target;
}
}
}
}
/// Background cleanup loop — removes expired entries every `PROTOCOL_CACHE_CLEANUP_INTERVAL`.
async fn cleanup_loop(cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>) {
/// Extract suppression info from a failure record for metrics.
fn suppression_info(record: Option<&FailureRecord>) -> (bool, Option<u64>, Option<u32>) {
match record {
Some(r) => {
let elapsed = r.failed_at.elapsed();
let suppressed = elapsed < r.cooldown;
let remaining = if suppressed {
Some((r.cooldown - elapsed).as_secs())
} else {
None
};
(suppressed, remaining, Some(r.consecutive_failures))
}
None => (false, None, None),
}
}
/// Background cleanup loop.
async fn cleanup_loop(
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
failures: Arc<DashMap<ProtocolCacheKey, FailureState>>,
) {
let mut interval = tokio::time::interval(PROTOCOL_CACHE_CLEANUP_INTERVAL);
loop {
interval.tick().await;
// Clean expired cache entries
let expired: Vec<ProtocolCacheKey> = cache.iter()
.filter(|entry| entry.value().detected_at.elapsed() >= PROTOCOL_CACHE_TTL)
.map(|entry| entry.key().clone())
@@ -155,6 +465,31 @@ impl ProtocolCache {
cache.remove(&key);
}
}
// Clean fully-expired failure entries
let expired_failures: Vec<ProtocolCacheKey> = failures.iter()
.filter(|entry| entry.value().all_expired())
.map(|entry| entry.key().clone())
.collect();
if !expired_failures.is_empty() {
debug!("Protocol cache cleanup: removing {} expired failure entries", expired_failures.len());
for key in expired_failures {
failures.remove(&key);
}
}
// Safety net: cap failures map at 2× max entries
if failures.len() > PROTOCOL_CACHE_MAX_ENTRIES * 2 {
let oldest: Vec<ProtocolCacheKey> = failures.iter()
.filter(|e| e.value().all_expired())
.map(|e| e.key().clone())
.take(failures.len() - PROTOCOL_CACHE_MAX_ENTRIES)
.collect();
for key in oldest {
failures.remove(&key);
}
}
}
}
}

View File

@@ -311,6 +311,11 @@ impl HttpProxyService {
self.protocol_cache.clear();
}
/// Snapshot the protocol cache for metrics/UI display.
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
self.protocol_cache.snapshot()
}
/// Handle an incoming HTTP connection on a plain TCP stream.
pub async fn handle_connection(
self: Arc<Self>,
@@ -701,6 +706,11 @@ impl HttpProxyService {
port: upstream.port,
requested_host: host.clone(),
};
// Save cached H3 port for within-request escalation (may be needed later
// if TCP connect fails and we escalate to H3 as a last resort)
let cached_h3_port = self.protocol_cache.get(&protocol_cache_key)
.and_then(|c| c.h3_port);
let protocol_decision = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
@@ -713,17 +723,32 @@ impl HttpProxyService {
match self.protocol_cache.get(&protocol_cache_key) {
Some(cached) => match cached.protocol {
crate::protocol_cache::DetectedProtocol::H3 => {
if let Some(h3_port) = cached.h3_port {
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
// H3 cached but suppressed — fall back to ALPN probe
ProtocolDecision::AlpnProbe
} else if let Some(h3_port) = cached.h3_port {
ProtocolDecision::H3 { port: h3_port }
} else {
// H3 cached but no port — fall back to ALPN probe
ProtocolDecision::AlpnProbe
}
}
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
crate::protocol_cache::DetectedProtocol::H2 => {
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
ProtocolDecision::H1
} else {
ProtocolDecision::H2
}
}
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
},
None => ProtocolDecision::AlpnProbe,
None => {
// Cache miss — skip ALPN probe if H2 is suppressed
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
ProtocolDecision::H1
} else {
ProtocolDecision::AlpnProbe
}
}
}
}
}
@@ -776,8 +801,16 @@ impl HttpProxyService {
return result;
}
Err(e) => {
warn!(backend = %upstream_key, error = %e,
warn!(backend = %upstream_key, domain = %domain_str, error = %e,
"H3 backend connect failed, falling back to H2/H1");
// Record failure with escalating cooldown — prevents Alt-Svc
// from re-upgrading to H3 during cooldown period
if is_auto_detect_mode {
self.protocol_cache.record_failure(
protocol_cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H3,
);
}
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
// from our own injected Alt-Svc header or a stale backend Alt-Svc
conn_activity.alt_svc_cache_key = None;
@@ -899,6 +932,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
}
Err(_) => {
@@ -910,6 +973,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
}
}
@@ -937,6 +1030,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
}
Err(_) => {
@@ -948,6 +1071,36 @@ impl HttpProxyService {
);
self.metrics.backend_connect_error(&upstream_key);
self.upstream_selector.connection_ended(&upstream_key);
// --- Within-request escalation: try H3 via QUIC if retryable ---
if is_auto_detect_mode {
if let Some(h3_port) = cached_h3_port {
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port);
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), port: h3_port, use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e3) => {
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
}
}
}
}
}
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
}
}
@@ -1416,6 +1569,11 @@ impl HttpProxyService {
port: upstream.port,
requested_host: requested_host.clone(),
};
// Record H2 failure (escalating cooldown) before downgrading cache to H1
self.protocol_cache.record_failure(
cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2,
);
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
match self.reconnect_backend(upstream, domain, backend_key).await {
@@ -1549,12 +1707,16 @@ impl HttpProxyService {
self.metrics.backend_h2_failure(backend_key);
self.metrics.backend_handshake_error(backend_key);
// Update cache to H1 so subsequent requests skip H2
// Record H2 failure (escalating cooldown) and downgrade cache to H1
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: requested_host.clone(),
};
self.protocol_cache.record_failure(
cache_key.clone(),
crate::protocol_cache::DetectedProtocol::H2,
);
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
@@ -2569,7 +2731,7 @@ impl HttpProxyService {
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| "QUIC connect timeout (3s)")??;
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)

View File

@@ -31,6 +31,8 @@ pub struct Metrics {
pub total_udp_sessions: u64,
pub total_datagrams_in: u64,
pub total_datagrams_out: u64,
// Protocol detection cache snapshot (populated by RustProxy from HttpProxyService)
pub detected_protocols: Vec<ProtocolCacheEntryMetric>,
}
/// Per-route metrics.
@@ -76,6 +78,25 @@ pub struct BackendMetrics {
pub h2_failures: u64,
}
/// Protocol cache entry for metrics/UI display.
/// Populated from the HTTP proxy service's protocol detection cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProtocolCacheEntryMetric {
pub host: String,
pub port: u16,
pub domain: Option<String>,
pub protocol: String,
pub h3_port: Option<u16>,
pub age_secs: u64,
pub h2_suppressed: bool,
pub h3_suppressed: bool,
pub h2_cooldown_remaining_secs: Option<u64>,
pub h3_cooldown_remaining_secs: Option<u64>,
pub h2_consecutive_failures: Option<u32>,
pub h3_consecutive_failures: Option<u32>,
}
/// Statistics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -824,6 +845,7 @@ impl MetricsCollector {
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed),
detected_protocols: vec![],
}
}
}

View File

@@ -937,8 +937,29 @@ impl RustProxy {
}
/// Get current metrics snapshot.
/// Includes protocol cache entries from the HTTP proxy service.
pub fn get_metrics(&self) -> Metrics {
self.metrics.snapshot()
let mut metrics = self.metrics.snapshot();
if let Some(ref lm) = self.listener_manager {
let entries = lm.http_proxy().protocol_cache_snapshot();
metrics.detected_protocols = entries.into_iter().map(|e| {
rustproxy_metrics::ProtocolCacheEntryMetric {
host: e.host,
port: e.port,
domain: e.domain,
protocol: e.protocol,
h3_port: e.h3_port,
age_secs: e.age_secs,
h2_suppressed: e.h2_suppressed,
h3_suppressed: e.h3_suppressed,
h2_cooldown_remaining_secs: e.h2_cooldown_remaining_secs,
h3_cooldown_remaining_secs: e.h3_cooldown_remaining_secs,
h2_consecutive_failures: e.h2_consecutive_failures,
h3_consecutive_failures: e.h3_consecutive_failures,
}
}).collect();
}
metrics
}
/// Add a listening port at runtime.

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '26.0.0',
version: '26.1.0',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}

View File

@@ -72,6 +72,7 @@ export interface IMetrics {
byBackend(): Map<string, IBackendMetrics>;
protocols(): Map<string, string>;
topByErrors(limit?: number): Array<{ backend: string; errors: number }>;
detectedProtocols(): IProtocolCacheEntry[];
};
// UDP metrics
@@ -113,6 +114,26 @@ export interface IMetricsConfig {
prometheusPrefix: string; // Default: smartproxy_
}
/**
* Protocol cache entry from the Rust proxy's auto-detection cache.
* Shows which protocol (h1/h2/h3) is detected for each backend+domain pair,
* including failure suppression state with escalating cooldowns.
*/
export interface IProtocolCacheEntry {
host: string;
port: number;
domain: string | null;
protocol: string;
h3Port: number | null;
ageSecs: number;
h2Suppressed: boolean;
h3Suppressed: boolean;
h2CooldownRemainingSecs: number | null;
h3CooldownRemainingSecs: number | null;
h2ConsecutiveFailures: number | null;
h3ConsecutiveFailures: number | null;
}
/**
* Per-backend metrics
*/

View File

@@ -1,4 +1,4 @@
import type { IMetrics, IBackendMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
/**
@@ -216,6 +216,9 @@ export class RustMetricsAdapter implements IMetrics {
result.sort((a, b) => b.errors - a.errors);
return result.slice(0, limit);
},
detectedProtocols: (): IProtocolCacheEntry[] => {
return this.cache?.detectedProtocols ?? [];
},
};
public udp = {