Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 34dc0cb9b6 | |||
| c83c43194b | |||
| d026d7c266 | |||
| 3b01144c51 | |||
| 56f5697e1b | |||
| f04875885f |
20
changelog.md
20
changelog.md
@@ -1,5 +1,25 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-23 - 26.2.1 - fix(rustproxy-http)
|
||||||
|
include the upstream request URL when caching H3 Alt-Svc discoveries
|
||||||
|
|
||||||
|
- Tracks the request path that triggered Alt-Svc discovery in connection activity state
|
||||||
|
- Adds request URL context to Alt-Svc debug logging and protocol cache insertion reasons for better traceability
|
||||||
|
|
||||||
|
## 2026-03-23 - 26.2.0 - feat(protocol-cache)
|
||||||
|
add sliding TTL re-probing and eviction for backend protocol detection
|
||||||
|
|
||||||
|
- extend protocol cache entries and metrics with last accessed and last probed timestamps
|
||||||
|
- trigger periodic ALPN re-probes for cached H1/H2 entries while keeping active entries alive with a sliding 1 day TTL
|
||||||
|
- log protocol transitions with reasons and evict cache entries when all protocol fallback attempts fail
|
||||||
|
|
||||||
|
## 2026-03-22 - 26.1.0 - feat(rustproxy-http)
|
||||||
|
add protocol failure suppression, h3 fallback escalation, and protocol cache metrics exposure
|
||||||
|
|
||||||
|
- introduces escalating cooldowns for failed H2/H3 protocol detection to prevent repeated upgrades to unstable backends
|
||||||
|
- adds within-request escalation to cached HTTP/3 when TCP or TLS backend connections fail in auto-detect mode
|
||||||
|
- exposes detected protocol cache entries and suppression state through Rust metrics and the TypeScript metrics adapter
|
||||||
|
|
||||||
## 2026-03-21 - 26.0.0 - BREAKING CHANGE(ts-api,rustproxy)
|
## 2026-03-21 - 26.0.0 - BREAKING CHANGE(ts-api,rustproxy)
|
||||||
remove deprecated TypeScript protocol and utility exports while hardening QUIC, HTTP/3, WebSocket, and rate limiter cleanup paths
|
remove deprecated TypeScript protocol and utility exports while hardening QUIC, HTTP/3, WebSocket, and rate limiter cleanup paths
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartproxy",
|
"name": "@push.rocks/smartproxy",
|
||||||
"version": "26.0.0",
|
"version": "26.2.1",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
@@ -1,43 +1,99 @@
|
|||||||
//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
|
//! Bounded, sliding-TTL protocol detection cache with periodic re-probing and failure suppression.
|
||||||
//!
|
//!
|
||||||
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
|
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
|
||||||
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
|
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
|
||||||
//! frontend domains share the same backend but differ in protocol support.
|
//! frontend domains share the same backend but differ in protocol support.
|
||||||
//!
|
//!
|
||||||
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
|
//! ## Sliding TTL
|
||||||
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
|
//!
|
||||||
|
//! Each cache hit refreshes the entry's expiry timer (`last_accessed_at`). Entries
|
||||||
|
//! remain valid for up to 1 day of continuous use. Every 5 minutes, the next request
|
||||||
|
//! triggers an inline ALPN re-probe to verify the cached protocol is still correct.
|
||||||
|
//!
|
||||||
|
//! ## Upgrade signals
|
||||||
|
//!
|
||||||
|
//! - ALPN (TLS handshake) → detects H2 vs H1
|
||||||
|
//! - Alt-Svc (response header) → advertises H3
|
||||||
|
//!
|
||||||
|
//! ## Protocol transitions
|
||||||
|
//!
|
||||||
|
//! All protocol changes are logged at `info!()` level with the reason:
|
||||||
|
//! "Protocol transition: H1 → H2 because periodic ALPN re-probe"
|
||||||
|
//!
|
||||||
|
//! ## Failure suppression
|
||||||
|
//!
|
||||||
|
//! When a protocol fails, `record_failure()` prevents upgrade signals from
|
||||||
|
//! re-introducing it until an escalating cooldown expires (5s → 10s → ... → 300s).
|
||||||
|
//! Within-request escalation is allowed via `can_retry()` after a 5s minimum gap.
|
||||||
|
//!
|
||||||
|
//! ## Total failure eviction
|
||||||
|
//!
|
||||||
|
//! When all protocols (H3, H2, H1) fail for a backend, the cache entry is evicted
|
||||||
|
//! entirely via `evict()`, forcing a fresh probe on the next request.
|
||||||
|
//!
|
||||||
|
//! Cascading: when a lower protocol also fails, higher protocol cooldowns are
|
||||||
|
//! reduced to 5s remaining (not instant clear), preventing tight retry loops.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use tracing::debug;
|
use tracing::{debug, info};
|
||||||
|
|
||||||
/// TTL for cached protocol detection results.
|
/// Sliding TTL for cached protocol detection results.
|
||||||
/// After this duration, the next request will re-probe the backend.
|
/// Entries that haven't been accessed for this duration are evicted.
|
||||||
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
|
/// Each `get()` call refreshes the timer (sliding window).
|
||||||
|
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(86400); // 1 day
|
||||||
|
|
||||||
|
/// Interval between inline ALPN re-probes for H1/H2 entries.
|
||||||
|
/// When a cached entry's `last_probed_at` exceeds this, the next request
|
||||||
|
/// triggers an ALPN re-probe to verify the backend still speaks the same protocol.
|
||||||
|
const PROTOCOL_REPROBE_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes
|
||||||
|
|
||||||
/// Maximum number of entries in the protocol cache.
|
/// Maximum number of entries in the protocol cache.
|
||||||
/// Prevents unbounded growth when backends come and go.
|
|
||||||
const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096;
|
const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4096;
|
||||||
|
|
||||||
/// Background cleanup interval for the protocol cache.
|
/// Background cleanup interval.
|
||||||
const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
|
const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
/// Minimum cooldown between retry attempts of a failed protocol.
|
||||||
|
const PROTOCOL_FAILURE_COOLDOWN: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
|
/// Maximum cooldown (escalation ceiling).
|
||||||
|
const PROTOCOL_FAILURE_MAX_COOLDOWN: Duration = Duration::from_secs(300);
|
||||||
|
|
||||||
|
/// Consecutive failure count at which cooldown reaches maximum.
|
||||||
|
/// 5s × 2^5 = 160s, 5s × 2^6 = 320s → capped at 300s.
|
||||||
|
const PROTOCOL_FAILURE_ESCALATION_CAP: u32 = 6;
|
||||||
|
|
||||||
/// Detected backend protocol.
|
/// Detected backend protocol.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
pub enum DetectedProtocol {
|
pub enum DetectedProtocol {
|
||||||
H1,
|
H1,
|
||||||
H2,
|
H2,
|
||||||
H3,
|
H3,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for DetectedProtocol {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
DetectedProtocol::H1 => write!(f, "H1"),
|
||||||
|
DetectedProtocol::H2 => write!(f, "H2"),
|
||||||
|
DetectedProtocol::H3 => write!(f, "H3"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Result of a protocol cache lookup.
|
/// Result of a protocol cache lookup.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub struct CachedProtocol {
|
pub struct CachedProtocol {
|
||||||
pub protocol: DetectedProtocol,
|
pub protocol: DetectedProtocol,
|
||||||
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||||
pub h3_port: Option<u16>,
|
pub h3_port: Option<u16>,
|
||||||
|
/// True if the entry's `last_probed_at` exceeds `PROTOCOL_REPROBE_INTERVAL`.
|
||||||
|
/// Caller should perform an inline ALPN re-probe and call `update_probe_result()`.
|
||||||
|
/// Always `false` for H3 entries (H3 is discovered via Alt-Svc, not ALPN).
|
||||||
|
pub needs_reprobe: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Key for the protocol cache: (host, port, requested_host).
|
/// Key for the protocol cache: (host, port, requested_host).
|
||||||
@@ -50,24 +106,111 @@ pub struct ProtocolCacheKey {
|
|||||||
pub requested_host: Option<String>,
|
pub requested_host: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A cached protocol detection result with a timestamp.
|
/// A cached protocol detection result with timestamps.
|
||||||
struct CachedEntry {
|
struct CachedEntry {
|
||||||
protocol: DetectedProtocol,
|
protocol: DetectedProtocol,
|
||||||
|
/// When this protocol was first detected (or last changed).
|
||||||
detected_at: Instant,
|
detected_at: Instant,
|
||||||
|
/// Last time any request used this entry (sliding-window TTL).
|
||||||
|
last_accessed_at: Instant,
|
||||||
|
/// Last time an ALPN re-probe was performed for this entry.
|
||||||
|
last_probed_at: Instant,
|
||||||
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||||
h3_port: Option<u16>,
|
h3_port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bounded, TTL-based protocol detection cache.
|
/// Failure record for a single protocol level.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct FailureRecord {
|
||||||
|
/// When the failure was last recorded.
|
||||||
|
failed_at: Instant,
|
||||||
|
/// Current cooldown duration. Escalates on consecutive failures.
|
||||||
|
cooldown: Duration,
|
||||||
|
/// Number of consecutive failures (for escalation).
|
||||||
|
consecutive_failures: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Per-key failure state. Tracks failures at each upgradeable protocol level.
|
||||||
|
/// H1 is never tracked (it's the protocol floor — nothing to fall back to).
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct FailureState {
|
||||||
|
h2: Option<FailureRecord>,
|
||||||
|
h3: Option<FailureRecord>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FailureState {
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.h2.is_none() && self.h3.is_none()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_expired(&self) -> bool {
|
||||||
|
let h2_expired = self.h2.as_ref()
|
||||||
|
.map(|r| r.failed_at.elapsed() >= r.cooldown)
|
||||||
|
.unwrap_or(true);
|
||||||
|
let h3_expired = self.h3.as_ref()
|
||||||
|
.map(|r| r.failed_at.elapsed() >= r.cooldown)
|
||||||
|
.unwrap_or(true);
|
||||||
|
h2_expired && h3_expired
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, protocol: DetectedProtocol) -> Option<&FailureRecord> {
|
||||||
|
match protocol {
|
||||||
|
DetectedProtocol::H2 => self.h2.as_ref(),
|
||||||
|
DetectedProtocol::H3 => self.h3.as_ref(),
|
||||||
|
DetectedProtocol::H1 => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_mut(&mut self, protocol: DetectedProtocol) -> &mut Option<FailureRecord> {
|
||||||
|
match protocol {
|
||||||
|
DetectedProtocol::H2 => &mut self.h2,
|
||||||
|
DetectedProtocol::H3 => &mut self.h3,
|
||||||
|
DetectedProtocol::H1 => unreachable!("H1 failures are never recorded"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot of a single protocol cache entry, suitable for metrics/UI display.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ProtocolCacheEntry {
|
||||||
|
pub host: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub domain: Option<String>,
|
||||||
|
pub protocol: String,
|
||||||
|
pub h3_port: Option<u16>,
|
||||||
|
pub age_secs: u64,
|
||||||
|
pub last_accessed_secs: u64,
|
||||||
|
pub last_probed_secs: u64,
|
||||||
|
pub h2_suppressed: bool,
|
||||||
|
pub h3_suppressed: bool,
|
||||||
|
pub h2_cooldown_remaining_secs: Option<u64>,
|
||||||
|
pub h3_cooldown_remaining_secs: Option<u64>,
|
||||||
|
pub h2_consecutive_failures: Option<u32>,
|
||||||
|
pub h3_consecutive_failures: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Exponential backoff: PROTOCOL_FAILURE_COOLDOWN × 2^(n-1), capped at MAX.
|
||||||
|
fn escalate_cooldown(consecutive: u32) -> Duration {
|
||||||
|
let base = PROTOCOL_FAILURE_COOLDOWN.as_secs();
|
||||||
|
let exp = consecutive.saturating_sub(1).min(63) as u64;
|
||||||
|
let secs = base.saturating_mul(1u64.checked_shl(exp as u32).unwrap_or(u64::MAX));
|
||||||
|
Duration::from_secs(secs.min(PROTOCOL_FAILURE_MAX_COOLDOWN.as_secs()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded, sliding-TTL protocol detection cache with failure suppression.
|
||||||
///
|
///
|
||||||
/// Memory safety guarantees:
|
/// Memory safety guarantees:
|
||||||
/// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly.
|
/// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly.
|
||||||
/// - TTL expiry — stale entries naturally age out on lookup.
|
/// - Sliding TTL expiry — entries age out after 1 day without access.
|
||||||
/// - Background cleanup task — proactively removes expired entries every 60s.
|
/// - Background cleanup task — proactively removes expired entries every 60s.
|
||||||
/// - `clear()` — called on route updates to discard stale detections.
|
/// - `clear()` — called on route updates to discard stale detections.
|
||||||
/// - `Drop` — aborts the background task to prevent dangling tokio tasks.
|
/// - `Drop` — aborts the background task to prevent dangling tokio tasks.
|
||||||
pub struct ProtocolCache {
|
pub struct ProtocolCache {
|
||||||
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
|
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
|
||||||
|
/// Generic protocol failure suppression map. Tracks per-protocol failure
|
||||||
|
/// records (H2, H3) for each cache key. Used to prevent upgrade signals
|
||||||
|
/// (ALPN, Alt-Svc) from re-introducing failed protocols.
|
||||||
|
failures: Arc<DashMap<ProtocolCacheKey, FailureState>>,
|
||||||
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
|
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,26 +218,40 @@ impl ProtocolCache {
|
|||||||
/// Create a new protocol cache and start the background cleanup task.
|
/// Create a new protocol cache and start the background cleanup task.
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
let cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>> = Arc::new(DashMap::new());
|
let cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>> = Arc::new(DashMap::new());
|
||||||
|
let failures: Arc<DashMap<ProtocolCacheKey, FailureState>> = Arc::new(DashMap::new());
|
||||||
let cache_clone = Arc::clone(&cache);
|
let cache_clone = Arc::clone(&cache);
|
||||||
|
let failures_clone = Arc::clone(&failures);
|
||||||
let cleanup_handle = tokio::spawn(async move {
|
let cleanup_handle = tokio::spawn(async move {
|
||||||
Self::cleanup_loop(cache_clone).await;
|
Self::cleanup_loop(cache_clone, failures_clone).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
cache,
|
cache,
|
||||||
|
failures,
|
||||||
cleanup_handle: Some(cleanup_handle),
|
cleanup_handle: Some(cleanup_handle),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Look up the cached protocol for a backend endpoint.
|
/// Look up the cached protocol for a backend endpoint.
|
||||||
|
///
|
||||||
/// Returns `None` if not cached or expired (caller should probe via ALPN).
|
/// Returns `None` if not cached or expired (caller should probe via ALPN).
|
||||||
|
/// On hit, refreshes `last_accessed_at` (sliding TTL) and sets `needs_reprobe`
|
||||||
|
/// if the entry hasn't been probed in over 5 minutes (H1/H2 only).
|
||||||
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
|
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
|
||||||
let entry = self.cache.get(key)?;
|
let mut entry = self.cache.get_mut(key)?;
|
||||||
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
|
if entry.last_accessed_at.elapsed() < PROTOCOL_CACHE_TTL {
|
||||||
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
|
// Refresh sliding TTL
|
||||||
|
entry.last_accessed_at = Instant::now();
|
||||||
|
|
||||||
|
// H3 is the ceiling — can't ALPN-probe for H3 (discovered via Alt-Svc).
|
||||||
|
// Only H1/H2 entries trigger periodic re-probing.
|
||||||
|
let needs_reprobe = entry.protocol != DetectedProtocol::H3
|
||||||
|
&& entry.last_probed_at.elapsed() >= PROTOCOL_REPROBE_INTERVAL;
|
||||||
|
|
||||||
Some(CachedProtocol {
|
Some(CachedProtocol {
|
||||||
protocol: entry.protocol,
|
protocol: entry.protocol,
|
||||||
h3_port: entry.h3_port,
|
h3_port: entry.h3_port,
|
||||||
|
needs_reprobe,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
// Expired — remove and return None to trigger re-probe
|
// Expired — remove and return None to trigger re-probe
|
||||||
@@ -105,47 +262,328 @@ impl ProtocolCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Insert a detected protocol into the cache.
|
/// Insert a detected protocol into the cache.
|
||||||
/// If the cache is at capacity, evict the oldest entry first.
|
/// Returns `false` if suppressed due to active failure suppression.
|
||||||
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
|
///
|
||||||
self.insert_with_h3_port(key, protocol, None);
|
/// **Key semantic**: only suppresses if the protocol being inserted matches
|
||||||
|
/// a suppressed protocol. H1 inserts are NEVER suppressed — downgrades
|
||||||
|
/// always succeed.
|
||||||
|
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, reason: &str) -> bool {
|
||||||
|
if self.is_suppressed(&key, protocol) {
|
||||||
|
debug!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
protocol = ?protocol,
|
||||||
|
"Protocol cache insert suppressed — recent failure"
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.insert_internal(key, protocol, None, reason);
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert an H3 detection result with the Alt-Svc advertised port.
|
/// Insert an H3 detection result with the Alt-Svc advertised port.
|
||||||
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
|
/// Returns `false` if H3 is suppressed.
|
||||||
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
|
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16, reason: &str) -> bool {
|
||||||
|
if self.is_suppressed(&key, DetectedProtocol::H3) {
|
||||||
|
debug!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
"H3 upgrade suppressed — recent failure"
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.insert_internal(key, DetectedProtocol::H3, Some(h3_port), reason);
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert a protocol detection result with an optional H3 port.
|
/// Update the cache after an inline ALPN re-probe completes.
|
||||||
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
|
///
|
||||||
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
|
/// Always updates `last_probed_at`. If the protocol changed, logs the transition
|
||||||
// Evict the oldest entry to stay within bounds
|
/// and updates the entry. Returns `Some(new_protocol)` if changed, `None` if unchanged.
|
||||||
let oldest = self.cache.iter()
|
pub fn update_probe_result(
|
||||||
.min_by_key(|entry| entry.value().detected_at)
|
&self,
|
||||||
.map(|entry| entry.key().clone());
|
key: &ProtocolCacheKey,
|
||||||
if let Some(oldest_key) = oldest {
|
probed_protocol: DetectedProtocol,
|
||||||
self.cache.remove(&oldest_key);
|
reason: &str,
|
||||||
|
) -> Option<DetectedProtocol> {
|
||||||
|
if let Some(mut entry) = self.cache.get_mut(key) {
|
||||||
|
let old_protocol = entry.protocol;
|
||||||
|
entry.last_probed_at = Instant::now();
|
||||||
|
entry.last_accessed_at = Instant::now();
|
||||||
|
|
||||||
|
if old_protocol != probed_protocol {
|
||||||
|
info!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
old = %old_protocol, new = %probed_protocol, reason = %reason,
|
||||||
|
"Protocol transition"
|
||||||
|
);
|
||||||
|
entry.protocol = probed_protocol;
|
||||||
|
entry.detected_at = Instant::now();
|
||||||
|
// Clear h3_port if downgrading from H3
|
||||||
|
if old_protocol == DetectedProtocol::H3 && probed_protocol != DetectedProtocol::H3 {
|
||||||
|
entry.h3_port = None;
|
||||||
|
}
|
||||||
|
return Some(probed_protocol);
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
protocol = %old_protocol, reason = %reason,
|
||||||
|
"Re-probe confirmed — no protocol change"
|
||||||
|
);
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
// Entry was evicted between the get() and the probe completing.
|
||||||
|
// Insert as a fresh entry.
|
||||||
|
self.insert_internal(key.clone(), probed_protocol, None, reason);
|
||||||
|
Some(probed_protocol)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a protocol failure. Future `insert()` calls for this protocol
|
||||||
|
/// will be suppressed until the escalating cooldown expires.
|
||||||
|
///
|
||||||
|
/// Cooldown escalation: 5s → 10s → 20s → 40s → 80s → 160s → 300s.
|
||||||
|
/// Consecutive counter resets if the previous failure is older than 2× its cooldown.
|
||||||
|
///
|
||||||
|
/// Cascading: when H2 fails, H3 cooldown is reduced to 5s remaining.
|
||||||
|
/// H1 failures are ignored (H1 is the protocol floor).
|
||||||
|
pub fn record_failure(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
|
||||||
|
if protocol == DetectedProtocol::H1 {
|
||||||
|
return; // H1 is the floor — nothing to suppress
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut entry = self.failures.entry(key.clone()).or_default();
|
||||||
|
|
||||||
|
let record = entry.get_mut(protocol);
|
||||||
|
let (consecutive, new_cooldown) = match record {
|
||||||
|
Some(existing) if existing.failed_at.elapsed() < existing.cooldown.saturating_mul(2) => {
|
||||||
|
// Still within the "recent" window — escalate
|
||||||
|
let c = existing.consecutive_failures.saturating_add(1)
|
||||||
|
.min(PROTOCOL_FAILURE_ESCALATION_CAP);
|
||||||
|
(c, escalate_cooldown(c))
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// First failure or old failure that expired long ago — reset
|
||||||
|
(1, PROTOCOL_FAILURE_COOLDOWN)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
*record = Some(FailureRecord {
|
||||||
|
failed_at: Instant::now(),
|
||||||
|
cooldown: new_cooldown,
|
||||||
|
consecutive_failures: consecutive,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Cascading: when H2 fails, reduce H3 cooldown to 5s remaining
|
||||||
|
if protocol == DetectedProtocol::H2 {
|
||||||
|
Self::reduce_cooldown_to(entry.h3.as_mut(), PROTOCOL_FAILURE_COOLDOWN);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
protocol = ?protocol,
|
||||||
|
consecutive = consecutive,
|
||||||
|
cooldown_secs = new_cooldown.as_secs(),
|
||||||
|
"Protocol failure recorded — suppressing for {:?}", new_cooldown
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a protocol is currently suppressed for the given key.
|
||||||
|
/// Returns `true` if the protocol failed within its cooldown period.
|
||||||
|
/// H1 is never suppressed.
|
||||||
|
pub fn is_suppressed(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool {
|
||||||
|
if protocol == DetectedProtocol::H1 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
self.failures.get(key)
|
||||||
|
.and_then(|entry| entry.get(protocol).map(|r| r.failed_at.elapsed() < r.cooldown))
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a protocol can be retried (for within-request escalation).
|
||||||
|
/// Returns `true` if there's no failure record OR if ≥5s have passed since
|
||||||
|
/// the last attempt. More permissive than `is_suppressed`.
|
||||||
|
pub fn can_retry(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) -> bool {
|
||||||
|
if protocol == DetectedProtocol::H1 {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
match self.failures.get(key) {
|
||||||
|
Some(entry) => match entry.get(protocol) {
|
||||||
|
Some(r) => r.failed_at.elapsed() >= PROTOCOL_FAILURE_COOLDOWN,
|
||||||
|
None => true, // no failure record
|
||||||
|
},
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a retry attempt WITHOUT escalating the cooldown.
|
||||||
|
/// Resets the `failed_at` timestamp to prevent rapid retries (5s gate).
|
||||||
|
/// Called before an escalation attempt. If the attempt fails,
|
||||||
|
/// `record_failure` should be called afterward with proper escalation.
|
||||||
|
pub fn record_retry_attempt(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) {
|
||||||
|
if protocol == DetectedProtocol::H1 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Some(mut entry) = self.failures.get_mut(key) {
|
||||||
|
if let Some(ref mut r) = entry.get_mut(protocol) {
|
||||||
|
r.failed_at = Instant::now();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.cache.insert(key, CachedEntry {
|
}
|
||||||
protocol,
|
|
||||||
detected_at: Instant::now(),
|
/// Clear the failure record for a protocol (it recovered).
|
||||||
h3_port,
|
/// Called when an escalation retry succeeds.
|
||||||
});
|
pub fn clear_failure(&self, key: &ProtocolCacheKey, protocol: DetectedProtocol) {
|
||||||
|
if protocol == DetectedProtocol::H1 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Some(mut entry) = self.failures.get_mut(key) {
|
||||||
|
*entry.get_mut(protocol) = None;
|
||||||
|
if entry.is_empty() {
|
||||||
|
drop(entry);
|
||||||
|
self.failures.remove(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Evict a cache entry entirely. Called when all protocol probes (H3, H2, H1)
|
||||||
|
/// have failed for a backend.
|
||||||
|
pub fn evict(&self, key: &ProtocolCacheKey) {
|
||||||
|
self.cache.remove(key);
|
||||||
|
self.failures.remove(key);
|
||||||
|
info!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
"Cache entry evicted — all protocols failed"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear all entries. Called on route updates to discard stale detections.
|
/// Clear all entries. Called on route updates to discard stale detections.
|
||||||
pub fn clear(&self) {
|
pub fn clear(&self) {
|
||||||
self.cache.clear();
|
self.cache.clear();
|
||||||
|
self.failures.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Background cleanup loop — removes expired entries every `PROTOCOL_CACHE_CLEANUP_INTERVAL`.
|
/// Snapshot all non-expired cache entries for metrics/UI display.
|
||||||
async fn cleanup_loop(cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>) {
|
pub fn snapshot(&self) -> Vec<ProtocolCacheEntry> {
|
||||||
|
self.cache.iter()
|
||||||
|
.filter(|entry| entry.value().last_accessed_at.elapsed() < PROTOCOL_CACHE_TTL)
|
||||||
|
.map(|entry| {
|
||||||
|
let key = entry.key();
|
||||||
|
let val = entry.value();
|
||||||
|
let failure_info = self.failures.get(key);
|
||||||
|
|
||||||
|
let (h2_sup, h2_cd, h2_cons) = Self::suppression_info(
|
||||||
|
failure_info.as_deref().and_then(|f| f.h2.as_ref()),
|
||||||
|
);
|
||||||
|
let (h3_sup, h3_cd, h3_cons) = Self::suppression_info(
|
||||||
|
failure_info.as_deref().and_then(|f| f.h3.as_ref()),
|
||||||
|
);
|
||||||
|
|
||||||
|
ProtocolCacheEntry {
|
||||||
|
host: key.host.clone(),
|
||||||
|
port: key.port,
|
||||||
|
domain: key.requested_host.clone(),
|
||||||
|
protocol: match val.protocol {
|
||||||
|
DetectedProtocol::H1 => "h1".to_string(),
|
||||||
|
DetectedProtocol::H2 => "h2".to_string(),
|
||||||
|
DetectedProtocol::H3 => "h3".to_string(),
|
||||||
|
},
|
||||||
|
h3_port: val.h3_port,
|
||||||
|
age_secs: val.detected_at.elapsed().as_secs(),
|
||||||
|
last_accessed_secs: val.last_accessed_at.elapsed().as_secs(),
|
||||||
|
last_probed_secs: val.last_probed_at.elapsed().as_secs(),
|
||||||
|
h2_suppressed: h2_sup,
|
||||||
|
h3_suppressed: h3_sup,
|
||||||
|
h2_cooldown_remaining_secs: h2_cd,
|
||||||
|
h3_cooldown_remaining_secs: h3_cd,
|
||||||
|
h2_consecutive_failures: h2_cons,
|
||||||
|
h3_consecutive_failures: h3_cons,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Internal helpers ---
|
||||||
|
|
||||||
|
/// Insert a protocol detection result with an optional H3 port.
|
||||||
|
/// Logs protocol transitions when overwriting an existing entry.
|
||||||
|
/// No suppression check — callers must check before calling.
|
||||||
|
fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>, reason: &str) {
|
||||||
|
// Check for existing entry to log protocol transitions
|
||||||
|
if let Some(existing) = self.cache.get(&key) {
|
||||||
|
if existing.protocol != protocol {
|
||||||
|
info!(
|
||||||
|
host = %key.host, port = %key.port, domain = ?key.requested_host,
|
||||||
|
old = %existing.protocol, new = %protocol, reason = %reason,
|
||||||
|
"Protocol transition"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
drop(existing);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Evict oldest entry if at capacity
|
||||||
|
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
|
||||||
|
let oldest = self.cache.iter()
|
||||||
|
.min_by_key(|entry| entry.value().last_accessed_at)
|
||||||
|
.map(|entry| entry.key().clone());
|
||||||
|
if let Some(oldest_key) = oldest {
|
||||||
|
self.cache.remove(&oldest_key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
self.cache.insert(key, CachedEntry {
|
||||||
|
protocol,
|
||||||
|
detected_at: now,
|
||||||
|
last_accessed_at: now,
|
||||||
|
last_probed_at: now,
|
||||||
|
h3_port,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reduce a failure record's remaining cooldown to `target`, if it currently
|
||||||
|
/// has MORE than `target` remaining. Never increases cooldown.
|
||||||
|
fn reduce_cooldown_to(record: Option<&mut FailureRecord>, target: Duration) {
|
||||||
|
if let Some(r) = record {
|
||||||
|
let elapsed = r.failed_at.elapsed();
|
||||||
|
if elapsed < r.cooldown {
|
||||||
|
let remaining = r.cooldown - elapsed;
|
||||||
|
if remaining > target {
|
||||||
|
// Shrink cooldown so it expires in `target` from now
|
||||||
|
r.cooldown = elapsed + target;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract suppression info from a failure record for metrics.
|
||||||
|
fn suppression_info(record: Option<&FailureRecord>) -> (bool, Option<u64>, Option<u32>) {
|
||||||
|
match record {
|
||||||
|
Some(r) => {
|
||||||
|
let elapsed = r.failed_at.elapsed();
|
||||||
|
let suppressed = elapsed < r.cooldown;
|
||||||
|
let remaining = if suppressed {
|
||||||
|
Some((r.cooldown - elapsed).as_secs())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
(suppressed, remaining, Some(r.consecutive_failures))
|
||||||
|
}
|
||||||
|
None => (false, None, None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Background cleanup loop.
|
||||||
|
async fn cleanup_loop(
|
||||||
|
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
|
||||||
|
failures: Arc<DashMap<ProtocolCacheKey, FailureState>>,
|
||||||
|
) {
|
||||||
let mut interval = tokio::time::interval(PROTOCOL_CACHE_CLEANUP_INTERVAL);
|
let mut interval = tokio::time::interval(PROTOCOL_CACHE_CLEANUP_INTERVAL);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
|
|
||||||
|
// Clean expired cache entries (sliding TTL based on last_accessed_at)
|
||||||
let expired: Vec<ProtocolCacheKey> = cache.iter()
|
let expired: Vec<ProtocolCacheKey> = cache.iter()
|
||||||
.filter(|entry| entry.value().detected_at.elapsed() >= PROTOCOL_CACHE_TTL)
|
.filter(|entry| entry.value().last_accessed_at.elapsed() >= PROTOCOL_CACHE_TTL)
|
||||||
.map(|entry| entry.key().clone())
|
.map(|entry| entry.key().clone())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -155,6 +593,31 @@ impl ProtocolCache {
|
|||||||
cache.remove(&key);
|
cache.remove(&key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean fully-expired failure entries
|
||||||
|
let expired_failures: Vec<ProtocolCacheKey> = failures.iter()
|
||||||
|
.filter(|entry| entry.value().all_expired())
|
||||||
|
.map(|entry| entry.key().clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !expired_failures.is_empty() {
|
||||||
|
debug!("Protocol cache cleanup: removing {} expired failure entries", expired_failures.len());
|
||||||
|
for key in expired_failures {
|
||||||
|
failures.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safety net: cap failures map at 2× max entries
|
||||||
|
if failures.len() > PROTOCOL_CACHE_MAX_ENTRIES * 2 {
|
||||||
|
let oldest: Vec<ProtocolCacheKey> = failures.iter()
|
||||||
|
.filter(|e| e.value().all_expired())
|
||||||
|
.map(|e| e.key().clone())
|
||||||
|
.take(failures.len() - PROTOCOL_CACHE_MAX_ENTRIES)
|
||||||
|
.collect();
|
||||||
|
for key in oldest {
|
||||||
|
failures.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,8 @@ pub struct ConnActivity {
|
|||||||
/// checks the backend's original response headers for Alt-Svc before our
|
/// checks the backend's original response headers for Alt-Svc before our
|
||||||
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
|
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
|
||||||
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||||
|
/// The upstream request path that triggered Alt-Svc discovery. Logged for traceability.
|
||||||
|
alt_svc_request_url: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnActivity {
|
impl ConnActivity {
|
||||||
@@ -58,6 +60,7 @@ impl ConnActivity {
|
|||||||
start: std::time::Instant::now(),
|
start: std::time::Instant::now(),
|
||||||
active_requests: None,
|
active_requests: None,
|
||||||
alt_svc_cache_key: None,
|
alt_svc_cache_key: None,
|
||||||
|
alt_svc_request_url: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -311,6 +314,11 @@ impl HttpProxyService {
|
|||||||
self.protocol_cache.clear();
|
self.protocol_cache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Snapshot the protocol cache for metrics/UI display.
|
||||||
|
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
|
||||||
|
self.protocol_cache.snapshot()
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle an incoming HTTP connection on a plain TCP stream.
|
/// Handle an incoming HTTP connection on a plain TCP stream.
|
||||||
pub async fn handle_connection(
|
pub async fn handle_connection(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
@@ -366,7 +374,7 @@ impl HttpProxyService {
|
|||||||
let cn = cancel_inner.clone();
|
let cn = cancel_inner.clone();
|
||||||
let la = Arc::clone(&la_inner);
|
let la = Arc::clone(&la_inner);
|
||||||
let st = start;
|
let st = start;
|
||||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None, alt_svc_request_url: None };
|
||||||
async move {
|
async move {
|
||||||
let req = req.map(|body| BoxBody::new(body));
|
let req = req.map(|body| BoxBody::new(body));
|
||||||
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
||||||
@@ -701,6 +709,14 @@ impl HttpProxyService {
|
|||||||
port: upstream.port,
|
port: upstream.port,
|
||||||
requested_host: host.clone(),
|
requested_host: host.clone(),
|
||||||
};
|
};
|
||||||
|
// Save cached H3 port for within-request escalation (may be needed later
|
||||||
|
// if TCP connect fails and we escalate to H3 as a last resort)
|
||||||
|
let cached_h3_port = self.protocol_cache.get(&protocol_cache_key)
|
||||||
|
.and_then(|c| c.h3_port);
|
||||||
|
|
||||||
|
// Track whether this ALPN probe is a periodic re-probe (vs first-time detection)
|
||||||
|
let mut is_reprobe = false;
|
||||||
|
|
||||||
let protocol_decision = match backend_protocol_mode {
|
let protocol_decision = match backend_protocol_mode {
|
||||||
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
||||||
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
||||||
@@ -711,19 +727,40 @@ impl HttpProxyService {
|
|||||||
ProtocolDecision::H1
|
ProtocolDecision::H1
|
||||||
} else {
|
} else {
|
||||||
match self.protocol_cache.get(&protocol_cache_key) {
|
match self.protocol_cache.get(&protocol_cache_key) {
|
||||||
|
Some(cached) if cached.needs_reprobe => {
|
||||||
|
// Entry exists but 5+ minutes since last probe — force ALPN re-probe
|
||||||
|
// (only fires for H1/H2; H3 entries have needs_reprobe=false)
|
||||||
|
is_reprobe = true;
|
||||||
|
ProtocolDecision::AlpnProbe
|
||||||
|
}
|
||||||
Some(cached) => match cached.protocol {
|
Some(cached) => match cached.protocol {
|
||||||
crate::protocol_cache::DetectedProtocol::H3 => {
|
crate::protocol_cache::DetectedProtocol::H3 => {
|
||||||
if let Some(h3_port) = cached.h3_port {
|
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||||
|
// H3 cached but suppressed — fall back to ALPN probe
|
||||||
|
ProtocolDecision::AlpnProbe
|
||||||
|
} else if let Some(h3_port) = cached.h3_port {
|
||||||
ProtocolDecision::H3 { port: h3_port }
|
ProtocolDecision::H3 { port: h3_port }
|
||||||
} else {
|
} else {
|
||||||
// H3 cached but no port — fall back to ALPN probe
|
|
||||||
ProtocolDecision::AlpnProbe
|
ProtocolDecision::AlpnProbe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
|
crate::protocol_cache::DetectedProtocol::H2 => {
|
||||||
|
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
|
||||||
|
ProtocolDecision::H1
|
||||||
|
} else {
|
||||||
|
ProtocolDecision::H2
|
||||||
|
}
|
||||||
|
}
|
||||||
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
|
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
|
||||||
},
|
},
|
||||||
None => ProtocolDecision::AlpnProbe,
|
None => {
|
||||||
|
// Cache miss — skip ALPN probe if H2 is suppressed
|
||||||
|
if self.protocol_cache.is_suppressed(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H2) {
|
||||||
|
ProtocolDecision::H1
|
||||||
|
} else {
|
||||||
|
ProtocolDecision::AlpnProbe
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -741,6 +778,7 @@ impl HttpProxyService {
|
|||||||
// the backend's original Alt-Svc header before ResponseFilter injects our own.
|
// the backend's original Alt-Svc header before ResponseFilter injects our own.
|
||||||
if is_auto_detect_mode {
|
if is_auto_detect_mode {
|
||||||
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
|
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
|
||||||
|
conn_activity.alt_svc_request_url = Some(upstream_path.to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- H3 path: try QUIC connection before TCP ---
|
// --- H3 path: try QUIC connection before TCP ---
|
||||||
@@ -776,8 +814,16 @@ impl HttpProxyService {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(backend = %upstream_key, error = %e,
|
warn!(backend = %upstream_key, domain = %domain_str, error = %e,
|
||||||
"H3 backend connect failed, falling back to H2/H1");
|
"H3 backend connect failed, falling back to H2/H1");
|
||||||
|
// Record failure with escalating cooldown — prevents Alt-Svc
|
||||||
|
// from re-upgrading to H3 during cooldown period
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
self.protocol_cache.record_failure(
|
||||||
|
protocol_cache_key.clone(),
|
||||||
|
crate::protocol_cache::DetectedProtocol::H3,
|
||||||
|
);
|
||||||
|
}
|
||||||
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
|
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
|
||||||
// from our own injected Alt-Svc header or a stale backend Alt-Svc
|
// from our own injected Alt-Svc header or a stale backend Alt-Svc
|
||||||
conn_activity.alt_svc_cache_key = None;
|
conn_activity.alt_svc_cache_key = None;
|
||||||
@@ -860,7 +906,7 @@ impl HttpProxyService {
|
|||||||
let alpn = tls.get_ref().1.alpn_protocol();
|
let alpn = tls.get_ref().1.alpn_protocol();
|
||||||
let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false);
|
let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false);
|
||||||
|
|
||||||
// Cache the result
|
// Cache the result (or update existing entry for re-probes)
|
||||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||||
host: upstream.host.clone(),
|
host: upstream.host.clone(),
|
||||||
port: upstream.port,
|
port: upstream.port,
|
||||||
@@ -871,13 +917,18 @@ impl HttpProxyService {
|
|||||||
} else {
|
} else {
|
||||||
crate::protocol_cache::DetectedProtocol::H1
|
crate::protocol_cache::DetectedProtocol::H1
|
||||||
};
|
};
|
||||||
self.protocol_cache.insert(cache_key, detected);
|
if is_reprobe {
|
||||||
|
self.protocol_cache.update_probe_result(&cache_key, detected, "periodic ALPN re-probe");
|
||||||
|
} else {
|
||||||
|
self.protocol_cache.insert(cache_key, detected, "initial ALPN detection");
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
backend = %upstream_key,
|
backend = %upstream_key,
|
||||||
domain = %domain_str,
|
domain = %domain_str,
|
||||||
protocol = if is_h2 { "h2" } else { "h1" },
|
protocol = if is_h2 { "h2" } else { "h1" },
|
||||||
connect_time_ms = %connect_start.elapsed().as_millis(),
|
connect_time_ms = %connect_start.elapsed().as_millis(),
|
||||||
|
reprobe = is_reprobe,
|
||||||
"Backend protocol detected via ALPN"
|
"Backend protocol detected via ALPN"
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -899,6 +950,38 @@ impl HttpProxyService {
|
|||||||
);
|
);
|
||||||
self.metrics.backend_connect_error(&upstream_key);
|
self.metrics.backend_connect_error(&upstream_key);
|
||||||
self.upstream_selector.connection_ended(&upstream_key);
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
|
||||||
|
// --- Within-request escalation: try H3 via QUIC if retryable ---
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
if let Some(h3_port) = cached_h3_port {
|
||||||
|
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||||
|
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
debug!(backend = %upstream_key, domain = %domain_str, "TLS connect failed — escalating to H3");
|
||||||
|
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||||
|
Ok(quic_conn) => {
|
||||||
|
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS failed, H3 succeeded");
|
||||||
|
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||||
|
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||||
|
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||||
|
};
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Err(e3) => {
|
||||||
|
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||||
|
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// All protocols failed — evict cache entry
|
||||||
|
self.protocol_cache.evict(&protocol_cache_key);
|
||||||
|
}
|
||||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -910,6 +993,38 @@ impl HttpProxyService {
|
|||||||
);
|
);
|
||||||
self.metrics.backend_connect_error(&upstream_key);
|
self.metrics.backend_connect_error(&upstream_key);
|
||||||
self.upstream_selector.connection_ended(&upstream_key);
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
|
||||||
|
// --- Within-request escalation: try H3 via QUIC if retryable ---
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
if let Some(h3_port) = cached_h3_port {
|
||||||
|
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||||
|
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
debug!(backend = %upstream_key, domain = %domain_str, "TLS connect timeout — escalating to H3");
|
||||||
|
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||||
|
Ok(quic_conn) => {
|
||||||
|
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TLS timeout, H3 succeeded");
|
||||||
|
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||||
|
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||||
|
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||||
|
};
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Err(e3) => {
|
||||||
|
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||||
|
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// All protocols failed — evict cache entry
|
||||||
|
self.protocol_cache.evict(&protocol_cache_key);
|
||||||
|
}
|
||||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -937,6 +1052,38 @@ impl HttpProxyService {
|
|||||||
);
|
);
|
||||||
self.metrics.backend_connect_error(&upstream_key);
|
self.metrics.backend_connect_error(&upstream_key);
|
||||||
self.upstream_selector.connection_ended(&upstream_key);
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
|
||||||
|
// --- Within-request escalation: try H3 via QUIC if retryable ---
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
if let Some(h3_port) = cached_h3_port {
|
||||||
|
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||||
|
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect failed — escalating to H3");
|
||||||
|
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||||
|
Ok(quic_conn) => {
|
||||||
|
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP failed, H3 succeeded");
|
||||||
|
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||||
|
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||||
|
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||||
|
};
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Err(e3) => {
|
||||||
|
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||||
|
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// All protocols failed — evict cache entry
|
||||||
|
self.protocol_cache.evict(&protocol_cache_key);
|
||||||
|
}
|
||||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -948,6 +1095,38 @@ impl HttpProxyService {
|
|||||||
);
|
);
|
||||||
self.metrics.backend_connect_error(&upstream_key);
|
self.metrics.backend_connect_error(&upstream_key);
|
||||||
self.upstream_selector.connection_ended(&upstream_key);
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
|
||||||
|
// --- Within-request escalation: try H3 via QUIC if retryable ---
|
||||||
|
if is_auto_detect_mode {
|
||||||
|
if let Some(h3_port) = cached_h3_port {
|
||||||
|
if self.protocol_cache.can_retry(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3) {
|
||||||
|
self.protocol_cache.record_retry_attempt(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
debug!(backend = %upstream_key, domain = %domain_str, "TCP connect timeout — escalating to H3");
|
||||||
|
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||||
|
Ok(quic_conn) => {
|
||||||
|
self.protocol_cache.clear_failure(&protocol_cache_key, crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
self.protocol_cache.insert_h3(protocol_cache_key.clone(), h3_port, "recovery — TCP timeout, H3 succeeded");
|
||||||
|
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||||
|
host: upstream.host.clone(), port: h3_port, use_tls: true,
|
||||||
|
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||||
|
};
|
||||||
|
let result = self.forward_h3(
|
||||||
|
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||||
|
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||||
|
).await;
|
||||||
|
self.upstream_selector.connection_ended(&upstream_key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
Err(e3) => {
|
||||||
|
debug!(backend = %upstream_key, error = %e3, "H3 escalation also failed");
|
||||||
|
self.protocol_cache.record_failure(protocol_cache_key.clone(), crate::protocol_cache::DetectedProtocol::H3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// All protocols failed — evict cache entry
|
||||||
|
self.protocol_cache.evict(&protocol_cache_key);
|
||||||
|
}
|
||||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1416,7 +1595,12 @@ impl HttpProxyService {
|
|||||||
port: upstream.port,
|
port: upstream.port,
|
||||||
requested_host: requested_host.clone(),
|
requested_host: requested_host.clone(),
|
||||||
};
|
};
|
||||||
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
// Record H2 failure (escalating cooldown) before downgrading cache to H1
|
||||||
|
self.protocol_cache.record_failure(
|
||||||
|
cache_key.clone(),
|
||||||
|
crate::protocol_cache::DetectedProtocol::H2,
|
||||||
|
);
|
||||||
|
self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake timeout — downgrade");
|
||||||
|
|
||||||
match self.reconnect_backend(upstream, domain, backend_key).await {
|
match self.reconnect_backend(upstream, domain, backend_key).await {
|
||||||
Some(fallback_backend) => {
|
Some(fallback_backend) => {
|
||||||
@@ -1435,6 +1619,8 @@ impl HttpProxyService {
|
|||||||
result
|
result
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
// H2 failed and H1 reconnect also failed — evict cache
|
||||||
|
self.protocol_cache.evict(&cache_key);
|
||||||
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback"))
|
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 timeout fallback"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1549,13 +1735,17 @@ impl HttpProxyService {
|
|||||||
self.metrics.backend_h2_failure(backend_key);
|
self.metrics.backend_h2_failure(backend_key);
|
||||||
self.metrics.backend_handshake_error(backend_key);
|
self.metrics.backend_handshake_error(backend_key);
|
||||||
|
|
||||||
// Update cache to H1 so subsequent requests skip H2
|
// Record H2 failure (escalating cooldown) and downgrade cache to H1
|
||||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||||
host: upstream.host.clone(),
|
host: upstream.host.clone(),
|
||||||
port: upstream.port,
|
port: upstream.port,
|
||||||
requested_host: requested_host.clone(),
|
requested_host: requested_host.clone(),
|
||||||
};
|
};
|
||||||
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
self.protocol_cache.record_failure(
|
||||||
|
cache_key.clone(),
|
||||||
|
crate::protocol_cache::DetectedProtocol::H2,
|
||||||
|
);
|
||||||
|
self.protocol_cache.insert(cache_key.clone(), crate::protocol_cache::DetectedProtocol::H1, "H2 handshake error — downgrade");
|
||||||
|
|
||||||
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
|
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
|
||||||
match self.reconnect_backend(upstream, domain, backend_key).await {
|
match self.reconnect_backend(upstream, domain, backend_key).await {
|
||||||
@@ -1576,6 +1766,8 @@ impl HttpProxyService {
|
|||||||
result
|
result
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
// H2 failed and H1 reconnect also failed — evict cache
|
||||||
|
self.protocol_cache.evict(&cache_key);
|
||||||
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback"))
|
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1791,8 +1983,10 @@ impl HttpProxyService {
|
|||||||
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
|
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
|
||||||
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||||
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
|
let url = conn_activity.alt_svc_request_url.as_deref().unwrap_or("-");
|
||||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
|
debug!(h3_port, url, "Backend advertises H3 via Alt-Svc");
|
||||||
|
let reason = format!("Alt-Svc response header ({})", url);
|
||||||
|
self.protocol_cache.insert_h3(cache_key.clone(), h3_port, &reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2569,7 +2763,7 @@ impl HttpProxyService {
|
|||||||
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
||||||
|
|
||||||
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
||||||
.map_err(|_| "QUIC connect timeout (3s)")??;
|
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
|
||||||
|
|
||||||
debug!("QUIC backend connection established to {}:{}", host, port);
|
debug!("QUIC backend connection established to {}:{}", host, port);
|
||||||
Ok(connection)
|
Ok(connection)
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ pub struct Metrics {
|
|||||||
pub total_udp_sessions: u64,
|
pub total_udp_sessions: u64,
|
||||||
pub total_datagrams_in: u64,
|
pub total_datagrams_in: u64,
|
||||||
pub total_datagrams_out: u64,
|
pub total_datagrams_out: u64,
|
||||||
|
// Protocol detection cache snapshot (populated by RustProxy from HttpProxyService)
|
||||||
|
pub detected_protocols: Vec<ProtocolCacheEntryMetric>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Per-route metrics.
|
/// Per-route metrics.
|
||||||
@@ -76,6 +78,27 @@ pub struct BackendMetrics {
|
|||||||
pub h2_failures: u64,
|
pub h2_failures: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Protocol cache entry for metrics/UI display.
|
||||||
|
/// Populated from the HTTP proxy service's protocol detection cache.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ProtocolCacheEntryMetric {
|
||||||
|
pub host: String,
|
||||||
|
pub port: u16,
|
||||||
|
pub domain: Option<String>,
|
||||||
|
pub protocol: String,
|
||||||
|
pub h3_port: Option<u16>,
|
||||||
|
pub age_secs: u64,
|
||||||
|
pub last_accessed_secs: u64,
|
||||||
|
pub last_probed_secs: u64,
|
||||||
|
pub h2_suppressed: bool,
|
||||||
|
pub h3_suppressed: bool,
|
||||||
|
pub h2_cooldown_remaining_secs: Option<u64>,
|
||||||
|
pub h3_cooldown_remaining_secs: Option<u64>,
|
||||||
|
pub h2_consecutive_failures: Option<u32>,
|
||||||
|
pub h3_consecutive_failures: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Statistics snapshot.
|
/// Statistics snapshot.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@@ -824,6 +847,7 @@ impl MetricsCollector {
|
|||||||
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
|
total_udp_sessions: self.total_udp_sessions.load(Ordering::Relaxed),
|
||||||
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
|
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
|
||||||
total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed),
|
total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed),
|
||||||
|
detected_protocols: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -937,8 +937,31 @@ impl RustProxy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Get current metrics snapshot.
|
/// Get current metrics snapshot.
|
||||||
|
/// Includes protocol cache entries from the HTTP proxy service.
|
||||||
pub fn get_metrics(&self) -> Metrics {
|
pub fn get_metrics(&self) -> Metrics {
|
||||||
self.metrics.snapshot()
|
let mut metrics = self.metrics.snapshot();
|
||||||
|
if let Some(ref lm) = self.listener_manager {
|
||||||
|
let entries = lm.http_proxy().protocol_cache_snapshot();
|
||||||
|
metrics.detected_protocols = entries.into_iter().map(|e| {
|
||||||
|
rustproxy_metrics::ProtocolCacheEntryMetric {
|
||||||
|
host: e.host,
|
||||||
|
port: e.port,
|
||||||
|
domain: e.domain,
|
||||||
|
protocol: e.protocol,
|
||||||
|
h3_port: e.h3_port,
|
||||||
|
age_secs: e.age_secs,
|
||||||
|
last_accessed_secs: e.last_accessed_secs,
|
||||||
|
last_probed_secs: e.last_probed_secs,
|
||||||
|
h2_suppressed: e.h2_suppressed,
|
||||||
|
h3_suppressed: e.h3_suppressed,
|
||||||
|
h2_cooldown_remaining_secs: e.h2_cooldown_remaining_secs,
|
||||||
|
h3_cooldown_remaining_secs: e.h3_cooldown_remaining_secs,
|
||||||
|
h2_consecutive_failures: e.h2_consecutive_failures,
|
||||||
|
h3_consecutive_failures: e.h3_consecutive_failures,
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
}
|
||||||
|
metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a listening port at runtime.
|
/// Add a listening port at runtime.
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartproxy',
|
name: '@push.rocks/smartproxy',
|
||||||
version: '26.0.0',
|
version: '26.2.1',
|
||||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,6 +72,7 @@ export interface IMetrics {
|
|||||||
byBackend(): Map<string, IBackendMetrics>;
|
byBackend(): Map<string, IBackendMetrics>;
|
||||||
protocols(): Map<string, string>;
|
protocols(): Map<string, string>;
|
||||||
topByErrors(limit?: number): Array<{ backend: string; errors: number }>;
|
topByErrors(limit?: number): Array<{ backend: string; errors: number }>;
|
||||||
|
detectedProtocols(): IProtocolCacheEntry[];
|
||||||
};
|
};
|
||||||
|
|
||||||
// UDP metrics
|
// UDP metrics
|
||||||
@@ -113,6 +114,28 @@ export interface IMetricsConfig {
|
|||||||
prometheusPrefix: string; // Default: smartproxy_
|
prometheusPrefix: string; // Default: smartproxy_
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Protocol cache entry from the Rust proxy's auto-detection cache.
|
||||||
|
* Shows which protocol (h1/h2/h3) is detected for each backend+domain pair,
|
||||||
|
* including failure suppression state with escalating cooldowns.
|
||||||
|
*/
|
||||||
|
export interface IProtocolCacheEntry {
|
||||||
|
host: string;
|
||||||
|
port: number;
|
||||||
|
domain: string | null;
|
||||||
|
protocol: string;
|
||||||
|
h3Port: number | null;
|
||||||
|
ageSecs: number;
|
||||||
|
lastAccessedSecs: number;
|
||||||
|
lastProbedSecs: number;
|
||||||
|
h2Suppressed: boolean;
|
||||||
|
h3Suppressed: boolean;
|
||||||
|
h2CooldownRemainingSecs: number | null;
|
||||||
|
h3CooldownRemainingSecs: number | null;
|
||||||
|
h2ConsecutiveFailures: number | null;
|
||||||
|
h3ConsecutiveFailures: number | null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Per-backend metrics
|
* Per-backend metrics
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import type { IMetrics, IBackendMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
|
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
|
||||||
import type { RustProxyBridge } from './rust-proxy-bridge.js';
|
import type { RustProxyBridge } from './rust-proxy-bridge.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -216,6 +216,9 @@ export class RustMetricsAdapter implements IMetrics {
|
|||||||
result.sort((a, b) => b.errors - a.errors);
|
result.sort((a, b) => b.errors - a.errors);
|
||||||
return result.slice(0, limit);
|
return result.slice(0, limit);
|
||||||
},
|
},
|
||||||
|
detectedProtocols: (): IProtocolCacheEntry[] => {
|
||||||
|
return this.cache?.detectedProtocols ?? [];
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
public udp = {
|
public udp = {
|
||||||
|
|||||||
Reference in New Issue
Block a user