BREAKING CHANGE(ts-api,rustproxy): remove deprecated TypeScript protocol and utility exports while hardening QUIC, HTTP/3, WebSocket, and rate limiter cleanup paths
This commit is contained in:
@@ -43,6 +43,7 @@ impl H3ProxyService {
|
||||
_fallback_route: &RouteConfig,
|
||||
port: u16,
|
||||
real_client_addr: Option<SocketAddr>,
|
||||
parent_cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
||||
@@ -55,35 +56,44 @@ impl H3ProxyService {
|
||||
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
|
||||
|
||||
loop {
|
||||
match h3_conn.accept().await {
|
||||
Ok(Some(resolver)) => {
|
||||
let (request, stream) = match resolver.resolve_request().await {
|
||||
Ok(pair) => pair,
|
||||
let resolver = tokio::select! {
|
||||
_ = parent_cancel.cancelled() => {
|
||||
debug!("HTTP/3 connection from {} cancelled by parent", remote_addr);
|
||||
break;
|
||||
}
|
||||
result = h3_conn.accept() => {
|
||||
match result {
|
||||
Ok(Some(resolver)) => resolver,
|
||||
Ok(None) => {
|
||||
debug!("HTTP/3 connection from {} closed", remote_addr);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("HTTP/3 request resolve error: {}", e);
|
||||
continue;
|
||||
debug!("HTTP/3 accept error from {}: {}", remote_addr, e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let http_proxy = Arc::clone(&self.http_proxy);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_h3_request(
|
||||
request, stream, port, remote_addr, &http_proxy,
|
||||
).await {
|
||||
debug!("HTTP/3 request error from {}: {}", remote_addr, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(None) => {
|
||||
debug!("HTTP/3 connection from {} closed", remote_addr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (request, stream) = match resolver.resolve_request().await {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
debug!("HTTP/3 accept error from {}: {}", remote_addr, e);
|
||||
break;
|
||||
debug!("HTTP/3 request resolve error: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let http_proxy = Arc::clone(&self.http_proxy);
|
||||
let request_cancel = parent_cancel.child_token();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_h3_request(
|
||||
request, stream, port, remote_addr, &http_proxy, request_cancel,
|
||||
).await {
|
||||
debug!("HTTP/3 request error from {}: {}", remote_addr, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -103,13 +113,25 @@ async fn handle_h3_request(
|
||||
port: u16,
|
||||
peer_addr: SocketAddr,
|
||||
http_proxy: &HttpProxyService,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// Stream request body from H3 client via an mpsc channel.
|
||||
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
||||
|
||||
// Spawn the H3 body reader task
|
||||
// Spawn the H3 body reader task with cancellation
|
||||
let body_cancel = cancel.clone();
|
||||
let body_reader = tokio::spawn(async move {
|
||||
while let Ok(Some(mut chunk)) = stream.recv_data().await {
|
||||
loop {
|
||||
let chunk = tokio::select! {
|
||||
_ = body_cancel.cancelled() => break,
|
||||
result = stream.recv_data() => {
|
||||
match result {
|
||||
Ok(Some(chunk)) => chunk,
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
};
|
||||
let mut chunk = chunk;
|
||||
let data = Bytes::copy_from_slice(chunk.chunk());
|
||||
chunk.advance(chunk.remaining());
|
||||
if body_tx.send(data).await.is_err() {
|
||||
@@ -128,7 +150,6 @@ async fn handle_h3_request(
|
||||
|
||||
// Delegate to HttpProxyService — same backend path as TCP/HTTP:
|
||||
// route matching, ALPN protocol detection, connection pool, H1/H2/H3 auto.
|
||||
let cancel = CancellationToken::new();
|
||||
let conn_activity = ConnActivity::new_standalone();
|
||||
let response = http_proxy.handle_request(req, peer_addr, port, cancel, conn_activity).await
|
||||
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
||||
|
||||
@@ -203,6 +203,10 @@ pub struct HttpProxyService {
|
||||
route_rate_limiters: Arc<DashMap<String, Arc<RateLimiter>>>,
|
||||
/// Request counter for periodic rate limiter cleanup.
|
||||
request_counter: AtomicU64,
|
||||
/// Epoch for time-based rate limiter cleanup.
|
||||
rate_limiter_epoch: std::time::Instant,
|
||||
/// Last rate limiter cleanup time (ms since epoch).
|
||||
last_rate_limiter_cleanup_ms: AtomicU64,
|
||||
/// Cache of compiled URL rewrite regexes (keyed by pattern string).
|
||||
regex_cache: DashMap<String, Regex>,
|
||||
/// Shared backend TLS config for session resumption across connections.
|
||||
@@ -233,6 +237,8 @@ impl HttpProxyService {
|
||||
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
|
||||
route_rate_limiters: Arc::new(DashMap::new()),
|
||||
request_counter: AtomicU64::new(0),
|
||||
rate_limiter_epoch: std::time::Instant::now(),
|
||||
last_rate_limiter_cleanup_ms: AtomicU64::new(0),
|
||||
regex_cache: DashMap::new(),
|
||||
backend_tls_config: Self::default_backend_tls_config(),
|
||||
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
|
||||
@@ -258,6 +264,8 @@ impl HttpProxyService {
|
||||
connect_timeout,
|
||||
route_rate_limiters: Arc::new(DashMap::new()),
|
||||
request_counter: AtomicU64::new(0),
|
||||
rate_limiter_epoch: std::time::Instant::now(),
|
||||
last_rate_limiter_cleanup_ms: AtomicU64::new(0),
|
||||
regex_cache: DashMap::new(),
|
||||
backend_tls_config: Self::default_backend_tls_config(),
|
||||
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
|
||||
@@ -524,9 +532,13 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic rate limiter cleanup (every 1000 requests)
|
||||
// Periodic rate limiter cleanup (every 1000 requests or every 60s)
|
||||
let count = self.request_counter.fetch_add(1, Ordering::Relaxed);
|
||||
if count % 1000 == 0 {
|
||||
let now_ms = self.rate_limiter_epoch.elapsed().as_millis() as u64;
|
||||
let last_cleanup = self.last_rate_limiter_cleanup_ms.load(Ordering::Relaxed);
|
||||
let time_triggered = now_ms.saturating_sub(last_cleanup) >= 60_000;
|
||||
if count % 1000 == 0 || time_triggered {
|
||||
self.last_rate_limiter_cleanup_ms.store(now_ms, Ordering::Relaxed);
|
||||
for entry in self.route_rate_limiters.iter() {
|
||||
entry.value().cleanup();
|
||||
}
|
||||
@@ -2134,12 +2146,26 @@ impl HttpProxyService {
|
||||
let ws_max_lifetime = self.ws_max_lifetime;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// RAII guard: ensures connection_ended is called even if this task panics
|
||||
struct WsUpstreamGuard {
|
||||
selector: UpstreamSelector,
|
||||
key: String,
|
||||
}
|
||||
impl Drop for WsUpstreamGuard {
|
||||
fn drop(&mut self) {
|
||||
self.selector.connection_ended(&self.key);
|
||||
}
|
||||
}
|
||||
let _upstream_guard = WsUpstreamGuard {
|
||||
selector: upstream_selector,
|
||||
key: upstream_key_owned.clone(),
|
||||
};
|
||||
|
||||
let client_upgraded = match on_client_upgrade.await {
|
||||
Ok(upgraded) => upgraded,
|
||||
Err(e) => {
|
||||
debug!("WebSocket: client upgrade failed: {}", e);
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
return;
|
||||
return; // _upstream_guard Drop handles connection_ended
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2298,9 +2324,7 @@ impl HttpProxyService {
|
||||
watchdog.abort();
|
||||
|
||||
debug!("WebSocket tunnel closed: {} bytes in, {} bytes out", bytes_in, bytes_out);
|
||||
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
// Bytes already reported per-chunk in the copy loops above
|
||||
// _upstream_guard Drop handles connection_ended on all paths including panic
|
||||
});
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(
|
||||
@@ -2822,6 +2846,8 @@ impl Default for HttpProxyService {
|
||||
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
|
||||
route_rate_limiters: Arc::new(DashMap::new()),
|
||||
request_counter: AtomicU64::new(0),
|
||||
rate_limiter_epoch: std::time::Instant::now(),
|
||||
last_rate_limiter_cleanup_ms: AtomicU64::new(0),
|
||||
regex_cache: DashMap::new(),
|
||||
backend_tls_config: Self::default_backend_tls_config(),
|
||||
backend_tls_config_alpn: Self::default_backend_tls_config_with_alpn(),
|
||||
|
||||
@@ -411,11 +411,24 @@ impl MetricsCollector {
|
||||
}
|
||||
|
||||
/// Record a backend connection closing.
|
||||
/// Removes all per-backend tracking entries when the active count reaches 0.
|
||||
pub fn backend_connection_closed(&self, key: &str) {
|
||||
if let Some(counter) = self.backend_active.get(key) {
|
||||
let val = counter.load(Ordering::Relaxed);
|
||||
if val > 0 {
|
||||
counter.fetch_sub(1, Ordering::Relaxed);
|
||||
let prev = counter.fetch_sub(1, Ordering::Relaxed);
|
||||
if prev <= 1 {
|
||||
// Active count reached 0 — clean up all per-backend maps
|
||||
drop(counter); // release DashMap ref before remove
|
||||
self.backend_active.remove(key);
|
||||
self.backend_total.remove(key);
|
||||
self.backend_protocol.remove(key);
|
||||
self.backend_connect_errors.remove(key);
|
||||
self.backend_handshake_errors.remove(key);
|
||||
self.backend_request_errors.remove(key);
|
||||
self.backend_connect_time_us.remove(key);
|
||||
self.backend_connect_count.remove(key);
|
||||
self.backend_pool_hits.remove(key);
|
||||
self.backend_pool_misses.remove(key);
|
||||
self.backend_h2_failures.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1213,10 +1226,13 @@ mod tests {
|
||||
// No entry created
|
||||
assert!(collector.backend_active.get(key).is_none());
|
||||
|
||||
// Open one, close two — should saturate at 0
|
||||
// Open one, close — entries are removed when active count reaches 0
|
||||
collector.backend_connection_opened(key, Duration::from_millis(1));
|
||||
collector.backend_connection_closed(key);
|
||||
// Entry should be cleaned up (active reached 0)
|
||||
assert!(collector.backend_active.get(key).is_none());
|
||||
// Second close on missing entry is a no-op
|
||||
collector.backend_connection_closed(key);
|
||||
assert_eq!(collector.backend_active.get(key).unwrap().load(Ordering::Relaxed), 0);
|
||||
assert!(collector.backend_active.get(key).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
@@ -276,6 +276,17 @@ async fn quic_proxy_relay_loop(
|
||||
debug!("QUIC relay: cleaned up stale session for {}", key);
|
||||
}
|
||||
}
|
||||
|
||||
// Also clean orphaned proxy_addr_map entries (PROXY header received
|
||||
// but no relay session was ever created, e.g. client never sent data)
|
||||
let orphaned: Vec<SocketAddr> = proxy_addr_map.iter()
|
||||
.filter(|entry| relay_sessions.get(entry.key()).is_none())
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
for key in orphaned {
|
||||
proxy_addr_map.remove(&key);
|
||||
debug!("QUIC relay: cleaned up orphaned proxy_addr_map entry for {}", key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -399,14 +410,32 @@ pub async fn quic_accept_loop(
|
||||
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||
|
||||
tokio::spawn(async move {
|
||||
// RAII guard: ensures metrics/tracker cleanup even on panic
|
||||
struct QuicConnGuard {
|
||||
tracker: Arc<ConnectionTracker>,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
ip: std::net::IpAddr,
|
||||
ip_str: String,
|
||||
route_id: Option<String>,
|
||||
}
|
||||
impl Drop for QuicConnGuard {
|
||||
fn drop(&mut self) {
|
||||
self.tracker.connection_closed(&self.ip);
|
||||
self.metrics.connection_closed(self.route_id.as_deref(), Some(&self.ip_str));
|
||||
}
|
||||
}
|
||||
let _guard = QuicConnGuard {
|
||||
tracker: conn_tracker,
|
||||
metrics: Arc::clone(&metrics),
|
||||
ip,
|
||||
ip_str,
|
||||
route_id,
|
||||
};
|
||||
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
||||
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
conn_tracker.connection_closed(&ip);
|
||||
metrics.connection_closed(route_id.as_deref(), Some(&ip_str));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -439,7 +468,7 @@ async fn handle_quic_connection(
|
||||
if enable_http3 {
|
||||
if let Some(ref h3_svc) = h3_service {
|
||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
||||
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
|
||||
h3_svc.handle_connection(connection, &route, port, real_client_addr, cancel).await
|
||||
} else {
|
||||
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
||||
// Keep connection alive until cancelled
|
||||
@@ -502,6 +531,7 @@ async fn handle_quic_stream_forwarding(
|
||||
let ip_str = effective_addr.ip().to_string();
|
||||
let stream_metrics = Arc::clone(&metrics_arc);
|
||||
let stream_route_id = route_id.map(|s| s.to_string());
|
||||
let stream_cancel = cancel.child_token();
|
||||
|
||||
// Spawn a task for each QUIC stream → TCP bidirectional forwarding
|
||||
tokio::spawn(async move {
|
||||
@@ -509,6 +539,7 @@ async fn handle_quic_stream_forwarding(
|
||||
send_stream,
|
||||
recv_stream,
|
||||
&backend_addr,
|
||||
stream_cancel,
|
||||
).await {
|
||||
Ok((bytes_in, bytes_out)) => {
|
||||
stream_metrics.record_bytes(
|
||||
@@ -529,27 +560,111 @@ async fn handle_quic_stream_forwarding(
|
||||
}
|
||||
|
||||
/// Forward a single QUIC bidirectional stream to a TCP backend connection.
|
||||
///
|
||||
/// Includes inactivity timeout (60s), max lifetime (10min), and cancellation
|
||||
/// to prevent leaked stream tasks when the parent connection closes.
|
||||
async fn forward_quic_stream_to_tcp(
|
||||
mut quic_send: quinn::SendStream,
|
||||
mut quic_recv: quinn::RecvStream,
|
||||
backend_addr: &str,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(u64, u64)> {
|
||||
let inactivity_timeout = std::time::Duration::from_secs(60);
|
||||
let max_lifetime = std::time::Duration::from_secs(600);
|
||||
|
||||
// Connect to backend TCP
|
||||
let tcp_stream = tokio::net::TcpStream::connect(backend_addr).await?;
|
||||
let (mut tcp_read, mut tcp_write) = tcp_stream.into_split();
|
||||
|
||||
// Bidirectional copy
|
||||
let client_to_backend = tokio::io::copy(&mut quic_recv, &mut tcp_write);
|
||||
let backend_to_client = tokio::io::copy(&mut tcp_read, &mut quic_send);
|
||||
let last_activity = Arc::new(AtomicU64::new(0));
|
||||
let start = std::time::Instant::now();
|
||||
let conn_cancel = CancellationToken::new();
|
||||
|
||||
let (c2b, b2c) = tokio::join!(client_to_backend, backend_to_client);
|
||||
let la1 = Arc::clone(&last_activity);
|
||||
let cc1 = conn_cancel.clone();
|
||||
let c2b = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let mut total = 0u64;
|
||||
loop {
|
||||
let n = tokio::select! {
|
||||
result = quic_recv.read(&mut buf) => match result {
|
||||
Ok(Some(0)) | Ok(None) | Err(_) => break,
|
||||
Ok(Some(n)) => n,
|
||||
},
|
||||
_ = cc1.cancelled() => break,
|
||||
};
|
||||
if tcp_write.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
total += n as u64;
|
||||
la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||
}
|
||||
let _ = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(2),
|
||||
tcp_write.shutdown(),
|
||||
).await;
|
||||
total
|
||||
});
|
||||
|
||||
let bytes_in = c2b.unwrap_or(0);
|
||||
let bytes_out = b2c.unwrap_or(0);
|
||||
let la2 = Arc::clone(&last_activity);
|
||||
let cc2 = conn_cancel.clone();
|
||||
let b2c = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let mut total = 0u64;
|
||||
loop {
|
||||
let n = tokio::select! {
|
||||
result = tcp_read.read(&mut buf) => match result {
|
||||
Ok(0) | Err(_) => break,
|
||||
Ok(n) => n,
|
||||
},
|
||||
_ = cc2.cancelled() => break,
|
||||
};
|
||||
// quinn SendStream implements AsyncWrite
|
||||
if quic_send.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
total += n as u64;
|
||||
la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||
}
|
||||
let _ = quic_send.finish();
|
||||
total
|
||||
});
|
||||
|
||||
// Graceful shutdown
|
||||
let _ = quic_send.finish();
|
||||
let _ = tcp_write.shutdown().await;
|
||||
// Watchdog: inactivity, max lifetime, and cancellation
|
||||
let la_watch = Arc::clone(&last_activity);
|
||||
let c2b_abort = c2b.abort_handle();
|
||||
let b2c_abort = b2c.abort_handle();
|
||||
tokio::spawn(async move {
|
||||
let check_interval = std::time::Duration::from_secs(5);
|
||||
let mut last_seen = 0u64;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
_ = tokio::time::sleep(check_interval) => {
|
||||
if start.elapsed() >= max_lifetime {
|
||||
debug!("QUIC stream exceeded max lifetime, closing");
|
||||
break;
|
||||
}
|
||||
let current = la_watch.load(Ordering::Relaxed);
|
||||
if current == last_seen {
|
||||
let elapsed = start.elapsed().as_millis() as u64 - current;
|
||||
if elapsed >= inactivity_timeout.as_millis() as u64 {
|
||||
debug!("QUIC stream inactive for {}ms, closing", elapsed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
last_seen = current;
|
||||
}
|
||||
}
|
||||
}
|
||||
conn_cancel.cancel();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
|
||||
c2b_abort.abort();
|
||||
b2c_abort.abort();
|
||||
});
|
||||
|
||||
let bytes_in = c2b.await.unwrap_or(0);
|
||||
let bytes_out = b2c.await.unwrap_or(0);
|
||||
|
||||
Ok((bytes_in, bytes_out))
|
||||
}
|
||||
|
||||
@@ -504,7 +504,29 @@ impl UdpListenerManager {
|
||||
// Only populated when proxy_ips is non-empty.
|
||||
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
|
||||
|
||||
// Periodic cleanup for proxy_addr_map to prevent unbounded growth
|
||||
let mut last_proxy_cleanup = tokio::time::Instant::now();
|
||||
let proxy_cleanup_interval = std::time::Duration::from_secs(60);
|
||||
|
||||
loop {
|
||||
// Periodic cleanup: remove proxy_addr_map entries with no active session
|
||||
if !proxy_addr_map.is_empty() && last_proxy_cleanup.elapsed() >= proxy_cleanup_interval {
|
||||
last_proxy_cleanup = tokio::time::Instant::now();
|
||||
let stale: Vec<SocketAddr> = proxy_addr_map.iter()
|
||||
.filter(|entry| {
|
||||
let key: SessionKey = (*entry.key(), port);
|
||||
session_table.get(&key).is_none()
|
||||
})
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
if !stale.is_empty() {
|
||||
debug!("UDP proxy_addr_map cleanup: removing {} stale entries on port {}", stale.len(), port);
|
||||
for addr in stale {
|
||||
proxy_addr_map.remove(&addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (len, client_addr) = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
debug!("UDP recv loop on port {} cancelled", port);
|
||||
|
||||
Reference in New Issue
Block a user