fix(websocket): keep upgraded WebSocket tunnels on dedicated lifecycle timeouts

This commit is contained in:
2026-05-29 05:27:42 +00:00
parent 72be691668
commit d02b5a45d1
5 changed files with 127 additions and 150 deletions
@@ -45,6 +45,9 @@ pub struct ConnActivity {
/// increments on creation and decrements on Drop, keeping the watchdog aware that
/// a response body is still streaming after the request handler has returned.
active_requests: Option<Arc<AtomicU64>>,
/// Counter of active upgraded tunnels on this connection. When set, each upgraded
/// WebSocket stream holds a guard on it, and the HTTP keep-alive/lifetime watchdog
/// skips its checks while the count is non-zero.
active_upgrades: Option<Arc<AtomicU64>>,
/// Protocol cache key for Alt-Svc discovery. When set, `build_streaming_response`
/// checks the backend's original response headers for Alt-Svc before our
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
@@ -61,6 +64,7 @@ impl ConnActivity {
last_activity: Arc::new(AtomicU64::new(0)),
start: std::time::Instant::now(),
active_requests: None,
active_upgrades: None,
alt_svc_cache_key: None,
alt_svc_request_url: None,
}
@@ -488,6 +492,7 @@ impl HttpProxyService {
// (no request in progress and none started recently).
let last_activity = Arc::new(AtomicU64::new(0));
let active_requests = Arc::new(AtomicU64::new(0));
let active_upgrades = Arc::new(AtomicU64::new(0));
let start = std::time::Instant::now();
// Connection-level frontend protocol tracker: the first request detects
@@ -498,6 +503,7 @@ impl HttpProxyService {
let la_inner = Arc::clone(&last_activity);
let ar_inner = Arc::clone(&active_requests);
let au_inner = Arc::clone(&active_upgrades);
let cancel_inner = cancel.clone();
let vpn_info = Arc::new(vpn_info);
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
@@ -522,6 +528,7 @@ impl HttpProxyService {
last_activity: Arc::clone(&la_inner),
start,
active_requests: Some(Arc::clone(&ar_inner)),
active_upgrades: Some(Arc::clone(&au_inner)),
alt_svc_cache_key: None,
alt_svc_request_url: None,
};
@@ -572,8 +579,14 @@ impl HttpProxyService {
loop {
tokio::time::sleep(check_interval).await;
// Check max connection lifetime (unconditional — even active connections
// must eventually be recycled to prevent resource accumulation).
// Upgraded tunnels have their own WebSocket watchdog and lifetime.
if active_upgrades.load(Ordering::Relaxed) > 0 {
last_seen = last_activity.load(Ordering::Relaxed);
continue;
}
// Check max connection lifetime (unconditional for regular HTTP — even active
// connections must eventually be recycled to prevent resource accumulation).
if start.elapsed() >= max_lifetime {
debug!("HTTP connection exceeded max lifetime ({}s) from {}",
max_lifetime.as_secs(), peer_addr);
@@ -789,11 +802,7 @@ impl HttpProxyService {
cancel,
&ip_str,
is_h2_websocket,
if is_h2_websocket {
Some(conn_activity.clone())
} else {
None
},
Some(conn_activity.clone()),
)
.await;
// Note: for WebSocket, connection_ended is called inside
@@ -3286,8 +3295,19 @@ impl HttpProxyService {
let upstream_key_owned = upstream_key.to_string();
let ws_inactivity_timeout = self.ws_inactivity_timeout;
let ws_max_lifetime = self.ws_max_lifetime;
let ws_request_guard = conn_activity
.as_ref()
.and_then(|ca| ca.active_requests.as_ref())
.map(|counter| ActiveRequestGuard::new(Arc::clone(counter)));
let ws_upgrade_guard = conn_activity
.as_ref()
.and_then(|ca| ca.active_upgrades.as_ref())
.map(|counter| ActiveRequestGuard::new(Arc::clone(counter)));
tokio::spawn(async move {
let _ws_request_guard = ws_request_guard;
let _ws_upgrade_guard = ws_upgrade_guard;
// RAII guard: ensures connection_ended is called even if this task panics
struct WsUpstreamGuard {
selector: UpstreamSelector,
@@ -154,6 +154,9 @@ pub struct ConnectionConfig {
pub max_connections: u64,
}
/// Default inactivity timeout for upgraded WebSocket tunnels, in milliseconds (one hour).
const DEFAULT_WS_INACTIVITY_TIMEOUT_MS: u64 = 60 * 60 * 1_000;
/// Default maximum lifetime for upgraded WebSocket tunnels, in milliseconds (24 hours).
const DEFAULT_WS_MAX_LIFETIME_MS: u64 = 24 * 60 * 60 * 1_000;
impl Default for ConnectionConfig {
fn default() -> Self {
Self {
@@ -225,8 +228,8 @@ impl TcpListenerManager {
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(DEFAULT_WS_INACTIVITY_TIMEOUT_MS),
std::time::Duration::from_millis(DEFAULT_WS_MAX_LIFETIME_MS),
);
let http_proxy = Arc::new(http_proxy_svc);
let conn_tracker = Arc::new(ConnectionTracker::new(
@@ -266,8 +269,8 @@ impl TcpListenerManager {
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(DEFAULT_WS_INACTIVITY_TIMEOUT_MS),
std::time::Duration::from_millis(DEFAULT_WS_MAX_LIFETIME_MS),
);
let http_proxy = Arc::new(http_proxy_svc);
let conn_tracker = Arc::new(ConnectionTracker::new(
@@ -313,8 +316,8 @@ impl TcpListenerManager {
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
std::time::Duration::from_millis(DEFAULT_WS_INACTIVITY_TIMEOUT_MS),
std::time::Duration::from_millis(DEFAULT_WS_MAX_LIFETIME_MS),
);
self.http_proxy = Arc::new(http_proxy_svc);