fix(proxy): harden proxying with connect timeouts, graceful shutdown, a WebSocket watchdog, and an RAII metrics guard
rust/Cargo.lock (generated, 1 line changed)
@@ -971,6 +971,7 @@ dependencies = [
  "rustproxy-security",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-util",
  "tracing",
 ]

@@ -22,3 +22,4 @@ thiserror = { workspace = true }
 anyhow = { workspace = true }
 arc-swap = { workspace = true }
 dashmap = { workspace = true }
+tokio-util = { workspace = true }

@@ -6,6 +6,7 @@

 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};

 use bytes::Bytes;
 use http_body_util::{BodyExt, Full, combinators::BoxBody};
@@ -14,6 +15,7 @@ use hyper::{Request, Response, StatusCode};
 use hyper_util::rt::TokioIo;
 use regex::Regex;
 use tokio::net::TcpStream;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};

 use rustproxy_routing::RouteManager;
@@ -23,11 +25,22 @@ use crate::request_filter::RequestFilter;
 use crate::response_filter::ResponseFilter;
 use crate::upstream_selector::UpstreamSelector;

+/// Default upstream connect timeout (30 seconds).
+const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+/// Default WebSocket inactivity timeout (1 hour).
+const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
+
+/// Default WebSocket max lifetime (24 hours).
+const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
+
 /// HTTP proxy service that processes HTTP traffic.
 pub struct HttpProxyService {
     route_manager: Arc<RouteManager>,
     metrics: Arc<MetricsCollector>,
     upstream_selector: UpstreamSelector,
+    /// Timeout for connecting to upstream backends.
+    connect_timeout: std::time::Duration,
 }

 impl HttpProxyService {
@@ -36,6 +49,21 @@ impl HttpProxyService {
             route_manager,
             metrics,
             upstream_selector: UpstreamSelector::new(),
+            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
         }
     }

+    /// Create with a custom connect timeout.
+    pub fn with_connect_timeout(
+        route_manager: Arc<RouteManager>,
+        metrics: Arc<MetricsCollector>,
+        connect_timeout: std::time::Duration,
+    ) -> Self {
+        Self {
+            route_manager,
+            metrics,
+            upstream_selector: UpstreamSelector::new(),
+            connect_timeout,
+        }
+    }
+
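Note: a minimal sketch of choosing between the two constructors. `RouteManager::new(vec![])` and `MetricsCollector::new()` appear in the Default impl later in this diff; the 5-second figure is illustrative, not a project default.

    fn build_proxy() -> std::sync::Arc<HttpProxyService> {
        std::sync::Arc::new(HttpProxyService::with_connect_timeout(
            std::sync::Arc::new(RouteManager::new(vec![])),
            std::sync::Arc::new(MetricsCollector::new()),
            std::time::Duration::from_secs(5), // illustrative: fail fast on dead backends
        ))
    }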
@@ -45,41 +73,59 @@ impl HttpProxyService {
         stream: TcpStream,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     ) {
-        self.handle_io(stream, peer_addr, port).await;
+        self.handle_io(stream, peer_addr, port, cancel).await;
     }

     /// Handle an incoming HTTP connection on any IO type (plain TCP or TLS-terminated).
     ///
-    /// Uses HTTP/1.1 with upgrade support. For clients that negotiate HTTP/2,
-    /// use `handle_io_auto` instead.
+    /// Uses HTTP/1.1 with upgrade support. Responds to graceful shutdown via the
+    /// cancel token — in-flight requests complete, but no new requests are accepted.
     pub async fn handle_io<I>(
         self: Arc<Self>,
         stream: I,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     )
     where
         I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
     {
         let io = TokioIo::new(stream);

+        let cancel_inner = cancel.clone();
         let service = hyper::service::service_fn(move |req: Request<Incoming>| {
             let svc = Arc::clone(&self);
             let peer = peer_addr;
+            let cn = cancel_inner.clone();
             async move {
-                svc.handle_request(req, peer, port).await
+                svc.handle_request(req, peer, port, cn).await
             }
         });

         // Use http1::Builder with upgrades for WebSocket support
         let conn = hyper::server::conn::http1::Builder::new()
             .keep_alive(true)
             .serve_connection(io, service)
             .with_upgrades();
+        // The connection future is not `Unpin` (it stores the service's future),
+        // so pin it on the stack before polling it by reference below.
+        let mut conn = std::pin::pin!(conn);

-        if let Err(e) = conn.await {
-            debug!("HTTP connection error from {}: {}", peer_addr, e);
+        // Use select to support graceful shutdown via the cancellation token
+        tokio::select! {
+            result = conn.as_mut() => {
+                if let Err(e) = result {
+                    debug!("HTTP connection error from {}: {}", peer_addr, e);
+                }
+            }
+            _ = cancel.cancelled() => {
+                // Graceful shutdown: let in-flight request finish, stop accepting new ones
+                conn.as_mut().graceful_shutdown();
+                if let Err(e) = conn.await {
+                    debug!("HTTP connection error during shutdown from {}: {}", peer_addr, e);
+                }
+            }
         }
     }
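Note: the commit threads `cancel` down from the listener, which is not shown in this hunk. A caller-side sketch of how the token might be driven, under assumptions: the accept loop, Ctrl-C handling, and the use of `tokio_util::task::TaskTracker` are illustrative and not part of this commit.

    use std::sync::Arc;
    use tokio_util::{sync::CancellationToken, task::TaskTracker};

    async fn serve(proxy: Arc<HttpProxyService>, listener: tokio::net::TcpListener, port: u16) {
        let cancel = CancellationToken::new();
        let tracker = TaskTracker::new();
        loop {
            tokio::select! {
                accepted = listener.accept() => {
                    let Ok((stream, peer)) = accepted else { continue };
                    let (proxy, cancel) = (Arc::clone(&proxy), cancel.clone());
                    tracker.spawn(async move {
                        proxy.handle_io(stream, peer, port, cancel).await;
                    });
                }
                _ = tokio::signal::ctrl_c() => {
                    cancel.cancel(); // every connection observes cancelled() and winds down
                    break;
                }
            }
        }
        tracker.close();
        tracker.wait().await; // wait for in-flight requests to finish
    }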
@@ -89,6 +135,7 @@ impl HttpProxyService {
         req: Request<Incoming>,
         peer_addr: std::net::SocketAddr,
         port: u16,
+        cancel: CancellationToken,
     ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
         let host = req.headers()
             .get("host")
@@ -184,7 +231,7 @@ impl HttpProxyService {

         if is_websocket {
             let result = self.handle_websocket_upgrade(
-                req, peer_addr, &upstream, route_match.route, route_id, &upstream_key,
+                req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel,
             ).await;
             // Note: for WebSocket, connection_ended is called inside
             // the spawned tunnel task when the connection closes.
@@ -223,15 +270,24 @@ impl HttpProxyService {
             }
         }

-        // Connect to upstream
-        let upstream_stream = match TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)).await {
-            Ok(s) => s,
-            Err(e) => {
+        // Connect to upstream with timeout
+        let upstream_stream = match tokio::time::timeout(
+            self.connect_timeout,
+            TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)),
+        ).await {
+            Ok(Ok(s)) => s,
+            Ok(Err(e)) => {
                 error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
                 self.upstream_selector.connection_ended(&upstream_key);
                 self.metrics.connection_closed(route_id);
                 return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
             }
+            Err(_) => {
+                error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port);
+                self.upstream_selector.connection_ended(&upstream_key);
+                self.metrics.connection_closed(route_id);
+                return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
+            }
         };
         upstream_stream.set_nodelay(true).ok();
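Note: the doubled `Ok` pattern falls out of `tokio::time::timeout` wrapping a fallible future: the outer Result reports the deadline, the inner one the I/O outcome. A self-contained sketch of the same shape (host and port are placeholders):

    use std::time::Duration;
    use tokio::net::TcpStream;

    async fn connect_with_timeout(host: &str, port: u16, dl: Duration) -> Result<TcpStream, String> {
        match tokio::time::timeout(dl, TcpStream::connect((host, port))).await {
            Ok(Ok(stream)) => Ok(stream),                      // connected in time
            Ok(Err(e)) => Err(format!("connect failed: {e}")), // refused, unreachable, ...
            Err(_) => Err("connect timed out".into()),         // deadline elapsed
        }
    }

Mapping the two failure arms to distinct status codes (502 vs 504) lets clients tell a dead backend apart from a slow one.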
@@ -394,6 +450,7 @@ impl HttpProxyService {
         route: &rustproxy_config::RouteConfig,
         route_id: Option<&str>,
         upstream_key: &str,
+        cancel: CancellationToken,
     ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
         use tokio::io::{AsyncReadExt, AsyncWriteExt};

@@ -417,16 +474,24 @@ impl HttpProxyService {

         info!("WebSocket upgrade from {} -> {}:{}", peer_addr, upstream.host, upstream.port);

-        let mut upstream_stream = match TcpStream::connect(
-            format!("{}:{}", upstream.host, upstream.port)
+        // Connect to upstream with timeout
+        let mut upstream_stream = match tokio::time::timeout(
+            self.connect_timeout,
+            TcpStream::connect(format!("{}:{}", upstream.host, upstream.port)),
         ).await {
-            Ok(s) => s,
-            Err(e) => {
+            Ok(Ok(s)) => s,
+            Ok(Err(e)) => {
                 error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e);
                 self.upstream_selector.connection_ended(upstream_key);
                 self.metrics.connection_closed(route_id);
                 return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
             }
+            Err(_) => {
+                error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port);
+                self.upstream_selector.connection_ended(upstream_key);
+                self.metrics.connection_closed(route_id);
+                return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
+            }
         };
         upstream_stream.set_nodelay(true).ok();

@@ -591,6 +656,11 @@ impl HttpProxyService {
         let (mut cr, mut cw) = tokio::io::split(client_io);
         let (mut ur, mut uw) = tokio::io::split(upstream_stream);

+        // Shared activity tracker for the watchdog
+        let last_activity = Arc::new(AtomicU64::new(0));
+        let start = std::time::Instant::now();
+
+        let la1 = Arc::clone(&last_activity);
         let c2u = tokio::spawn(async move {
             let mut buf = vec![0u8; 65536];
             let mut total = 0u64;
@@ -603,11 +673,13 @@ impl HttpProxyService {
                     break;
                 }
                 total += n as u64;
+                la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
             }
             let _ = uw.shutdown().await;
             total
         });

+        let la2 = Arc::clone(&last_activity);
         let u2c = tokio::spawn(async move {
             let mut buf = vec![0u8; 65536];
             let mut total = 0u64;
@@ -620,13 +692,59 @@ impl HttpProxyService {
                     break;
                 }
                 total += n as u64;
+                la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
             }
             let _ = cw.shutdown().await;
             total
         });

+        // Watchdog: monitors inactivity, max lifetime, and cancellation
+        let la_watch = Arc::clone(&last_activity);
+        let c2u_handle = c2u.abort_handle();
+        let u2c_handle = u2c.abort_handle();
+        let inactivity_timeout = DEFAULT_WS_INACTIVITY_TIMEOUT;
+        let max_lifetime = DEFAULT_WS_MAX_LIFETIME;
+
+        let watchdog = tokio::spawn(async move {
+            let check_interval = std::time::Duration::from_secs(5);
+            let mut last_seen = 0u64;
+            loop {
+                tokio::select! {
+                    _ = tokio::time::sleep(check_interval) => {}
+                    _ = cancel.cancelled() => {
+                        debug!("WebSocket tunnel cancelled by shutdown");
+                        c2u_handle.abort();
+                        u2c_handle.abort();
+                        break;
+                    }
+                }
+
+                // Check max lifetime
+                if start.elapsed() >= max_lifetime {
+                    debug!("WebSocket tunnel exceeded max lifetime, closing");
+                    c2u_handle.abort();
+                    u2c_handle.abort();
+                    break;
+                }
+
+                // Check inactivity
+                let current = la_watch.load(Ordering::Relaxed);
+                if current == last_seen {
+                    let elapsed_since_activity = start.elapsed().as_millis() as u64 - current;
+                    if elapsed_since_activity >= inactivity_timeout.as_millis() as u64 {
+                        debug!("WebSocket tunnel inactive for {}ms, closing", elapsed_since_activity);
+                        c2u_handle.abort();
+                        u2c_handle.abort();
+                        break;
+                    }
+                }
+                last_seen = current;
+            }
+        });
+
         let bytes_in = c2u.await.unwrap_or(0);
         let bytes_out = u2c.await.unwrap_or(0);
+        watchdog.abort();

         debug!("WebSocket tunnel closed: {} bytes in, {} bytes out", bytes_in, bytes_out);
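Note: the copy loops stamp an `AtomicU64` with "milliseconds since tunnel start" on every chunk, which is cheaper in the hot path than locking an `Instant` behind a mutex. The watchdog idea in isolation, as a sketch: names (`watch`, `idle_limit`, `handles`) are illustrative, and max-lifetime and cancellation checks are omitted for brevity.

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{Duration, Instant};

    async fn watch(last: Arc<AtomicU64>, start: Instant, idle_limit: Duration,
                   handles: Vec<tokio::task::AbortHandle>) {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stamped = last.load(Ordering::Relaxed);
            let idle_ms = (start.elapsed().as_millis() as u64).saturating_sub(stamped);
            if idle_ms >= idle_limit.as_millis() as u64 {
                for h in &handles { h.abort(); } // kill both copy directions
                return;
            }
        }
    }

Aborting both pump tasks causes the `c2u.await` / `u2c.await` joins above to resolve, so the tunnel always unwinds through the same byte-accounting path.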
@@ -812,6 +930,7 @@ impl Default for HttpProxyService {
             route_manager: Arc::new(RouteManager::new(vec![])),
             metrics: Arc::new(MetricsCollector::new()),
             upstream_selector: UpstreamSelector::new(),
+            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
         }
     }
 }

@@ -15,6 +15,38 @@ use crate::forwarder;
 use crate::tls_handler;
 use crate::connection_tracker::ConnectionTracker;

+/// RAII guard that decrements the active connection metric on drop.
+/// Ensures connection_closed is called on ALL exit paths — normal, error, or panic.
+struct ConnectionGuard {
+    metrics: Arc<MetricsCollector>,
+    route_id: Option<String>,
+    disarmed: bool,
+}
+
+impl ConnectionGuard {
+    fn new(metrics: Arc<MetricsCollector>, route_id: Option<&str>) -> Self {
+        Self {
+            metrics,
+            route_id: route_id.map(|s| s.to_string()),
+            disarmed: false,
+        }
+    }
+
+    /// Disarm the guard — prevents the Drop from running.
+    /// Use when handing off to a path that manages its own cleanup (e.g., HTTP proxy).
+    fn disarm(mut self) {
+        self.disarmed = true;
+    }
+}
+
+impl Drop for ConnectionGuard {
+    fn drop(&mut self) {
+        if !self.disarmed {
+            self.metrics.connection_closed(self.route_id.as_deref());
+        }
+    }
+}
+
 #[derive(Debug, Error)]
 pub enum ListenerError {
     #[error("Failed to bind port {port}: {source}")]
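Note: a sketch of why the guard exists. The decrement is balanced on every exit path, including `?` early returns and panics, which is exactly what the manual `connection_closed` calls removed later in this diff could not guarantee. `forward()` is a hypothetical stand-in for the TCP forwarding path.

    fn handle_conn(metrics: Arc<MetricsCollector>, route_id: Option<&str>)
        -> Result<(), Box<dyn std::error::Error>>
    {
        metrics.connection_opened(route_id);
        let _guard = ConnectionGuard::new(Arc::clone(&metrics), route_id);

        forward()?; // early Err return: _guard drops here, connection_closed fires
        Ok(())      // normal return: same
    }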
@@ -105,11 +137,12 @@ pub struct TcpListenerManager {
 impl TcpListenerManager {
     pub fn new(route_manager: Arc<RouteManager>) -> Self {
         let metrics = Arc::new(MetricsCollector::new());
-        let http_proxy = Arc::new(HttpProxyService::new(
+        let conn_config = ConnectionConfig::default();
+        let http_proxy = Arc::new(HttpProxyService::with_connect_timeout(
             Arc::clone(&route_manager),
             Arc::clone(&metrics),
+            std::time::Duration::from_millis(conn_config.connection_timeout_ms),
         ));
-        let conn_config = ConnectionConfig::default();
         let conn_tracker = Arc::new(ConnectionTracker::new(
             conn_config.max_connections_per_ip,
             conn_config.connection_rate_limit_per_minute,
@@ -129,11 +162,12 @@ impl TcpListenerManager {

     /// Create with a metrics collector.
     pub fn with_metrics(route_manager: Arc<RouteManager>, metrics: Arc<MetricsCollector>) -> Self {
-        let http_proxy = Arc::new(HttpProxyService::new(
+        let conn_config = ConnectionConfig::default();
+        let http_proxy = Arc::new(HttpProxyService::with_connect_timeout(
             Arc::clone(&route_manager),
             Arc::clone(&metrics),
+            std::time::Duration::from_millis(conn_config.connection_timeout_ms),
         ));
-        let conn_config = ConnectionConfig::default();
         let conn_tracker = Arc::new(ConnectionTracker::new(
             conn_config.max_connections_per_ip,
             conn_config.connection_rate_limit_per_minute,
@@ -427,6 +461,7 @@ impl TcpListenerManager {
         }

         metrics.connection_opened(route_id);
+        let _fast_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id);

         let connect_timeout = std::time::Duration::from_millis(conn_config.connection_timeout_ms);
         let inactivity_timeout = std::time::Duration::from_millis(conn_config.socket_timeout_ms);
@@ -442,14 +477,8 @@ impl TcpListenerManager {
             tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)),
         ).await {
             Ok(Ok(s)) => s,
-            Ok(Err(e)) => {
-                metrics.connection_closed(route_id);
-                return Err(e.into());
-            }
-            Err(_) => {
-                metrics.connection_closed(route_id);
-                return Err("Backend connection timeout".into());
-            }
+            Ok(Err(e)) => return Err(e.into()),
+            Err(_) => return Err("Backend connection timeout".into()),
         };
         backend.set_nodelay(true)?;

@@ -480,7 +509,6 @@ impl TcpListenerManager {
             metrics.record_bytes(bytes_in, bytes_out, route_id);
         }

-        metrics.connection_closed(route_id);
         return Ok(());
     }
 }
@@ -617,8 +645,9 @@ impl TcpListenerManager {
             }
         }

-        // Track connection in metrics
+        // Track connection in metrics — guard ensures connection_closed on all exit paths
         metrics.connection_opened(route_id);
+        let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id);

         // Check if this is a socket-handler route that should be relayed to TypeScript
         if route_match.route.action.action_type == RouteActionType::SocketHandler {
@@ -628,16 +657,13 @@ impl TcpListenerManager {
         };

         if let Some(relay_socket_path) = relay_path {
-            let result = Self::relay_to_socket_handler(
+            return Self::relay_to_socket_handler(
                 stream, n, port, peer_addr,
                 &route_match, domain.as_deref(), is_tls,
                 &relay_socket_path,
             ).await;
-            metrics.connection_closed(route_id);
-            return result;
         } else {
             debug!("Socket-handler route matched but no relay path configured");
-            metrics.connection_closed(route_id);
             return Ok(());
         }
     }
@@ -646,7 +672,6 @@ impl TcpListenerManager {
             Some(t) => t,
             None => {
                 debug!("Route matched but no target available");
-                metrics.connection_closed(route_id);
                 return Ok(());
             }
         };
@@ -765,7 +790,9 @@ impl TcpListenerManager {
             "TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})",
             peer_addr, target_host, target_port, domain
         );
-        http_proxy.handle_io(buf_stream, peer_addr, port).await;
+        // HTTP proxy manages its own per-request metrics — disarm TCP-level guard
+        _conn_guard.disarm();
+        http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
     } else {
         debug!(
             "TLS Terminate + TCP: {} -> {}:{} (domain: {:?})",
@@ -805,7 +832,9 @@ impl TcpListenerManager {
         if is_http {
             // Plain HTTP - use HTTP proxy for request-level routing
            debug!("HTTP proxy: {} on port {}", peer_addr, port);
-            http_proxy.handle_connection(stream, peer_addr, port).await;
+            // HTTP proxy manages its own per-request metrics — disarm TCP-level guard
+            _conn_guard.disarm();
+            http_proxy.handle_connection(stream, peer_addr, port, cancel.clone()).await;
             Ok(())
         } else {
             // Plain TCP forwarding (non-HTTP)
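Note: `disarm` takes `self` by value, so the guard cannot be used after hand-off; the moved-in guard's own Drop still runs inside `disarm` but sees `disarmed == true` and skips the decrement. A small sketch of the two modes, using only names from this diff:

    fn demo(metrics: Arc<MetricsCollector>) {
        metrics.connection_opened(None);
        let armed = ConnectionGuard::new(Arc::clone(&metrics), None);
        drop(armed); // Drop fires: connection_closed(None)

        metrics.connection_opened(None);
        let handed_off = ConnectionGuard::new(Arc::clone(&metrics), None);
        handed_off.disarm(); // consumed; no decrement now or later
        // From here the delegated subsystem (the HTTP proxy) owns the close call.
    }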
@@ -843,7 +872,7 @@ impl TcpListenerManager {
             }
         };

-        metrics.connection_closed(route_id);
+        // ConnectionGuard handles metrics.connection_closed() on drop
         result
     }