Compare commits

...

19 Commits

Author SHA1 Message Date
be9898805f v25.9.2
Some checks failed
Default (tags) / security (push) Successful in 41s
Default (tags) / test (push) Failing after 4m0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-08 15:24:18 +00:00
d4aa46aed7 fix(protocol-cache): Include requested_host in protocol detection cache key to avoid cache oscillation when multiple frontend domains share the same backend 2026-03-08 15:24:18 +00:00
4f1c5c919f v25.9.1
Some checks failed
Default (tags) / security (push) Successful in 48s
Default (tags) / test (push) Failing after 4m3s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-03 16:14:16 +00:00
d51b2c5890 fix(rustproxy): Cancel connections for routes removed/disabled by adding per-route cancellation tokens and make RouteManager swappable (ArcSwap) for runtime updates 2026-03-03 16:14:16 +00:00
bb471a8cc9 v25.9.0
Some checks failed
Default (tags) / security (push) Successful in 41s
Default (tags) / test (push) Failing after 4m0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-03 11:04:01 +00:00
c52128f12d feat(rustproxy-http): add HTTP/2 auto-detection via ALPN with TTL-backed protocol cache and h1-only/h2 ALPN client configs 2026-03-03 11:04:01 +00:00
e69de246e9 v25.8.5
Some checks failed
Default (tags) / security (push) Successful in 43s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 21:31:38 +00:00
5126049ae6 fix(release): bump patch version (no source changes) 2026-02-26 21:31:38 +00:00
8db621657f fix(proxy): close connection buildup vectors in HTTP idle, WebSocket, socket relay, and TLS forwarding paths
- Add HTTP keep-alive idle timeout (60s default) with periodic watchdog that
  skips active requests (panic-safe via RAII ActiveRequestGuard)
- Make WebSocket inactivity/max-lifetime timeouts configurable from ConnectionConfig
  instead of hardcoded 1h/24h
- Replace bare copy_bidirectional in socket handler relay with timeout+cancel-aware
  split forwarding (inactivity, max lifetime, graceful shutdown)
- Add CancellationToken to forward_bidirectional_split_with_timeouts so TLS-terminated
  TCP connections respond to graceful shutdown
- Fix graceful_stop to actually abort listener tasks that exceed the shutdown deadline
  (previously they detached and ran forever)
- Add 10s metadata parsing timeout on TS socket-handler-server to prevent stuck sockets
2026-02-26 21:29:19 +00:00
ef060d5e79 v25.8.4
Some checks failed
Default (tags) / security (push) Successful in 40s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 17:32:35 +00:00
cd7f3f7f75 fix(proxy): adjust default proxy timeouts and keep-alive behavior to shorter, more consistent values 2026-02-26 17:32:35 +00:00
8df18728d4 v25.8.3
Some checks failed
Default (tags) / security (push) Successful in 29s
Default (tags) / test (push) Failing after 4m2s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 17:01:57 +00:00
bedecc6b6b fix(smartproxy): no code or dependency changes detected; no version bump required 2026-02-26 17:01:57 +00:00
b5f166bc92 v25.8.2
Some checks failed
Default (tags) / security (push) Successful in 31s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-26 16:58:30 +00:00
94266222fe fix(connection): improve connection handling and timeouts 2026-02-26 16:58:30 +00:00
697d51a9d4 v25.8.1
Some checks failed
Default (tags) / security (push) Successful in 42s
Default (tags) / test (push) Failing after 4m4s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-25 00:12:41 +00:00
7e5fe2bec3 fix(allocator): switch global allocator from tikv-jemallocator to mimalloc 2026-02-25 00:12:41 +00:00
f592bf627f v25.8.0
Some checks failed
Default (tags) / security (push) Successful in 42s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-24 23:22:49 +00:00
6114a00fb8 feat(rustproxy): use tikv-jemallocator as the global allocator to reduce glibc fragmentation and slow RSS growth; add allocator dependency and enable it in rustproxy, update lockfile, and run tsrust before tests 2026-02-24 23:22:49 +00:00
23 changed files with 939 additions and 128 deletions

View File

@@ -1,5 +1,82 @@
# Changelog # Changelog
## 2026-03-08 - 25.9.2 - fix(protocol-cache)
Include requested_host in protocol detection cache key to avoid cache oscillation when multiple frontend domains share the same backend
- Add ProtocolCacheKey.requested_host: Option<String> to distinguish cache entries by incoming request Host/:authority
- Update protocol cache lookups/inserts in proxy_service to populate requested_host
- Enhance debug logging to show requested_host on cache hits
- Fixes repeated ALPN probing / cache oscillation when different frontend domains share a backend with differing HTTP/2 support
## 2026-03-03 - 25.9.1 - fix(rustproxy)
Cancel connections for routes removed/disabled by adding per-route cancellation tokens and make RouteManager swappable (ArcSwap) for runtime updates
- Add per-route CancellationToken map (DashMap) to TcpListenerManager and call token.cancel() when routes are removed (invalidate_removed_routes)
- Propagate Arc<ArcSwap<RouteManager>> into HttpProxyService and passthrough listener so the route manager can be hot-swapped without restarting listeners
- Use per-route child cancellation tokens in accept/connection handling and forwarders to terminate existing connections when a route is removed
- Prune HTTP proxy caches and retain/cleanup per-route tokens when routes are active/removed
- Update test.test.sni-requirement.node.ts to allocate unique free ports via findFreePorts to avoid port conflicts during tests
## 2026-03-03 - 25.9.0 - feat(rustproxy-http)
add HTTP/2 auto-detection via ALPN with TTL-backed protocol cache and h1-only/h2 ALPN client configs
- Add protocol_cache module: bounded, TTL-based cache (5min TTL), max entries (4096), background cleanup task and clear() to discard stale detections.
- Introduce BackendProtocol::Auto and expose 'auto' in TypeScript route types to allow ALPN-based protocol auto-detection.
- Add build_tls_acceptor_h1_only() to create a TLS acceptor that advertises only http/1.1 (used for backends/tests that speak plain HTTP/1.1).
- Add shared_backend_tls_config_alpn() and default_backend_tls_config_with_alpn() to provide client TLS configs advertising h2+http/1.1 for auto-detection.
- Wire backend_tls_config_alpn and protocol_cache into proxy_service, tcp_listener and passthrough paths; add set_backend_tls_config_alpn() and prune protocol_cache on route updates.
- Update passthrough tests to use h1-only acceptor to avoid false HTTP/2 detection when backends speak plain HTTP/1.1.
- Include reconnection/fallback handling and ensure ALPN-enabled client config is used for auto-detection mode.
## 2026-02-26 - 25.8.5 - fix(release)
bump patch version (no source changes)
- No changes detected in git diff
- Current version: 25.8.4
- Recommend patch bump to 25.8.5 to record release without code changes
## 2026-02-26 - 25.8.4 - fix(proxy)
adjust default proxy timeouts and keep-alive behavior to shorter, more consistent values
- Increase connection timeout default from 30,000ms to 60,000ms (30s -> 60s).
- Reduce socket timeout default from 3,600,000ms to 60,000ms (1h -> 60s).
- Reduce max connection lifetime default from 86,400,000ms to 3,600,000ms (24h -> 1h).
- Change inactivity timeout default from 14,400,000ms to 75,000ms (4h -> 75s).
- Update keep-alive defaults: keepAliveTreatment 'extended' -> 'standard', keepAliveInactivityMultiplier 6 -> 4, extendedKeepAliveLifetime 604,800,000ms -> 3,600,000ms (7d -> 1h).
- Apply these consistent default values across Rust crates (rustproxy-config, rustproxy-passthrough) and the TypeScript smart-proxy implementation.
- Update unit test expectations to match the new defaults.
## 2026-02-26 - 25.8.3 - fix(smartproxy)
no code or dependency changes detected; no version bump required
- No files changed in the provided diff (No changes).
- package.json version remains 25.8.2.
- No dependency or source updates detected; skip release.
## 2026-02-26 - 25.8.2 - fix(connection)
improve connection handling and timeouts
- Flush logs on process beforeExit and avoid calling process.exit in SIGINT/SIGTERM handlers to preserve host graceful shutdown
- Store protocol entries with a createdAt timestamp in ProtocolDetector and remove stale entries older than 30s to prevent leaked state from abandoned handshakes or port scanners
- Add backend connect timeout (30s) and idle timeouts (5 minutes) for dynamic forwards; destroy sockets on timeout and emit logs for timeout events
## 2026-02-25 - 25.8.1 - fix(allocator)
switch global allocator from tikv-jemallocator to mimalloc
- Replaced tikv-jemallocator with mimalloc in rust/Cargo.toml workspace dependencies.
- Updated rust/crates/rustproxy/Cargo.toml to use mimalloc as a workspace dependency.
- Updated rust/Cargo.lock: added mimalloc and libmimalloc-sys entries and removed tikv-jemallocator and tikv-jemalloc-sys entries.
- Changed the global allocator in crates/rustproxy/src/main.rs from tikv_jemallocator::Jemalloc to mimalloc::MiMalloc.
- Impact: runtime memory allocator is changed which may affect memory usage and performance; no public API changes but recommend testing memory/performance in deployments.
## 2026-02-24 - 25.8.0 - feat(rustproxy)
use tikv-jemallocator as the global allocator to reduce glibc fragmentation and slow RSS growth; add allocator dependency and enable it in rustproxy, update lockfile, and run tsrust before tests
- Added tikv-jemallocator dependency to rust/Cargo.toml and rust/crates/rustproxy/Cargo.toml
- Enabled tikv_jemallocator as the global allocator in rust/crates/rustproxy/src/main.rs
- Updated rust/Cargo.lock with tikv-jemallocator and tikv-jemalloc-sys entries
- Modified package.json test script to run tsrust before tstest
## 2026-02-24 - 25.7.10 - fix(rustproxy) ## 2026-02-24 - 25.7.10 - fix(rustproxy)
Use cooperative cancellation for background tasks, prune stale caches and metric entries, and switch tests to dynamic port allocation to avoid port conflicts Use cooperative cancellation for background tasks, prune stale caches and metric entries, and switch tests to dynamic port allocation to avoid port conflicts

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartproxy", "name": "@push.rocks/smartproxy",
"version": "25.7.10", "version": "25.9.2",
"private": false, "private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.", "description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@@ -9,7 +9,7 @@
"author": "Lossless GmbH", "author": "Lossless GmbH",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"test": "(tstest test/**/test*.ts --verbose --timeout 60 --logfile)", "test": "(tsrust) && (tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
"build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)", "build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)",
"format": "(gitzone format)", "format": "(gitzone format)",
"buildDocs": "tsdoc" "buildDocs": "tsdoc"

20
rust/Cargo.lock generated
View File

@@ -612,6 +612,16 @@ version = "0.2.180"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc"
[[package]]
name = "libmimalloc-sys"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.14" version = "0.4.14"
@@ -642,6 +652,15 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
[[package]]
name = "mimalloc"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8"
dependencies = [
"libmimalloc-sys",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.1.1" version = "1.1.1"
@@ -924,6 +943,7 @@ dependencies = [
"http-body-util", "http-body-util",
"hyper", "hyper",
"hyper-util", "hyper-util",
"mimalloc",
"rcgen", "rcgen",
"rustls", "rustls",
"rustproxy-config", "rustproxy-config",

View File

@@ -91,6 +91,9 @@ libc = "0.2"
# Socket-level options (keepalive, etc.) # Socket-level options (keepalive, etc.)
socket2 = { version = "0.5", features = ["all"] } socket2 = { version = "0.5", features = ["all"] }
# mimalloc allocator (prevents glibc fragmentation / slow RSS growth)
mimalloc = "0.1"
# Internal crates # Internal crates
rustproxy-config = { path = "crates/rustproxy-config" } rustproxy-config = { path = "crates/rustproxy-config" }
rustproxy-routing = { path = "crates/rustproxy-routing" } rustproxy-routing = { path = "crates/rustproxy-routing" }

View File

@@ -298,7 +298,7 @@ impl RustProxyOptions {
/// Get the effective connection timeout in milliseconds. /// Get the effective connection timeout in milliseconds.
pub fn effective_connection_timeout(&self) -> u64 { pub fn effective_connection_timeout(&self) -> u64 {
self.connection_timeout.unwrap_or(30_000) self.connection_timeout.unwrap_or(60_000)
} }
/// Get the effective initial data timeout in milliseconds. /// Get the effective initial data timeout in milliseconds.
@@ -308,12 +308,12 @@ impl RustProxyOptions {
/// Get the effective socket timeout in milliseconds. /// Get the effective socket timeout in milliseconds.
pub fn effective_socket_timeout(&self) -> u64 { pub fn effective_socket_timeout(&self) -> u64 {
self.socket_timeout.unwrap_or(3_600_000) self.socket_timeout.unwrap_or(60_000)
} }
/// Get the effective max connection lifetime in milliseconds. /// Get the effective max connection lifetime in milliseconds.
pub fn effective_max_connection_lifetime(&self) -> u64 { pub fn effective_max_connection_lifetime(&self) -> u64 {
self.max_connection_lifetime.unwrap_or(86_400_000) self.max_connection_lifetime.unwrap_or(3_600_000)
} }
/// Get all unique ports that routes listen on. /// Get all unique ports that routes listen on.
@@ -377,10 +377,10 @@ mod tests {
#[test] #[test]
fn test_default_timeouts() { fn test_default_timeouts() {
let options = RustProxyOptions::default(); let options = RustProxyOptions::default();
assert_eq!(options.effective_connection_timeout(), 30_000); assert_eq!(options.effective_connection_timeout(), 60_000);
assert_eq!(options.effective_initial_data_timeout(), 60_000); assert_eq!(options.effective_initial_data_timeout(), 60_000);
assert_eq!(options.effective_socket_timeout(), 3_600_000); assert_eq!(options.effective_socket_timeout(), 60_000);
assert_eq!(options.effective_max_connection_lifetime(), 86_400_000); assert_eq!(options.effective_max_connection_lifetime(), 3_600_000);
} }
#[test] #[test]

View File

@@ -367,6 +367,7 @@ pub struct NfTablesOptions {
pub enum BackendProtocol { pub enum BackendProtocol {
Http1, Http1,
Http2, Http2,
Auto,
} }
/// Action options. /// Action options.

View File

@@ -5,6 +5,7 @@
pub mod connection_pool; pub mod connection_pool;
pub mod counting_body; pub mod counting_body;
pub mod protocol_cache;
pub mod proxy_service; pub mod proxy_service;
pub mod request_filter; pub mod request_filter;
pub mod response_filter; pub mod response_filter;

View File

@@ -0,0 +1,140 @@
//! Bounded, TTL-based protocol detection cache for HTTP/2 auto-detection.
//!
//! Caches the ALPN-negotiated protocol (H1 or H2) per backend endpoint and requested
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
//! frontend domains share the same backend but differ in HTTP/2 support.
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use tracing::debug;
/// How long a detection result stays valid (5 minutes).
/// Once an entry is older than this, a lookup misses and the backend is probed again.
const PROTOCOL_CACHE_TTL: Duration = Duration::from_secs(5 * 60);
/// Upper bound on the number of cached detections.
/// Keeps memory bounded as backends appear and disappear.
const PROTOCOL_CACHE_MAX_ENTRIES: usize = 4 * 1024;
/// Pause between two runs of the background expiry sweep (1 minute).
const PROTOCOL_CACHE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Detected backend protocol.
///
/// This is the value stored in the protocol cache for a backend endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DetectedProtocol {
/// The backend speaks HTTP/1.1.
H1,
/// The backend speaks HTTP/2.
H2,
}
/// Key for the protocol cache: (host, port, requested_host).
///
/// Equality and hashing are derived over all three fields, so the same backend
/// reached through different requested hosts gets separate cache entries.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct ProtocolCacheKey {
/// Backend host the detection result applies to.
pub host: String,
/// Backend port the detection result applies to.
pub port: u16,
/// The incoming request's domain (Host header / :authority).
/// Distinguishes protocol detection when multiple domains share the same backend.
/// NOTE(review): the string is compared as-is; confirm callers normalize case
/// before building the key.
pub requested_host: Option<String>,
}
/// A cached protocol detection result with a timestamp.
struct CachedEntry {
/// Protocol that was detected for the key.
protocol: DetectedProtocol,
/// When the entry was written; its age is compared against `PROTOCOL_CACHE_TTL`.
detected_at: Instant,
}
/// Bounded, TTL-based protocol detection cache.
///
/// Memory safety guarantees:
/// - Hard cap at `PROTOCOL_CACHE_MAX_ENTRIES` — cannot grow unboundedly.
/// - TTL expiry — stale entries naturally age out on lookup.
/// - Background cleanup task — proactively removes expired entries every 60s.
/// - `clear()` — called on route updates to discard stale detections.
/// - `Drop` — aborts the background task to prevent dangling tokio tasks.
pub struct ProtocolCache {
/// Detection results, shared with the background cleanup task.
cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>,
/// Handle of the background cleanup task; taken and aborted when the cache is dropped.
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}
impl ProtocolCache {
/// Build an empty cache and launch its periodic expiry sweeper.
///
/// The sweeper is started with `tokio::spawn`, so this has to be called from
/// within a Tokio runtime context. Its handle is kept so the task can be
/// aborted when the cache is dropped.
pub fn new() -> Self {
    let cache = Arc::new(DashMap::<ProtocolCacheKey, CachedEntry>::new());
    let sweeper = tokio::spawn(Self::cleanup_loop(Arc::clone(&cache)));
    Self {
        cache,
        cleanup_handle: Some(sweeper),
    }
}
/// Look up the cached protocol for a backend endpoint.
///
/// Returns `Some(protocol)` while the entry is younger than `PROTOCOL_CACHE_TTL`.
/// Returns `None` when there is no entry or the entry has expired, in which case
/// the caller should probe the backend via ALPN again. An expired entry is
/// removed as a side effect of the lookup.
pub fn get(&self, key: &ProtocolCacheKey) -> Option<DetectedProtocol> {
    {
        let entry = self.cache.get(key)?;
        if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
            debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
            return Some(entry.protocol);
        }
        // The read guard is released at the end of this scope, before the map
        // is modified below.
    }
    // Expired: evict, but re-check the age under the map's own lock so that an
    // entry refreshed by a concurrent `insert` in the meantime is not discarded.
    self.cache
        .remove_if(key, |_, stale| stale.detected_at.elapsed() >= PROTOCOL_CACHE_TTL);
    None
}
/// Record `protocol` as the detection result for `key`, stamped with the current time.
///
/// When the map already holds `PROTOCOL_CACHE_MAX_ENTRIES` entries and `key` is
/// not one of them, the entry with the earliest detection time is removed first.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
    let is_full = self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES;
    if is_full && !self.cache.contains_key(&key) {
        // Locate the entry with the earliest timestamp and copy its key out,
        // so no reference into the map is held while removing it.
        let victim = self
            .cache
            .iter()
            .min_by(|a, b| a.value().detected_at.cmp(&b.value().detected_at))
            .map(|entry| entry.key().clone());
        if let Some(victim_key) = victim {
            self.cache.remove(&victim_key);
        }
    }
    let fresh = CachedEntry {
        protocol,
        detected_at: Instant::now(),
    };
    self.cache.insert(key, fresh);
}
/// Clear all entries. Called on route updates to discard stale detections.
///
/// Only the map is emptied; the background cleanup task is not touched.
pub fn clear(&self) {
self.cache.clear();
}
/// Background cleanup loop — removes expired entries every `PROTOCOL_CACHE_CLEANUP_INTERVAL`.
///
/// Runs until the task is aborted; it never returns on its own.
async fn cleanup_loop(cache: Arc<DashMap<ProtocolCacheKey, CachedEntry>>) {
    let mut interval = tokio::time::interval(PROTOCOL_CACHE_CLEANUP_INTERVAL);
    loop {
        interval.tick().await;
        // Decide and remove in one pass under the map's lock. Collecting keys
        // first and removing them afterwards could delete an entry that a
        // concurrent `insert` refreshed in between, and clones every expired key.
        let mut expired = 0usize;
        cache.retain(|_, entry| {
            let fresh = entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL;
            if !fresh {
                expired += 1;
            }
            fresh
        });
        if expired > 0 {
            debug!("Protocol cache cleanup: removing {} expired entries", expired);
        }
    }
}
}
impl Drop for ProtocolCache {
    /// Stop the background cleanup task, if one is still registered.
    fn drop(&mut self) {
        match self.cleanup_handle.take() {
            Some(task) => task.abort(),
            None => {}
        }
    }
}

View File

@@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use arc_swap::ArcSwap;
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use http_body_util::{BodyExt, Full, combinators::BoxBody}; use http_body_util::{BodyExt, Full, combinators::BoxBody};
@@ -34,12 +35,35 @@ use crate::upstream_selector::UpstreamSelector;
/// Default upstream connect timeout (30 seconds). /// Default upstream connect timeout (30 seconds).
const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Default HTTP keep-alive idle timeout (60 seconds).
/// If no new request arrives within this duration, the connection is closed.
const DEFAULT_HTTP_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
/// Default WebSocket inactivity timeout (1 hour). /// Default WebSocket inactivity timeout (1 hour).
const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600); const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
/// Default WebSocket max lifetime (24 hours). /// Default WebSocket max lifetime (24 hours).
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400); const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
/// Scope guard tracking one in-flight request.
///
/// Creating the guard bumps the shared counter; dropping it lowers the counter
/// again, so the count is restored on every exit path, including a panic in
/// the request handler.
struct ActiveRequestGuard {
    counter: Arc<AtomicU64>,
}
impl ActiveRequestGuard {
    /// Register a new in-flight request on `counter` and return its guard.
    fn new(counter: Arc<AtomicU64>) -> Self {
        let guard = Self { counter };
        guard.counter.fetch_add(1, Ordering::Relaxed);
        guard
    }
}
impl Drop for ActiveRequestGuard {
    /// Unregister the request this guard was created for.
    fn drop(&mut self) {
        let _previous = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Backend stream that can be either plain TCP or TLS-wrapped. /// Backend stream that can be either plain TCP or TLS-wrapped.
/// Used for `terminate-and-reencrypt` mode where the backend requires TLS. /// Used for `terminate-and-reencrypt` mode where the backend requires TLS.
pub(crate) enum BackendStream { pub(crate) enum BackendStream {
@@ -110,7 +134,7 @@ async fn connect_tls_backend(
/// HTTP proxy service that processes HTTP traffic. /// HTTP proxy service that processes HTTP traffic.
pub struct HttpProxyService { pub struct HttpProxyService {
route_manager: Arc<RouteManager>, route_manager: Arc<ArcSwap<RouteManager>>,
metrics: Arc<MetricsCollector>, metrics: Arc<MetricsCollector>,
upstream_selector: UpstreamSelector, upstream_selector: UpstreamSelector,
/// Timeout for connecting to upstream backends. /// Timeout for connecting to upstream backends.
@@ -123,12 +147,22 @@ pub struct HttpProxyService {
regex_cache: DashMap<String, Regex>, regex_cache: DashMap<String, Regex>,
/// Shared backend TLS config for session resumption across connections. /// Shared backend TLS config for session resumption across connections.
backend_tls_config: Arc<rustls::ClientConfig>, backend_tls_config: Arc<rustls::ClientConfig>,
/// Backend TLS config with ALPN h2+http/1.1 for auto-detection mode.
backend_tls_config_alpn: Arc<rustls::ClientConfig>,
/// Backend connection pool for reusing keep-alive connections. /// Backend connection pool for reusing keep-alive connections.
connection_pool: Arc<crate::connection_pool::ConnectionPool>, connection_pool: Arc<crate::connection_pool::ConnectionPool>,
/// Protocol detection cache for auto mode (caches ALPN-detected protocol per backend).
protocol_cache: Arc<crate::protocol_cache::ProtocolCache>,
/// HTTP keep-alive idle timeout: close connection if no new request arrives within this duration.
http_idle_timeout: std::time::Duration,
/// WebSocket inactivity timeout (no data in either direction).
ws_inactivity_timeout: std::time::Duration,
/// WebSocket maximum connection lifetime.
ws_max_lifetime: std::time::Duration,
} }
impl HttpProxyService { impl HttpProxyService {
pub fn new(route_manager: Arc<RouteManager>, metrics: Arc<MetricsCollector>) -> Self { pub fn new(route_manager: Arc<ArcSwap<RouteManager>>, metrics: Arc<MetricsCollector>) -> Self {
Self { Self {
route_manager, route_manager,
metrics, metrics,
@@ -138,13 +172,18 @@ impl HttpProxyService {
request_counter: AtomicU64::new(0), request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(), regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(), backend_tls_config: Self::default_backend_tls_config(),
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
} }
} }
/// Create with a custom connect timeout. /// Create with a custom connect timeout.
pub fn with_connect_timeout( pub fn with_connect_timeout(
route_manager: Arc<RouteManager>, route_manager: Arc<ArcSwap<RouteManager>>,
metrics: Arc<MetricsCollector>, metrics: Arc<MetricsCollector>,
connect_timeout: std::time::Duration, connect_timeout: std::time::Duration,
) -> Self { ) -> Self {
@@ -157,22 +196,46 @@ impl HttpProxyService {
request_counter: AtomicU64::new(0), request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(), regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(), backend_tls_config: Self::default_backend_tls_config(),
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
} }
} }
/// Set the HTTP keep-alive idle timeout, WebSocket inactivity timeout, and
/// WebSocket max lifetime from connection config values.
///
/// All three values overwrite the defaults assigned at construction.
///
/// * `http_idle_timeout` - close a connection if no new request arrives within this duration.
/// * `ws_inactivity_timeout` - WebSocket inactivity timeout (no data in either direction).
/// * `ws_max_lifetime` - WebSocket maximum connection lifetime.
pub fn set_connection_timeouts(
&mut self,
http_idle_timeout: std::time::Duration,
ws_inactivity_timeout: std::time::Duration,
ws_max_lifetime: std::time::Duration,
) {
self.http_idle_timeout = http_idle_timeout;
self.ws_inactivity_timeout = ws_inactivity_timeout;
self.ws_max_lifetime = ws_max_lifetime;
}
/// Set the shared backend TLS config (enables session resumption). /// Set the shared backend TLS config (enables session resumption).
/// Call this after construction to inject the shared config from tls_handler. /// Call this after construction to inject the shared config from tls_handler.
pub fn set_backend_tls_config(&mut self, config: Arc<rustls::ClientConfig>) { pub fn set_backend_tls_config(&mut self, config: Arc<rustls::ClientConfig>) {
self.backend_tls_config = config; self.backend_tls_config = config;
} }
/// Set the shared backend TLS config with ALPN h2+http/1.1 (for auto-detection mode).
///
/// Replaces the ALPN-enabled config that was created by default at construction.
pub fn set_backend_tls_config_alpn(&mut self, config: Arc<rustls::ClientConfig>) {
self.backend_tls_config_alpn = config;
}
/// Prune caches for route IDs that are no longer active. /// Prune caches for route IDs that are no longer active.
/// Call after route updates to prevent unbounded growth. /// Call after route updates to prevent unbounded growth.
pub fn prune_stale_routes(&self, active_route_ids: &std::collections::HashSet<String>) { pub fn prune_stale_routes(&self, active_route_ids: &std::collections::HashSet<String>) {
self.route_rate_limiters.retain(|k, _| active_route_ids.contains(k)); self.route_rate_limiters.retain(|k, _| active_route_ids.contains(k));
self.regex_cache.clear(); self.regex_cache.clear();
self.upstream_selector.reset_round_robin(); self.upstream_selector.reset_round_robin();
self.protocol_cache.clear();
} }
/// Handle an incoming HTTP connection on a plain TCP stream. /// Handle an incoming HTTP connection on a plain TCP stream.
@@ -192,6 +255,10 @@ impl HttpProxyService {
/// based on ALPN negotiation (TLS) or connection preface (h2c). /// based on ALPN negotiation (TLS) or connection preface (h2c).
/// Supports HTTP/1.1 upgrades (WebSocket) and HTTP/2 CONNECT. /// Supports HTTP/1.1 upgrades (WebSocket) and HTTP/2 CONNECT.
/// Responds to graceful shutdown via the cancel token. /// Responds to graceful shutdown via the cancel token.
///
/// An idle watchdog closes the connection if no new HTTP request arrives
/// within `http_idle_timeout` (default 60s). This prevents keep-alive
/// connections from accumulating indefinitely.
pub async fn handle_io<I>( pub async fn handle_io<I>(
self: Arc<Self>, self: Arc<Self>,
stream: I, stream: I,
@@ -204,13 +271,34 @@ impl HttpProxyService {
{ {
let io = TokioIo::new(stream); let io = TokioIo::new(stream);
// Capture timeouts before `self` is moved into the service closure.
let idle_timeout = self.http_idle_timeout;
// Activity tracker: updated at the START and END of each request.
// The idle watchdog checks this to determine if the connection is idle
// (no request in progress and none started recently).
let last_activity = Arc::new(AtomicU64::new(0));
let active_requests = Arc::new(AtomicU64::new(0));
let start = std::time::Instant::now();
let la_inner = Arc::clone(&last_activity);
let ar_inner = Arc::clone(&active_requests);
let cancel_inner = cancel.clone(); let cancel_inner = cancel.clone();
let service = hyper::service::service_fn(move |req: Request<Incoming>| { let service = hyper::service::service_fn(move |req: Request<Incoming>| {
// Mark request start — RAII guard decrements on drop (panic-safe)
la_inner.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
let req_guard = ActiveRequestGuard::new(Arc::clone(&ar_inner));
let svc = Arc::clone(&self); let svc = Arc::clone(&self);
let peer = peer_addr; let peer = peer_addr;
let cn = cancel_inner.clone(); let cn = cancel_inner.clone();
let la = Arc::clone(&la_inner);
let st = start;
async move { async move {
svc.handle_request(req, peer, port, cn).await let result = svc.handle_request(req, peer, port, cn).await;
// Mark request end — update activity timestamp before guard drops
la.store(st.elapsed().as_millis() as u64, Ordering::Relaxed);
drop(req_guard); // Explicitly drop to decrement active_requests
result
} }
}); });
@@ -221,7 +309,7 @@ impl HttpProxyService {
// Pin on the heap — auto::UpgradeableConnection is !Unpin // Pin on the heap — auto::UpgradeableConnection is !Unpin
let mut conn = Box::pin(conn); let mut conn = Box::pin(conn);
// Use select to support graceful shutdown via cancellation token // Use select to support graceful shutdown, cancellation, and idle timeout
tokio::select! { tokio::select! {
result = conn.as_mut() => { result = conn.as_mut() => {
if let Err(e) = result { if let Err(e) = result {
@@ -235,6 +323,37 @@ impl HttpProxyService {
debug!("HTTP connection error during shutdown from {}: {}", peer_addr, e); debug!("HTTP connection error during shutdown from {}: {}", peer_addr, e);
} }
} }
_ = async {
// Idle watchdog: check every 5s whether the connection has been idle
// (no active requests AND no activity for idle_timeout).
// This avoids killing long-running requests or upgraded connections.
let check_interval = std::time::Duration::from_secs(5);
let mut last_seen = 0u64;
loop {
tokio::time::sleep(check_interval).await;
// Never close while a request is in progress
if active_requests.load(Ordering::Relaxed) > 0 {
last_seen = last_activity.load(Ordering::Relaxed);
continue;
}
let current = last_activity.load(Ordering::Relaxed);
if current == last_seen {
// No new activity since last check
let elapsed_since_activity = start.elapsed().as_millis() as u64 - current;
if elapsed_since_activity >= idle_timeout.as_millis() as u64 {
return;
}
}
last_seen = current;
}
} => {
debug!("HTTP connection idle timeout ({}s) from {}", idle_timeout.as_secs(), peer_addr);
conn.as_mut().graceful_shutdown();
// Give any in-flight work 5s to drain after graceful shutdown
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), conn).await;
}
} }
} }
@@ -287,7 +406,8 @@ impl HttpProxyService {
protocol: Some("http"), protocol: Some("http"),
}; };
let route_match = match self.route_manager.find_route(&ctx) { let current_rm = self.route_manager.load();
let route_match = match current_rm.find_route(&ctx) {
Some(rm) => rm, Some(rm) => rm,
None => { None => {
debug!("No route matched for HTTP request to {:?}{}", host, path); debug!("No route matched for HTTP request to {:?}{}", host, path);
@@ -376,11 +496,11 @@ impl HttpProxyService {
return result; return result;
} }
// Determine backend protocol // Determine backend protocol mode
let use_h2 = route_match.route.action.options.as_ref() let backend_protocol_mode = route_match.route.action.options.as_ref()
.and_then(|o| o.backend_protocol.as_ref()) .and_then(|o| o.backend_protocol.as_ref())
.map(|p| *p == rustproxy_config::BackendProtocol::Http2) .cloned()
.unwrap_or(false); .unwrap_or(rustproxy_config::BackendProtocol::Auto);
// Build the upstream path (path + query), applying URL rewriting if configured // Build the upstream path (path + query), applying URL rewriting if configured
let upstream_path = { let upstream_path = {
@@ -460,7 +580,33 @@ impl HttpProxyService {
} }
} }
// --- Resolve protocol decision based on backend protocol mode ---
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
let (use_h2, needs_alpn_probe) = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => (false, false),
rustproxy_config::BackendProtocol::Http2 => (true, false),
rustproxy_config::BackendProtocol::Auto => {
if !upstream.use_tls {
// No ALPN without TLS — default to H1
(false, false)
} else {
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: host.clone(),
};
match self.protocol_cache.get(&cache_key) {
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false),
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false),
None => (false, true), // needs ALPN probe
}
}
}
};
// --- Connection pooling: try reusing an existing connection first --- // --- Connection pooling: try reusing an existing connection first ---
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
if !needs_alpn_probe {
let pool_key = crate::connection_pool::PoolKey { let pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(), host: upstream.host.clone(),
port: upstream.port, port: upstream.port,
@@ -468,8 +614,7 @@ impl HttpProxyService {
h2: use_h2, h2: use_h2,
}; };
// Try pooled connection first (H2 only — H2 senders are Clone and multiplexed, // H2 pool checkout (H2 senders are Clone and multiplexed)
// so checkout doesn't consume request parts. For H1, we try pool inside forward_h1.)
if use_h2 { if use_h2 {
if let Some(sender) = self.connection_pool.checkout_h2(&pool_key) { if let Some(sender) = self.connection_pool.checkout_h2(&pool_key) {
let result = self.forward_h2_pooled( let result = self.forward_h2_pooled(
@@ -480,14 +625,53 @@ impl HttpProxyService {
return result; return result;
} }
} }
}
// Fresh connection path // --- Fresh connection path ---
let backend = if upstream.use_tls { // Choose TLS config: use ALPN config for auto-detect probe, plain config otherwise
let tls_config = if needs_alpn_probe {
&self.backend_tls_config_alpn
} else {
&self.backend_tls_config
};
// Establish backend connection
let (backend, detected_h2) = if upstream.use_tls {
match tokio::time::timeout( match tokio::time::timeout(
self.connect_timeout, self.connect_timeout,
connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port), connect_tls_backend(tls_config, &upstream.host, upstream.port),
).await { ).await {
Ok(Ok(tls)) => BackendStream::Tls(tls), Ok(Ok(tls)) => {
let final_h2 = if needs_alpn_probe {
// Read the ALPN-negotiated protocol from the TLS connection
let alpn = tls.get_ref().1.alpn_protocol();
let is_h2 = alpn.map(|p| p == b"h2").unwrap_or(false);
// Cache the result
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: host.clone(),
};
let detected = if is_h2 {
crate::protocol_cache::DetectedProtocol::H2
} else {
crate::protocol_cache::DetectedProtocol::H1
};
self.protocol_cache.insert(cache_key, detected);
debug!(
"Auto-detected {} for backend {}:{}",
if is_h2 { "HTTP/2" } else { "HTTP/1.1" },
upstream.host, upstream.port
);
is_h2
} else {
use_h2
};
(BackendStream::Tls(tls), final_h2)
}
Ok(Err(e)) => { Ok(Err(e)) => {
error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e); error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
self.upstream_selector.connection_ended(&upstream_key); self.upstream_selector.connection_ended(&upstream_key);
@@ -509,7 +693,7 @@ impl HttpProxyService {
let _ = socket2::SockRef::from(&s).set_tcp_keepalive( let _ = socket2::SockRef::from(&s).set_tcp_keepalive(
&socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60)) &socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60))
); );
BackendStream::Plain(s) (BackendStream::Plain(s), use_h2)
} }
Ok(Err(e)) => { Ok(Err(e)) => {
error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e); error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
@@ -524,12 +708,35 @@ impl HttpProxyService {
} }
}; };
let final_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(),
port: upstream.port,
use_tls: upstream.use_tls,
h2: detected_h2,
};
let io = TokioIo::new(backend); let io = TokioIo::new(backend);
let result = if use_h2 { let result = if detected_h2 {
self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &pool_key).await if is_auto_detect_mode {
// Auto-detect mode: use fallback-capable H2 forwarding
self.forward_h2_with_fallback(
io, parts, body, upstream_headers, &upstream_path,
&upstream, route_match.route, route_id, &ip_str, &final_pool_key,
host.clone(),
).await
} else { } else {
self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str, &pool_key).await // Explicit H2 mode: hard-fail on handshake error (preserved behavior)
self.forward_h2(
io, parts, body, upstream_headers, &upstream_path,
&upstream, route_match.route, route_id, &ip_str, &final_pool_key,
).await
}
} else {
self.forward_h1(
io, parts, body, upstream_headers, &upstream_path,
&upstream, route_match.route, route_id, &ip_str, &final_pool_key,
).await
}; };
self.upstream_selector.connection_ended(&upstream_key); self.upstream_selector.connection_ended(&upstream_key);
result result
@@ -684,6 +891,170 @@ impl HttpProxyService {
self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip).await self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip).await
} }
/// Forward via HTTP/2 with fallback to HTTP/1.1 (auto-detect mode).
///
/// Handles two failure scenarios:
/// 1. H2 handshake fails → reconnects and falls back to H1 (body not consumed yet).
/// 2. H2 handshake "succeeds" but request fails (backend advertises h2 via ALPN but
/// doesn't actually speak h2) → updates cache to H1. The request body is consumed
/// so this request fails, but all subsequent requests will correctly use H1.
async fn forward_h2_with_fallback(
&self,
io: TokioIo<BackendStream>,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
upstream: &crate::upstream_selector::UpstreamSelection,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
requested_host: Option<String>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let exec = hyper_util::rt::TokioExecutor::new();
let handshake_result: Result<(
hyper::client::conn::http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
hyper::client::conn::http2::Connection<TokioIo<BackendStream>, BoxBody<Bytes, hyper::Error>, hyper_util::rt::TokioExecutor>,
), hyper::Error> = hyper::client::conn::http2::handshake(exec, io).await;
match handshake_result {
Ok((mut sender, conn)) => {
tokio::spawn(async move {
if let Err(e) = conn.await {
debug!("HTTP/2 upstream connection error: {}", e);
}
});
// Build and send the h2 request inline (don't register in pool yet —
// we need to verify the request actually succeeds first, because some
// backends advertise h2 via ALPN but don't speak the h2 binary protocol).
let mut upstream_req = Request::builder()
.method(parts.method)
.uri(upstream_path);
if let Some(headers) = upstream_req.headers_mut() {
*headers = upstream_headers;
}
let counting_req_body = CountingBody::new(
body,
Arc::clone(&self.metrics),
route_id.map(|s| s.to_string()),
Some(source_ip.to_string()),
Direction::In,
);
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
let upstream_req = upstream_req.body(boxed_body).unwrap();
match sender.send_request(upstream_req).await {
Ok(upstream_response) => {
// H2 works! Register sender in pool for multiplexed reuse
self.connection_pool.register_h2(pool_key.clone(), sender);
self.build_streaming_response(upstream_response, route, route_id, source_ip).await
}
Err(e) => {
// H2 request failed — backend advertises h2 via ALPN but doesn't
// actually speak it. Update cache so future requests use H1.
// The request body is consumed so this request can't be retried,
// but all subsequent requests will correctly use H1.
warn!(
"Auto-detect: H2 request failed for {}:{}, updating cache to H1: {}",
upstream.host, upstream.port, e
);
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: requested_host.clone(),
};
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend protocol mismatch, retrying with H1"))
}
}
}
Err(e) => {
// H2 handshake truly failed — fall back to H1
// Body is NOT consumed yet, so we can retry the full request.
warn!(
"H2 handshake failed for {}:{}, falling back to H1: {}",
upstream.host, upstream.port, e
);
// Update cache to H1 so subsequent requests skip H2
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: requested_host.clone(),
};
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
match self.reconnect_backend(upstream).await {
Some(fallback_backend) => {
let h1_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(),
port: upstream.port,
use_tls: upstream.use_tls,
h2: false,
};
let fallback_io = TokioIo::new(fallback_backend);
self.forward_h1(
fallback_io, parts, body, upstream_headers, upstream_path,
upstream, route, route_id, source_ip, &h1_pool_key,
).await
}
None => {
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable after H2 fallback"))
}
}
}
}
}
/// Open a fresh connection to `upstream` (used for the H2→H1 fallback).
///
/// Uses TLS with `self.backend_tls_config` when `upstream.use_tls` is set, plain TCP
/// otherwise. Each attempt is bounded by `self.connect_timeout`. Returns `None` after
/// logging when the connect fails or times out.
async fn reconnect_backend(
    &self,
    upstream: &crate::upstream_selector::UpstreamSelection,
) -> Option<BackendStream> {
    // Plain TCP path first, as a guard clause.
    if !upstream.use_tls {
        let target = format!("{}:{}", upstream.host, upstream.port);
        let attempt = tokio::time::timeout(self.connect_timeout, TcpStream::connect(target)).await;
        let socket = match attempt {
            Ok(Ok(connected)) => connected,
            Ok(Err(e)) => {
                error!("H1 fallback: reconnect failed for {}:{}: {}", upstream.host, upstream.port, e);
                return None;
            }
            Err(_) => {
                error!("H1 fallback: reconnect timeout for {}:{}", upstream.host, upstream.port);
                return None;
            }
        };

        // Socket tuning is best-effort: failures are deliberately ignored.
        socket.set_nodelay(true).ok();
        let keepalive = socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60));
        let _ = socket2::SockRef::from(&socket).set_tcp_keepalive(&keepalive);
        return Some(BackendStream::Plain(socket));
    }

    // TLS path.
    let attempt = tokio::time::timeout(
        self.connect_timeout,
        connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port),
    ).await;
    match attempt {
        Ok(Ok(tls_stream)) => Some(BackendStream::Tls(tls_stream)),
        Ok(Err(e)) => {
            error!("H1 fallback: TLS reconnect failed for {}:{}: {}", upstream.host, upstream.port, e);
            None
        }
        Err(_) => {
            error!("H1 fallback: TLS reconnect timeout for {}:{}", upstream.host, upstream.port);
            None
        }
    }
}
/// Common H2 forwarding logic used by both fresh and pooled paths. /// Common H2 forwarding logic used by both fresh and pooled paths.
async fn forward_h2_with_sender( async fn forward_h2_with_sender(
&self, &self,
@@ -1022,6 +1393,8 @@ impl HttpProxyService {
let source_ip_owned = source_ip.to_string(); let source_ip_owned = source_ip.to_string();
let upstream_selector = self.upstream_selector.clone(); let upstream_selector = self.upstream_selector.clone();
let upstream_key_owned = upstream_key.to_string(); let upstream_key_owned = upstream_key.to_string();
let ws_inactivity_timeout = self.ws_inactivity_timeout;
let ws_max_lifetime = self.ws_max_lifetime;
tokio::spawn(async move { tokio::spawn(async move {
let client_upgraded = match on_client_upgrade.await { let client_upgraded = match on_client_upgrade.await {
@@ -1084,8 +1457,8 @@ impl HttpProxyService {
let la_watch = Arc::clone(&last_activity); let la_watch = Arc::clone(&last_activity);
let c2u_handle = c2u.abort_handle(); let c2u_handle = c2u.abort_handle();
let u2c_handle = u2c.abort_handle(); let u2c_handle = u2c.abort_handle();
let inactivity_timeout = DEFAULT_WS_INACTIVITY_TIMEOUT; let inactivity_timeout = ws_inactivity_timeout;
let max_lifetime = DEFAULT_WS_MAX_LIFETIME; let max_lifetime = ws_max_lifetime;
let watchdog = tokio::spawn(async move { let watchdog = tokio::spawn(async move {
let check_interval = std::time::Duration::from_secs(5); let check_interval = std::time::Duration::from_secs(5);
@@ -1326,6 +1699,18 @@ impl HttpProxyService {
.with_no_client_auth(); .with_no_client_auth();
Arc::new(config) Arc::new(config)
} }
/// Construct the fallback backend TLS client config that offers ALPN `h2` and
/// `http/1.1` (in that order) for protocol auto-detection.
///
/// Used as fallback when no shared ALPN config is injected from tls_handler.
/// Certificates are checked by `InsecureBackendVerifier`; no client certificate is sent.
fn default_backend_tls_config_with_alpn() -> Arc<rustls::ClientConfig> {
    // The result of installing the ring provider is intentionally ignored.
    let _ = rustls::crypto::ring::default_provider().install_default();

    let offered_protocols: Vec<Vec<u8>> = [&b"h2"[..], &b"http/1.1"[..]]
        .iter()
        .map(|proto| proto.to_vec())
        .collect();

    let mut client_config = rustls::ClientConfig::builder()
        .dangerous()
        .with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
        .with_no_client_auth();
    client_config.alpn_protocols = offered_protocols;

    Arc::new(client_config)
}
} }
/// Insecure certificate verifier for backend TLS connections (fallback only). /// Insecure certificate verifier for backend TLS connections (fallback only).
@@ -1382,7 +1767,7 @@ impl rustls::client::danger::ServerCertVerifier for InsecureBackendVerifier {
impl Default for HttpProxyService { impl Default for HttpProxyService {
fn default() -> Self { fn default() -> Self {
Self { Self {
route_manager: Arc::new(RouteManager::new(vec![])), route_manager: Arc::new(ArcSwap::from(Arc::new(RouteManager::new(vec![])))),
metrics: Arc::new(MetricsCollector::new()), metrics: Arc::new(MetricsCollector::new()),
upstream_selector: UpstreamSelector::new(), upstream_selector: UpstreamSelector::new(),
connect_timeout: DEFAULT_CONNECT_TIMEOUT, connect_timeout: DEFAULT_CONNECT_TIMEOUT,
@@ -1390,7 +1775,12 @@ impl Default for HttpProxyService {
request_counter: AtomicU64::new(0), request_counter: AtomicU64::new(0),
regex_cache: DashMap::new(), regex_cache: DashMap::new(),
backend_tls_config: Self::default_backend_tls_config(), backend_tls_config: Self::default_backend_tls_config(),
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()), connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
} }
} }
} }

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use dashmap::DashMap;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor; use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -118,10 +119,10 @@ pub struct ConnectionConfig {
impl Default for ConnectionConfig { impl Default for ConnectionConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
connection_timeout_ms: 30_000, connection_timeout_ms: 60_000,
initial_data_timeout_ms: 60_000, initial_data_timeout_ms: 60_000,
socket_timeout_ms: 3_600_000, socket_timeout_ms: 60_000,
max_connection_lifetime_ms: 86_400_000, max_connection_lifetime_ms: 3_600_000,
graceful_shutdown_timeout_ms: 30_000, graceful_shutdown_timeout_ms: 30_000,
max_connections_per_ip: None, max_connections_per_ip: None,
connection_rate_limit_per_minute: None, connection_rate_limit_per_minute: None,
@@ -162,18 +163,28 @@ pub struct TcpListenerManager {
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>, socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
/// Global connection semaphore — limits total simultaneous connections. /// Global connection semaphore — limits total simultaneous connections.
conn_semaphore: Arc<tokio::sync::Semaphore>, conn_semaphore: Arc<tokio::sync::Semaphore>,
/// Per-route cancellation tokens (child of cancel_token).
/// When a route is removed, its token is cancelled, terminating all connections on that route.
route_cancels: Arc<DashMap<String, CancellationToken>>,
} }
impl TcpListenerManager { impl TcpListenerManager {
pub fn new(route_manager: Arc<RouteManager>) -> Self { pub fn new(route_manager: Arc<RouteManager>) -> Self {
let metrics = Arc::new(MetricsCollector::new()); let metrics = Arc::new(MetricsCollector::new());
let conn_config = ConnectionConfig::default(); let conn_config = ConnectionConfig::default();
let route_manager_swap = Arc::new(ArcSwap::from(route_manager));
let mut http_proxy_svc = HttpProxyService::with_connect_timeout( let mut http_proxy_svc = HttpProxyService::with_connect_timeout(
Arc::clone(&route_manager), Arc::clone(&route_manager_swap),
Arc::clone(&metrics), Arc::clone(&metrics),
std::time::Duration::from_millis(conn_config.connection_timeout_ms), std::time::Duration::from_millis(conn_config.connection_timeout_ms),
); );
http_proxy_svc.set_backend_tls_config(tls_handler::shared_backend_tls_config()); http_proxy_svc.set_backend_tls_config(tls_handler::shared_backend_tls_config());
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
);
let http_proxy = Arc::new(http_proxy_svc); let http_proxy = Arc::new(http_proxy_svc);
let conn_tracker = Arc::new(ConnectionTracker::new( let conn_tracker = Arc::new(ConnectionTracker::new(
conn_config.max_connections_per_ip, conn_config.max_connections_per_ip,
@@ -182,7 +193,7 @@ impl TcpListenerManager {
let max_conns = conn_config.max_connections as usize; let max_conns = conn_config.max_connections as usize;
Self { Self {
listeners: HashMap::new(), listeners: HashMap::new(),
route_manager: Arc::new(ArcSwap::from(route_manager)), route_manager: route_manager_swap,
metrics, metrics,
tls_configs: Arc::new(ArcSwap::from(Arc::new(HashMap::new()))), tls_configs: Arc::new(ArcSwap::from(Arc::new(HashMap::new()))),
shared_tls_acceptor: Arc::new(ArcSwap::from(Arc::new(None))), shared_tls_acceptor: Arc::new(ArcSwap::from(Arc::new(None))),
@@ -192,18 +203,26 @@ impl TcpListenerManager {
cancel_token: CancellationToken::new(), cancel_token: CancellationToken::new(),
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)), socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)), conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
route_cancels: Arc::new(DashMap::new()),
} }
} }
/// Create with a metrics collector. /// Create with a metrics collector.
pub fn with_metrics(route_manager: Arc<RouteManager>, metrics: Arc<MetricsCollector>) -> Self { pub fn with_metrics(route_manager: Arc<RouteManager>, metrics: Arc<MetricsCollector>) -> Self {
let conn_config = ConnectionConfig::default(); let conn_config = ConnectionConfig::default();
let route_manager_swap = Arc::new(ArcSwap::from(route_manager));
let mut http_proxy_svc = HttpProxyService::with_connect_timeout( let mut http_proxy_svc = HttpProxyService::with_connect_timeout(
Arc::clone(&route_manager), Arc::clone(&route_manager_swap),
Arc::clone(&metrics), Arc::clone(&metrics),
std::time::Duration::from_millis(conn_config.connection_timeout_ms), std::time::Duration::from_millis(conn_config.connection_timeout_ms),
); );
http_proxy_svc.set_backend_tls_config(tls_handler::shared_backend_tls_config()); http_proxy_svc.set_backend_tls_config(tls_handler::shared_backend_tls_config());
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
);
let http_proxy = Arc::new(http_proxy_svc); let http_proxy = Arc::new(http_proxy_svc);
let conn_tracker = Arc::new(ConnectionTracker::new( let conn_tracker = Arc::new(ConnectionTracker::new(
conn_config.max_connections_per_ip, conn_config.max_connections_per_ip,
@@ -212,7 +231,7 @@ impl TcpListenerManager {
let max_conns = conn_config.max_connections as usize; let max_conns = conn_config.max_connections as usize;
Self { Self {
listeners: HashMap::new(), listeners: HashMap::new(),
route_manager: Arc::new(ArcSwap::from(route_manager)), route_manager: route_manager_swap,
metrics, metrics,
tls_configs: Arc::new(ArcSwap::from(Arc::new(HashMap::new()))), tls_configs: Arc::new(ArcSwap::from(Arc::new(HashMap::new()))),
shared_tls_acceptor: Arc::new(ArcSwap::from(Arc::new(None))), shared_tls_acceptor: Arc::new(ArcSwap::from(Arc::new(None))),
@@ -222,6 +241,7 @@ impl TcpListenerManager {
cancel_token: CancellationToken::new(), cancel_token: CancellationToken::new(),
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)), socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)), conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
route_cancels: Arc::new(DashMap::new()),
} }
} }
@@ -232,6 +252,22 @@ impl TcpListenerManager {
config.connection_rate_limit_per_minute, config.connection_rate_limit_per_minute,
)); ));
self.conn_semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_connections as usize)); self.conn_semaphore = Arc::new(tokio::sync::Semaphore::new(config.max_connections as usize));
// Rebuild http_proxy with updated timeouts (shares the same ArcSwap<RouteManager>)
let mut http_proxy_svc = HttpProxyService::with_connect_timeout(
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
std::time::Duration::from_millis(config.connection_timeout_ms),
);
http_proxy_svc.set_backend_tls_config(tls_handler::shared_backend_tls_config());
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
);
self.http_proxy = Arc::new(http_proxy_svc);
self.conn_config = Arc::new(config); self.conn_config = Arc::new(config);
} }
@@ -288,12 +324,13 @@ impl TcpListenerManager {
let cancel = self.cancel_token.clone(); let cancel = self.cancel_token.clone();
let relay = Arc::clone(&self.socket_handler_relay); let relay = Arc::clone(&self.socket_handler_relay);
let semaphore = Arc::clone(&self.conn_semaphore); let semaphore = Arc::clone(&self.conn_semaphore);
let route_cancels = Arc::clone(&self.route_cancels);
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
Self::accept_loop( Self::accept_loop(
listener, port, route_manager_swap, metrics, tls_configs, listener, port, route_manager_swap, metrics, tls_configs,
shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay, shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay,
semaphore, semaphore, route_cancels,
).await; ).await;
}); });
@@ -336,13 +373,15 @@ impl TcpListenerManager {
for (port, handle) in self.listeners.drain() { for (port, handle) in self.listeners.drain() {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let abort_handle = handle.abort_handle();
if remaining.is_zero() { if remaining.is_zero() {
handle.abort(); abort_handle.abort();
warn!("Force-stopped listener on port {} (timeout exceeded)", port); warn!("Force-stopped listener on port {} (timeout exceeded)", port);
} else { } else {
match tokio::time::timeout(remaining, handle).await { match tokio::time::timeout(remaining, handle).await {
Ok(_) => info!("Listener on port {} stopped gracefully", port), Ok(_) => info!("Listener on port {} stopped gracefully", port),
Err(_) => { Err(_) => {
abort_handle.abort();
warn!("Listener on port {} did not stop in time, aborting", port); warn!("Listener on port {} did not stop in time, aborting", port);
} }
} }
@@ -370,6 +409,20 @@ impl TcpListenerManager {
self.route_manager.store(route_manager); self.route_manager.store(route_manager);
} }
/// Cancel and forget the per-route cancellation token of every route whose ID is
/// not in `active_route_ids`.
///
/// Tokens of routes still in the set are kept untouched. Each dropped token is
/// cancelled first, and an info line is logged for it.
pub fn invalidate_removed_routes(&self, active_route_ids: &std::collections::HashSet<String>) {
    self.route_cancels.retain(|route_id, route_token| {
        let still_active = active_route_ids.contains(route_id);
        if !still_active {
            info!("Cancelling connections for removed route '{}'", route_id);
            route_token.cancel();
        }
        // Returning false removes the (now cancelled) token from the map.
        still_active
    });
}
/// Prune HTTP proxy caches for route IDs that are no longer active. /// Prune HTTP proxy caches for route IDs that are no longer active.
pub fn prune_http_proxy_caches(&self, active_route_ids: &std::collections::HashSet<String>) { pub fn prune_http_proxy_caches(&self, active_route_ids: &std::collections::HashSet<String>) {
self.http_proxy.prune_stale_routes(active_route_ids); self.http_proxy.prune_stale_routes(active_route_ids);
@@ -399,6 +452,7 @@ impl TcpListenerManager {
cancel: CancellationToken, cancel: CancellationToken,
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>, socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
conn_semaphore: Arc<tokio::sync::Semaphore>, conn_semaphore: Arc<tokio::sync::Semaphore>,
route_cancels: Arc<DashMap<String, CancellationToken>>,
) { ) {
loop { loop {
tokio::select! { tokio::select! {
@@ -453,6 +507,7 @@ impl TcpListenerManager {
let ct = Arc::clone(&conn_tracker); let ct = Arc::clone(&conn_tracker);
let cn = cancel.clone(); let cn = cancel.clone();
let sr = Arc::clone(&socket_handler_relay); let sr = Arc::clone(&socket_handler_relay);
let rc = Arc::clone(&route_cancels);
debug!("Accepted connection from {} on port {}", peer_addr, port); debug!("Accepted connection from {} on port {}", peer_addr, port);
tokio::spawn(async move { tokio::spawn(async move {
@@ -461,7 +516,7 @@ impl TcpListenerManager {
// RAII guard ensures connection_closed is called on all paths // RAII guard ensures connection_closed is called on all paths
let _ct_guard = ConnectionTrackerGuard::new(ct, ip); let _ct_guard = ConnectionTrackerGuard::new(ct, ip);
let result = Self::handle_connection( let result = Self::handle_connection(
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc,
).await; ).await;
if let Err(e) = result { if let Err(e) = result {
debug!("Connection error from {}: {}", peer_addr, e); debug!("Connection error from {}: {}", peer_addr, e);
@@ -491,6 +546,7 @@ impl TcpListenerManager {
conn_config: Arc<ConnectionConfig>, conn_config: Arc<ConnectionConfig>,
cancel: CancellationToken, cancel: CancellationToken,
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>, socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
route_cancels: Arc<DashMap<String, CancellationToken>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
@@ -595,6 +651,14 @@ impl TcpListenerManager {
let target_port = target.port.resolve(port); let target_port = target.port.resolve(port);
let route_id = quick_match.route.id.as_deref(); let route_id = quick_match.route.id.as_deref();
// Resolve per-route cancel token (child of global cancel)
let conn_cancel = match route_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
None => cancel.clone(),
};
// Check route-level IP security // Check route-level IP security
if let Some(ref security) = quick_match.route.security { if let Some(ref security) = quick_match.route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security( if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
@@ -649,7 +713,7 @@ impl TcpListenerManager {
let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts(
stream, backend_w, None, stream, backend_w, None,
inactivity_timeout, max_lifetime, cancel, inactivity_timeout, max_lifetime, conn_cancel,
Some(forwarder::ForwardMetricsCtx { Some(forwarder::ForwardMetricsCtx {
collector: Arc::clone(&metrics), collector: Arc::clone(&metrics),
route_id: route_id.map(|s| s.to_string()), route_id: route_id.map(|s| s.to_string()),
@@ -659,7 +723,7 @@ impl TcpListenerManager {
} else { } else {
let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts( let (_bytes_in, _bytes_out) = forwarder::forward_bidirectional_with_timeouts(
stream, backend, None, stream, backend, None,
inactivity_timeout, max_lifetime, cancel, inactivity_timeout, max_lifetime, conn_cancel,
Some(forwarder::ForwardMetricsCtx { Some(forwarder::ForwardMetricsCtx {
collector: Arc::clone(&metrics), collector: Arc::clone(&metrics),
route_id: route_id.map(|s| s.to_string()), route_id: route_id.map(|s| s.to_string()),
@@ -764,6 +828,16 @@ impl TcpListenerManager {
let route_id = route_match.route.id.as_deref(); let route_id = route_match.route.id.as_deref();
// Resolve per-route cancel token (child of global cancel).
// When this route is removed via updateRoutes, the token is cancelled,
// terminating all connections on this route.
let cancel = match route_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
None => cancel,
};
// Check route-level IP security for passthrough connections // Check route-level IP security for passthrough connections
if let Some(ref security) = route_match.route.security { if let Some(ref security) = route_match.route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security( if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
@@ -791,7 +865,8 @@ impl TcpListenerManager {
stream, n, port, peer_addr, stream, n, port, peer_addr,
&route_match, domain.as_deref(), is_tls, &route_match, domain.as_deref(), is_tls,
&relay_socket_path, &relay_socket_path,
&metrics, route_id, Arc::clone(&metrics), route_id,
&conn_config, cancel.clone(),
).await; ).await;
} else { } else {
debug!("Socket-handler route matched but no relay path configured"); debug!("Socket-handler route matched but no relay path configured");
@@ -964,7 +1039,7 @@ impl TcpListenerManager {
let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts( let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts(
tls_read, tls_write, backend_read, backend_write, tls_read, tls_write, backend_read, backend_write,
inactivity_timeout, max_lifetime, inactivity_timeout, max_lifetime, cancel.clone(),
Some(forwarder::ForwardMetricsCtx { Some(forwarder::ForwardMetricsCtx {
collector: Arc::clone(&metrics), collector: Arc::clone(&metrics),
route_id: route_id.map(|s| s.to_string()), route_id: route_id.map(|s| s.to_string()),
@@ -1023,7 +1098,7 @@ impl TcpListenerManager {
Self::handle_tls_reencrypt_tunnel( Self::handle_tls_reencrypt_tunnel(
buf_stream, &target_host, target_port, buf_stream, &target_host, target_port,
peer_addr, Arc::clone(&metrics), route_id, peer_addr, Arc::clone(&metrics), route_id,
&conn_config, &conn_config, cancel.clone(),
).await?; ).await?;
} }
Ok(()) Ok(())
@@ -1100,8 +1175,10 @@ impl TcpListenerManager {
domain: Option<&str>, domain: Option<&str>,
is_tls: bool, is_tls: bool,
relay_path: &str, relay_path: &str,
metrics: &MetricsCollector, metrics: Arc<MetricsCollector>,
route_id: Option<&str>, route_id: Option<&str>,
conn_config: &ConnectionConfig,
cancel: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream; use tokio::net::UnixStream;
@@ -1141,27 +1218,34 @@ impl TcpListenerManager {
// Forward initial data to the Unix socket // Forward initial data to the Unix socket
unix_stream.write_all(&initial_buf).await?; unix_stream.write_all(&initial_buf).await?;
// Bidirectional relay between TCP client and Unix socket handler // Bidirectional relay with inactivity timeout, max lifetime, and cancellation.
// Split both streams and use the same watchdog pattern as other forwarding paths.
let initial_len = initial_buf.len() as u64; let initial_len = initial_buf.len() as u64;
match tokio::io::copy_bidirectional(&mut stream, &mut unix_stream).await { let inactivity_timeout = std::time::Duration::from_millis(conn_config.socket_timeout_ms);
Ok((c2s, s2c)) => { let max_lifetime = std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms);
// Include initial data bytes that were forwarded before copy_bidirectional
let total_in = c2s + initial_len; let (tcp_read, tcp_write) = stream.into_split();
debug!("Socket handler relay complete for {}: {} bytes in, {} bytes out", let (unix_read, unix_write) = unix_stream.into_split();
route_key, total_in, s2c);
let ip = peer_addr.ip().to_string(); let ip_str = peer_addr.ip().to_string();
metrics.record_bytes(total_in, s2c, route_id, Some(&ip)); let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts(
} tcp_read, tcp_write, unix_read, unix_write,
Err(e) => { inactivity_timeout, max_lifetime, cancel,
// Still record the initial data even on error Some(forwarder::ForwardMetricsCtx {
collector: Arc::clone(&metrics),
route_id: route_id.map(|s| s.to_string()),
source_ip: Some(ip_str.clone()),
}),
).await;
// Include the initial data that was forwarded before the bidirectional relay
if initial_len > 0 { if initial_len > 0 {
let ip = peer_addr.ip().to_string(); metrics.record_bytes(initial_len, 0, route_id, Some(&ip_str));
metrics.record_bytes(initial_len, 0, route_id, Some(&ip));
}
debug!("Socket handler relay ended for {}: {}", route_key, e);
}
} }
debug!("Socket handler relay complete for {}: {} bytes in, {} bytes out",
route_key, _bytes_in + initial_len, _bytes_out);
Ok(()) Ok(())
} }
@@ -1176,6 +1260,7 @@ impl TcpListenerManager {
metrics: Arc<MetricsCollector>, metrics: Arc<MetricsCollector>,
route_id: Option<&str>, route_id: Option<&str>,
conn_config: &ConnectionConfig, conn_config: &ConnectionConfig,
cancel: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Connect to backend over TLS with timeout // Connect to backend over TLS with timeout
let backend_tls = match tokio::time::timeout( let backend_tls = match tokio::time::timeout(
@@ -1220,7 +1305,7 @@ impl TcpListenerManager {
let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts( let (_bytes_in, _bytes_out) = Self::forward_bidirectional_split_with_timeouts(
client_read, client_write, backend_read, backend_write, client_read, client_write, backend_read, backend_write,
inactivity_timeout, max_lifetime, inactivity_timeout, max_lifetime, cancel,
Some(forwarder::ForwardMetricsCtx { Some(forwarder::ForwardMetricsCtx {
collector: metrics, collector: metrics,
route_id: route_id.map(|s| s.to_string()), route_id: route_id.map(|s| s.to_string()),
@@ -1295,6 +1380,7 @@ impl TcpListenerManager {
mut backend_write: W2, mut backend_write: W2,
inactivity_timeout: std::time::Duration, inactivity_timeout: std::time::Duration,
max_lifetime: std::time::Duration, max_lifetime: std::time::Duration,
cancel: CancellationToken,
metrics: Option<forwarder::ForwardMetricsCtx>, metrics: Option<forwarder::ForwardMetricsCtx>,
) -> (u64, u64) ) -> (u64, u64)
where where
@@ -1362,7 +1448,7 @@ impl TcpListenerManager {
total total
}); });
// Watchdog task: check for inactivity and max lifetime // Watchdog task: check for inactivity, max lifetime, and cancellation
let la_watch = Arc::clone(&last_activity); let la_watch = Arc::clone(&last_activity);
let c2b_handle = c2b.abort_handle(); let c2b_handle = c2b.abort_handle();
let b2c_handle = b2c.abort_handle(); let b2c_handle = b2c.abort_handle();
@@ -1370,8 +1456,14 @@ impl TcpListenerManager {
let check_interval = std::time::Duration::from_secs(5); let check_interval = std::time::Duration::from_secs(5);
let mut last_seen = 0u64; let mut last_seen = 0u64;
loop { loop {
tokio::time::sleep(check_interval).await; tokio::select! {
_ = cancel.cancelled() => {
debug!("Split-stream connection cancelled by shutdown");
c2b_handle.abort();
b2c_handle.abort();
break;
}
_ = tokio::time::sleep(check_interval) => {
// Check max lifetime // Check max lifetime
if start.elapsed() >= max_lifetime { if start.elapsed() >= max_lifetime {
debug!("Connection exceeded max lifetime, closing"); debug!("Connection exceeded max lifetime, closing");
@@ -1394,6 +1486,8 @@ impl TcpListenerManager {
} }
last_seen = current; last_seen = current;
} }
}
}
}); });
let bytes_in = c2b.await.unwrap_or(0); let bytes_in = c2b.await.unwrap_or(0);

View File

@@ -98,10 +98,24 @@ pub fn build_shared_tls_acceptor(resolver: CertResolver) -> Result<TlsAcceptor,
} }
/// Build a TLS acceptor from PEM-encoded cert and key data. /// Build a TLS acceptor from PEM-encoded cert and key data.
/// Advertises both h2 and http/1.1 via ALPN (for client-facing connections).
pub fn build_tls_acceptor(cert_pem: &str, key_pem: &str) -> Result<TlsAcceptor, Box<dyn std::error::Error + Send + Sync>> { pub fn build_tls_acceptor(cert_pem: &str, key_pem: &str) -> Result<TlsAcceptor, Box<dyn std::error::Error + Send + Sync>> {
build_tls_acceptor_with_config(cert_pem, key_pem, None) build_tls_acceptor_with_config(cert_pem, key_pem, None)
} }
/// Build a TLS acceptor for backend servers that only speak HTTP/1.1.
/// Does NOT advertise h2 in ALPN, preventing false h2 auto-detection.
pub fn build_tls_acceptor_h1_only(cert_pem: &str, key_pem: &str) -> Result<TlsAcceptor, Box<dyn std::error::Error + Send + Sync>> {
ensure_crypto_provider();
let certs = load_certs(cert_pem)?;
let key = load_private_key(key_pem)?;
let mut config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)?;
config.alpn_protocols = vec![b"http/1.1".to_vec()];
Ok(TlsAcceptor::from(Arc::new(config)))
}
/// Build a TLS acceptor with optional RouteTls configuration for version/cipher tuning. /// Build a TLS acceptor with optional RouteTls configuration for version/cipher tuning.
pub fn build_tls_acceptor_with_config( pub fn build_tls_acceptor_with_config(
cert_pem: &str, cert_pem: &str,
@@ -204,6 +218,25 @@ pub fn shared_backend_tls_config() -> Arc<rustls::ClientConfig> {
}).clone() }).clone()
} }
/// Get or create a shared backend TLS `ClientConfig` with ALPN `h2` + `http/1.1`.
///
/// Used for auto-detection mode: the backend server picks its preferred protocol
/// via ALPN, and the proxy reads the negotiated result to decide h1 vs h2 forwarding.
static SHARED_CLIENT_CONFIG_ALPN: OnceLock<Arc<rustls::ClientConfig>> = OnceLock::new();
pub fn shared_backend_tls_config_alpn() -> Arc<rustls::ClientConfig> {
SHARED_CLIENT_CONFIG_ALPN.get_or_init(|| {
ensure_crypto_provider();
let mut config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureVerifier))
.with_no_client_auth();
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
info!("Built shared backend TLS client config with ALPN h2+http/1.1 for auto-detection");
Arc::new(config)
}).clone()
}
/// Connect to a backend with TLS (for terminate-and-reencrypt mode). /// Connect to a backend with TLS (for terminate-and-reencrypt mode).
/// Uses the shared backend TLS config for session resumption. /// Uses the shared backend TLS config for session resumption.
pub async fn connect_tls( pub async fn connect_tls(

View File

@@ -39,6 +39,7 @@ hyper = { workspace = true }
hyper-util = { workspace = true } hyper-util = { workspace = true }
http-body-util = { workspace = true } http-body-util = { workspace = true }
bytes = { workspace = true } bytes = { workspace = true }
mimalloc = { workspace = true }
[dev-dependencies] [dev-dependencies]
rcgen = { workspace = true } rcgen = { workspace = true }

View File

@@ -610,6 +610,8 @@ impl RustProxy {
// Update listener manager // Update listener manager
if let Some(ref mut listener) = self.listener_manager { if let Some(ref mut listener) = self.listener_manager {
listener.update_route_manager(Arc::clone(&new_manager)); listener.update_route_manager(Arc::clone(&new_manager));
// Cancel connections on routes that were removed or disabled
listener.invalidate_removed_routes(&active_route_ids);
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters) // Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
listener.prune_http_proxy_caches(&active_route_ids); listener.prune_http_proxy_caches(&active_route_ids);

View File

@@ -1,3 +1,6 @@
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use clap::Parser; use clap::Parser;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use anyhow::Result; use anyhow::Result;

View File

@@ -195,7 +195,10 @@ pub async fn start_tls_http_backend(
) -> JoinHandle<()> { ) -> JoinHandle<()> {
use std::sync::Arc; use std::sync::Arc;
let acceptor = rustproxy_passthrough::build_tls_acceptor(cert_pem, key_pem) // Use h1-only acceptor: test backends speak raw HTTP/1.1 text,
// so they must NOT advertise h2 via ALPN (which would cause
// auto-detect to attempt h2 binary framing and fail).
let acceptor = rustproxy_passthrough::build_tls_acceptor_h1_only(cert_pem, key_pem)
.expect("Failed to build TLS acceptor"); .expect("Failed to build TLS acceptor");
let acceptor = Arc::new(acceptor); let acceptor = Arc::new(acceptor);
let name = backend_name.to_string(); let name = backend_name.to_string();

View File

@@ -7,10 +7,15 @@
import { expect, tap } from '@git.zone/tstest/tapbundle'; import { expect, tap } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/proxies/smart-proxy/index.js'; import { SmartProxy } from '../ts/proxies/smart-proxy/index.js';
import type { IRouteConfig } from '../ts/proxies/smart-proxy/models/route-types.js'; import type { IRouteConfig } from '../ts/proxies/smart-proxy/models/route-types.js';
import { findFreePorts } from './helpers/port-allocator.js';
// Use unique high ports for each test to avoid conflicts let testPorts: number[];
let testPort = 20000; let portIndex = 0;
const getNextPort = () => testPort++; const getNextPort = () => testPorts[portIndex++];
tap.test('setup - allocate ports', async () => {
testPorts = await findFreePorts(16);
});
// --------------------------------- Single Route, No Domain Restriction --------------------------------- // --------------------------------- Single Route, No Domain Restriction ---------------------------------

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartproxy', name: '@push.rocks/smartproxy',
version: '25.7.10', version: '25.9.2',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
} }

View File

@@ -354,17 +354,17 @@ export class LogDeduplicator {
// Global instance for connection-related log deduplication // Global instance for connection-related log deduplication
export const connectionLogDeduplicator = new LogDeduplicator(5000); // 5 second batches export const connectionLogDeduplicator = new LogDeduplicator(5000); // 5 second batches
// Ensure logs are flushed on process exit // Ensure logs are flushed on process exit.
// Only use beforeExit — do NOT call process.exit() from SIGINT/SIGTERM handlers
// as that kills the host process's graceful shutdown (e.g., dcrouter connection draining).
process.on('beforeExit', () => { process.on('beforeExit', () => {
connectionLogDeduplicator.flushAll(); connectionLogDeduplicator.flushAll();
}); });
process.on('SIGINT', () => { process.on('SIGINT', () => {
connectionLogDeduplicator.cleanup(); connectionLogDeduplicator.cleanup();
process.exit(0);
}); });
process.on('SIGTERM', () => { process.on('SIGTERM', () => {
connectionLogDeduplicator.cleanup(); connectionLogDeduplicator.cleanup();
process.exit(0);
}); });

View File

@@ -18,7 +18,7 @@ export class ProtocolDetector {
private fragmentManager: DetectionFragmentManager; private fragmentManager: DetectionFragmentManager;
private tlsDetector: TlsDetector; private tlsDetector: TlsDetector;
private httpDetector: HttpDetector; private httpDetector: HttpDetector;
private connectionProtocols: Map<string, 'tls' | 'http'> = new Map(); private connectionProtocols: Map<string, { protocol: 'tls' | 'http'; createdAt: number }> = new Map();
constructor() { constructor() {
this.fragmentManager = new DetectionFragmentManager(); this.fragmentManager = new DetectionFragmentManager();
@@ -124,7 +124,8 @@ export class ProtocolDetector {
const connectionId = DetectionFragmentManager.createConnectionId(context); const connectionId = DetectionFragmentManager.createConnectionId(context);
// Check if we already know the protocol for this connection // Check if we already know the protocol for this connection
const knownProtocol = this.connectionProtocols.get(connectionId); const knownEntry = this.connectionProtocols.get(connectionId);
const knownProtocol = knownEntry?.protocol;
if (knownProtocol === 'http') { if (knownProtocol === 'http') {
const result = this.httpDetector.detectWithContext(buffer, context, options); const result = this.httpDetector.detectWithContext(buffer, context, options);
@@ -163,7 +164,7 @@ export class ProtocolDetector {
if (!knownProtocol) { if (!knownProtocol) {
// First peek to determine protocol type // First peek to determine protocol type
if (this.tlsDetector.canHandle(buffer)) { if (this.tlsDetector.canHandle(buffer)) {
this.connectionProtocols.set(connectionId, 'tls'); this.connectionProtocols.set(connectionId, { protocol: 'tls', createdAt: Date.now() });
// Handle TLS with fragment accumulation // Handle TLS with fragment accumulation
const handler = this.fragmentManager.getHandler('tls'); const handler = this.fragmentManager.getHandler('tls');
const fragmentResult = handler.addFragment(connectionId, buffer); const fragmentResult = handler.addFragment(connectionId, buffer);
@@ -189,7 +190,7 @@ export class ProtocolDetector {
} }
if (this.httpDetector.canHandle(buffer)) { if (this.httpDetector.canHandle(buffer)) {
this.connectionProtocols.set(connectionId, 'http'); this.connectionProtocols.set(connectionId, { protocol: 'http', createdAt: Date.now() });
const result = this.httpDetector.detectWithContext(buffer, context, options); const result = this.httpDetector.detectWithContext(buffer, context, options);
if (result) { if (result) {
if (result.isComplete) { if (result.isComplete) {
@@ -221,6 +222,14 @@ export class ProtocolDetector {
private cleanupInstance(): void { private cleanupInstance(): void {
this.fragmentManager.cleanup(); this.fragmentManager.cleanup();
// Remove stale connectionProtocols entries (abandoned handshakes, port scanners)
const maxAge = 30_000; // 30 seconds
const now = Date.now();
for (const [id, entry] of this.connectionProtocols) {
if (now - entry.createdAt > maxAge) {
this.connectionProtocols.delete(id);
}
}
} }
/** /**
@@ -242,8 +251,7 @@ export class ProtocolDetector {
* @param _maxAge Maximum age in milliseconds (default: 30 seconds) * @param _maxAge Maximum age in milliseconds (default: 30 seconds)
*/ */
static cleanupConnections(_maxAge: number = 30000): void { static cleanupConnections(_maxAge: number = 30000): void {
// Cleanup is now handled internally by the fragment manager this.getInstance().cleanupInstance();
this.getInstance().fragmentManager.cleanup();
} }
/** /**

View File

@@ -112,12 +112,12 @@ export interface ISmartProxyOptions {
maxVersion?: string; maxVersion?: string;
// Timeout settings // Timeout settings
connectionTimeout?: number; // Timeout for establishing connection to backend (ms), default: 30000 (30s) connectionTimeout?: number; // Timeout for establishing connection to backend (ms), default: 60000 (60s)
initialDataTimeout?: number; // Timeout for initial data/SNI (ms), default: 60000 (60s) initialDataTimeout?: number; // Timeout for initial data/SNI (ms), default: 60000 (60s)
socketTimeout?: number; // Socket inactivity timeout (ms), default: 3600000 (1h) socketTimeout?: number; // Socket inactivity timeout (ms), default: 60000 (60s)
inactivityCheckInterval?: number; // How often to check for inactive connections (ms), default: 60000 (60s) inactivityCheckInterval?: number; // How often to check for inactive connections (ms), default: 60000 (60s)
maxConnectionLifetime?: number; // Default max connection lifetime (ms), default: 86400000 (24h) maxConnectionLifetime?: number; // Max connection lifetime (ms), default: 3600000 (1h)
inactivityTimeout?: number; // Inactivity timeout (ms), default: 14400000 (4h) inactivityTimeout?: number; // Inactivity timeout (ms), default: 75000 (75s)
gracefulShutdownTimeout?: number; // (ms) maximum time to wait for connections to close during shutdown gracefulShutdownTimeout?: number; // (ms) maximum time to wait for connections to close during shutdown

View File

@@ -262,7 +262,7 @@ export interface IRouteAction {
// Additional options for backend-specific settings // Additional options for backend-specific settings
options?: { options?: {
backendProtocol?: 'http1' | 'http2'; backendProtocol?: 'http1' | 'http2' | 'auto';
[key: string]: any; [key: string]: any;
}; };

View File

@@ -47,16 +47,16 @@ export class SmartProxy extends plugins.EventEmitter {
// Apply defaults // Apply defaults
this.settings = { this.settings = {
...settingsArg, ...settingsArg,
initialDataTimeout: settingsArg.initialDataTimeout || 120000, initialDataTimeout: settingsArg.initialDataTimeout || 60_000,
socketTimeout: settingsArg.socketTimeout || 3600000, socketTimeout: settingsArg.socketTimeout || 60_000,
maxConnectionLifetime: settingsArg.maxConnectionLifetime || 86400000, maxConnectionLifetime: settingsArg.maxConnectionLifetime || 3_600_000,
inactivityTimeout: settingsArg.inactivityTimeout || 14400000, inactivityTimeout: settingsArg.inactivityTimeout || 75_000,
gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30000, gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30_000,
maxConnectionsPerIP: settingsArg.maxConnectionsPerIP || 100, maxConnectionsPerIP: settingsArg.maxConnectionsPerIP || 100,
connectionRateLimitPerMinute: settingsArg.connectionRateLimitPerMinute || 300, connectionRateLimitPerMinute: settingsArg.connectionRateLimitPerMinute || 300,
keepAliveTreatment: settingsArg.keepAliveTreatment || 'extended', keepAliveTreatment: settingsArg.keepAliveTreatment || 'standard',
keepAliveInactivityMultiplier: settingsArg.keepAliveInactivityMultiplier || 6, keepAliveInactivityMultiplier: settingsArg.keepAliveInactivityMultiplier || 4,
extendedKeepAliveLifetime: settingsArg.extendedKeepAliveLifetime || 7 * 24 * 60 * 60 * 1000, extendedKeepAliveLifetime: settingsArg.extendedKeepAliveLifetime || 3_600_000,
}; };
// Normalize ACME options // Normalize ACME options

View File

@@ -92,6 +92,16 @@ export class SocketHandlerServer {
let metadataBuffer = ''; let metadataBuffer = '';
let metadataParsed = false; let metadataParsed = false;
// 10s timeout for metadata parsing phase — if Rust connects but never
// sends the JSON metadata line, don't hold the socket open indefinitely.
socket.setTimeout(10_000);
socket.on('timeout', () => {
if (!metadataParsed) {
logger.log('warn', 'Socket handler metadata timeout, closing', { component: 'socket-handler-server' });
socket.destroy();
}
});
const onData = (chunk: Buffer) => { const onData = (chunk: Buffer) => {
if (metadataParsed) return; if (metadataParsed) return;
@@ -108,6 +118,7 @@ export class SocketHandlerServer {
} }
metadataParsed = true; metadataParsed = true;
socket.setTimeout(0); // Clear metadata timeout
socket.removeListener('data', onData); socket.removeListener('data', onData);
socket.pause(); // Prevent data loss between handler removal and pipe setup socket.pause(); // Prevent data loss between handler removal and pipe setup
@@ -254,11 +265,30 @@ export class SocketHandlerServer {
// Connect to the resolved target // Connect to the resolved target
const backend = plugins.net.connect(port, host, () => { const backend = plugins.net.connect(port, host, () => {
// Connection established — set idle timeout on both sides (5 min)
socket.setTimeout(300_000);
backend.setTimeout(300_000);
// Pipe bidirectionally // Pipe bidirectionally
socket.pipe(backend); socket.pipe(backend);
backend.pipe(socket); backend.pipe(socket);
}); });
// Connect timeout: if backend doesn't connect within 30s, destroy both
backend.setTimeout(30_000);
backend.on('timeout', () => {
logger.log('warn', `Dynamic forward timeout to ${host}:${port}`, { component: 'socket-handler-server' });
backend.destroy();
socket.destroy();
});
socket.on('timeout', () => {
logger.log('debug', `Dynamic forward client idle timeout`, { component: 'socket-handler-server' });
socket.destroy();
backend.destroy();
});
backend.on('error', (err) => { backend.on('error', (err) => {
logger.log('error', `Dynamic forward backend error: ${err.message}`, { component: 'socket-handler-server' }); logger.log('error', `Dynamic forward backend error: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy(); socket.destroy();