Compare commits

...

4 Commits

Author SHA1 Message Date
c0e432fd9b v26.2.4
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 07:05:57 +00:00
a3d8a3a388 fix(rustproxy-http): improve HTTP/3 connection reuse and clean up stale proxy state 2026-03-26 07:05:57 +00:00
437d1a3329 v26.2.3
Some checks failed
Default (tags) / security (push) Failing after 3s
Default (tags) / test (push) Failing after 3s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-25 07:26:47 +00:00
746d93663d fix(repo): no changes to commit 2026-03-25 07:26:47 +00:00
15 changed files with 190 additions and 226 deletions

View File

@@ -1,5 +1,18 @@
# Changelog
## 2026-03-26 - 26.2.4 - fix(rustproxy-http)
improve HTTP/3 connection reuse and clean up stale proxy state
- Reuse pooled HTTP/3 SendRequest handles to skip repeated SETTINGS handshakes and reduce request overhead on QUIC pool hits
- Add periodic cleanup for per-route rate limiters and orphaned backend metrics to prevent unbounded memory growth after traffic or backend errors stop
- Enforce HTTP max connection lifetime alongside idle timeouts and apply configured lifetime values from the TCP listener
- Reduce HTTP/3 body copying by using owned Bytes paths for request and response streaming, and replace the custom response body adapter with a stream-based implementation
- Harden auxiliary proxy components by capping datagram handler buffer growth and removing duplicate RustProxy exit listeners
## 2026-03-25 - 26.2.3 - fix(repo)
no changes to commit
## 2026-03-25 - 26.2.2 - fix(proxy)
improve connection cleanup and route validation handling

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "26.2.2",
"version": "26.2.4",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",

1
rust/Cargo.lock generated
View File

@@ -1270,6 +1270,7 @@ dependencies = [
"arc-swap",
"bytes",
"dashmap",
"futures",
"h3",
"h3-quinn",
"http-body",

View File

@@ -30,3 +30,4 @@ socket2 = { workspace = true }
quinn = { workspace = true }
h3 = { workspace = true }
h3-quinn = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["std"] }

View File

@@ -56,7 +56,11 @@ struct PooledH2 {
}
/// A pooled QUIC/HTTP/3 connection (multiplexed like H2).
/// Stores the h3 `SendRequest` handle so pool hits skip the h3 SETTINGS handshake.
pub struct PooledH3 {
/// Multiplexed h3 request handle — clone to open a new stream.
pub send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
/// Raw QUIC connection — kept for liveness probing (close_reason) only.
pub connection: quinn::Connection,
pub created_at: Instant,
pub generation: u64,
@@ -197,7 +201,10 @@ impl ConnectionPool {
/// Try to get a pooled QUIC connection for the given key.
/// QUIC connections are multiplexed — the connection is shared, not removed.
pub fn checkout_h3(&self, key: &PoolKey) -> Option<(quinn::Connection, Duration)> {
pub fn checkout_h3(
&self,
key: &PoolKey,
) -> Option<(h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>, quinn::Connection, Duration)> {
let entry = self.h3_pool.get(key)?;
let pooled = entry.value();
let age = pooled.created_at.elapsed();
@@ -215,13 +222,20 @@ impl ConnectionPool {
return None;
}
Some((pooled.connection.clone(), age))
Some((pooled.send_request.clone(), pooled.connection.clone(), age))
}
/// Register a QUIC connection in the pool. Returns the generation ID.
pub fn register_h3(&self, key: PoolKey, connection: quinn::Connection) -> u64 {
/// Register a QUIC connection and its h3 SendRequest handle in the pool.
/// Returns the generation ID.
pub fn register_h3(
&self,
key: PoolKey,
connection: quinn::Connection,
send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
) -> u64 {
let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed);
self.h3_pool.insert(key, PooledH3 {
send_request,
connection,
created_at: Instant::now(),
generation: gen,

View File

@@ -116,7 +116,7 @@ async fn handle_h3_request(
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Stream request body from H3 client via an mpsc channel.
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(32);
// Spawn the H3 body reader task with cancellation
let body_cancel = cancel.clone();
@@ -132,8 +132,7 @@ async fn handle_h3_request(
}
};
let mut chunk = chunk;
let data = Bytes::copy_from_slice(chunk.chunk());
chunk.advance(chunk.remaining());
let data = chunk.copy_to_bytes(chunk.remaining());
if body_tx.send(data).await.is_err() {
break;
}
@@ -179,8 +178,8 @@ async fn handle_h3_request(
while let Some(frame) = resp_body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
stream.send_data(Bytes::copy_from_slice(data)).await
if let Ok(data) = frame.into_data() {
stream.send_data(data).await
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
}
}

View File

@@ -72,15 +72,16 @@ const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_s
/// If no new request arrives within this duration, the connection is closed.
const DEFAULT_HTTP_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
/// Default HTTP max connection lifetime (1 hour).
/// HTTP connections are forcefully closed after this duration regardless of activity.
const DEFAULT_HTTP_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(3600);
/// Default WebSocket inactivity timeout (1 hour).
const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
/// Default WebSocket max lifetime (24 hours).
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
/// Protocol decision for backend connection.
#[derive(Debug)]
enum ProtocolDecision {
@@ -222,6 +223,8 @@ pub struct HttpProxyService {
protocol_cache: Arc<crate::protocol_cache::ProtocolCache>,
/// HTTP keep-alive idle timeout: close connection if no new request arrives within this duration.
http_idle_timeout: std::time::Duration,
/// HTTP max connection lifetime: forcefully close connection after this duration regardless of activity.
http_max_lifetime: std::time::Duration,
/// WebSocket inactivity timeout (no data in either direction).
ws_inactivity_timeout: std::time::Duration,
/// WebSocket maximum connection lifetime.
@@ -248,6 +251,7 @@ impl HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
@@ -275,21 +279,24 @@ impl HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
}
}
/// Set the HTTP keep-alive idle timeout, WebSocket inactivity timeout, and
/// WebSocket max lifetime from connection config values.
/// Set the HTTP keep-alive idle timeout, HTTP max lifetime, WebSocket inactivity
/// timeout, and WebSocket max lifetime from connection config values.
pub fn set_connection_timeouts(
&mut self,
http_idle_timeout: std::time::Duration,
http_max_lifetime: std::time::Duration,
ws_inactivity_timeout: std::time::Duration,
ws_max_lifetime: std::time::Duration,
) {
self.http_idle_timeout = http_idle_timeout;
self.http_max_lifetime = http_max_lifetime;
self.ws_inactivity_timeout = ws_inactivity_timeout;
self.ws_max_lifetime = ws_max_lifetime;
}
@@ -314,6 +321,15 @@ impl HttpProxyService {
self.protocol_cache.clear();
}
/// Clean up expired entries in all per-route rate limiters.
/// Called from the background sampling task to prevent unbounded growth
/// when traffic stops after a burst of unique IPs.
pub fn cleanup_all_rate_limiters(&self) {
for entry in self.route_rate_limiters.iter() {
entry.value().cleanup();
}
}
/// Snapshot the protocol cache for metrics/UI display.
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
self.protocol_cache.snapshot()
@@ -354,6 +370,7 @@ impl HttpProxyService {
// Capture timeouts before `self` is moved into the service closure.
let idle_timeout = self.http_idle_timeout;
let max_lifetime = self.http_max_lifetime;
// Activity tracker: updated at the START and END of each request.
// The idle watchdog checks this to determine if the connection is idle
@@ -412,15 +429,23 @@ impl HttpProxyService {
}
}
_ = async {
// Idle watchdog: check every 5s whether the connection has been idle
// (no active requests AND no activity for idle_timeout).
// This avoids killing long-running requests or upgraded connections.
// Idle + lifetime watchdog: check every 5s whether the connection has been
// idle (no active requests AND no activity for idle_timeout) or exceeded
// the max connection lifetime.
let check_interval = std::time::Duration::from_secs(5);
let mut last_seen = 0u64;
loop {
tokio::time::sleep(check_interval).await;
// Never close while a request is in progress
// Check max connection lifetime (unconditional — even active connections
// must eventually be recycled to prevent resource accumulation).
if start.elapsed() >= max_lifetime {
debug!("HTTP connection exceeded max lifetime ({}s) from {}",
max_lifetime.as_secs(), peer_addr);
return;
}
// Never close for idleness while a request is in progress
if active_requests.load(Ordering::Relaxed) > 0 {
last_seen = last_activity.load(Ordering::Relaxed);
continue;
@@ -437,7 +462,7 @@ impl HttpProxyService {
last_seen = current;
}
} => {
debug!("HTTP connection idle timeout ({}s) from {}", idle_timeout.as_secs(), peer_addr);
debug!("HTTP connection timeout from {}", peer_addr);
conn.as_mut().graceful_shutdown();
// Give any in-flight work 5s to drain after graceful shutdown
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), conn).await;
@@ -791,10 +816,10 @@ impl HttpProxyService {
};
// Try H3 pool checkout first
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
if let Some((pooled_sr, quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
self.metrics.backend_pool_hit(&upstream_key);
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, Some(pooled_sr), parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -807,7 +832,7 @@ impl HttpProxyService {
self.metrics.backend_pool_miss(&upstream_key);
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -966,7 +991,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1009,7 +1034,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1068,7 +1093,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -1111,7 +1136,7 @@ impl HttpProxyService {
protocol: crate::connection_pool::PoolProtocol::H3,
};
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
quic_conn, None, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
@@ -2744,7 +2769,12 @@ impl HttpProxyService {
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to create QUIC client crypto config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
// Tune QUIC transport to match H2 flow-control: 2 MB per-stream receive window.
let mut transport = quinn::TransportConfig::default();
transport.stream_receive_window(quinn::VarInt::from_u32(2 * 1024 * 1024));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
client_config.transport_config(Arc::new(transport));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
@@ -2766,8 +2796,8 @@ impl HttpProxyService {
let server_name = host.to_string();
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
let connection = tokio::time::timeout(self.connect_timeout, connecting).await
.map_err(|_| format!("QUIC connect timeout ({:?}) for {}", self.connect_timeout, host))??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)
@@ -2777,6 +2807,7 @@ impl HttpProxyService {
async fn forward_h3(
&self,
quic_conn: quinn::Connection,
pooled_sender: Option<h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>>,
parts: hyper::http::request::Parts,
body: BoxBody<Bytes, hyper::Error>,
upstream_headers: hyper::HeaderMap,
@@ -2789,33 +2820,42 @@ impl HttpProxyService {
conn_activity: &ConnActivity,
backend_key: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, mut send_request) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Obtain the h3 SendRequest handle: skip handshake + driver on pool hit.
let (mut send_request, gen_holder) = if let Some(sr) = pooled_sender {
// Pool hit — reuse existing h3 session, no SETTINGS round-trip
(sr, None)
} else {
// Fresh QUIC connection — full h3 handshake + driver spawn
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, sr) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Spawn the h3 connection driver
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
{
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
}
});
}
});
(sr, Some(gen_holder))
};
// Build the H3 request
let uri = hyper::Uri::builder()
@@ -2845,7 +2885,7 @@ impl HttpProxyService {
}
};
// Stream request body
// Stream request body (zero-copy: into_data yields owned Bytes)
let rid: Option<Arc<str>> = route_id.map(Arc::from);
let sip: Arc<str> = Arc::from(source_ip);
@@ -2855,9 +2895,9 @@ impl HttpProxyService {
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
if let Ok(data) = frame.into_data() {
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
if let Err(e) = stream.send_data(data).await {
error!(backend = %backend_key, error = %e, "H3 send_data failed");
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
}
@@ -2899,8 +2939,23 @@ impl HttpProxyService {
ResponseFilter::apply_headers(route, headers, None);
}
// Stream response body back via an adapter
let h3_body = H3ClientResponseBody { stream };
// Stream response body back via unfold — correctly preserves waker across polls
let body_stream = futures::stream::unfold(stream, |mut s| async move {
match s.recv_data().await {
Ok(Some(mut buf)) => {
use bytes::Buf;
let data = buf.copy_to_bytes(buf.remaining());
Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s))
}
Ok(None) => None,
Err(e) => {
warn!("H3 response body recv error: {}", e);
None
}
}
});
let h3_body = http_body_util::StreamBody::new(body_stream);
let counting_body = CountingBody::new(
h3_body,
Arc::clone(&self.metrics),
@@ -2917,10 +2972,16 @@ impl HttpProxyService {
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
// Register connection in pool on success
// Register connection in pool on success (fresh connections only)
if status != StatusCode::BAD_GATEWAY {
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
if let Some(gh) = gen_holder {
let g = self.connection_pool.register_h3(
pool_key.clone(),
quic_conn,
send_request,
);
gh.store(g, std::sync::atomic::Ordering::Relaxed);
}
}
self.metrics.set_backend_protocol(backend_key, "h3");
@@ -2949,41 +3010,6 @@ fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
None
}
/// Response body adapter for H3 client responses.
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
struct H3ClientResponseBody {
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl http_body::Body for H3ClientResponseBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// h3's recv_data is async, so we need to poll it manually.
// Use a small future to poll the recv_data call.
use std::future::Future;
let mut fut = Box::pin(self.stream.recv_data());
match fut.as_mut().poll(_cx) {
Poll::Ready(Ok(Some(mut buf))) => {
use bytes::Buf;
let data = Bytes::copy_from_slice(buf.chunk());
buf.advance(buf.remaining());
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
}
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => {
warn!("H3 response body recv error: {}", e);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
/// Insecure certificate verifier for backend TLS connections (fallback only).
/// The production path uses the shared config from tls_handler which has the same
/// behavior but with session resumption across all outbound connections.
@@ -3052,6 +3078,7 @@ impl Default for HttpProxyService {
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),

View File

@@ -624,6 +624,24 @@ impl MetricsCollector {
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
// Safety-net: prune orphaned backend error/stats entries for backends
// that have no active or total connections (error-only backends).
// These accumulate when backend_connect_error/backend_handshake_error
// create entries but backend_connection_opened is never called.
let known_backends: HashSet<String> = self.backend_active.iter()
.map(|e| e.key().clone())
.chain(self.backend_total.iter().map(|e| e.key().clone()))
.collect();
self.backend_connect_errors.retain(|k, _| known_backends.contains(k));
self.backend_handshake_errors.retain(|k, _| known_backends.contains(k));
self.backend_request_errors.retain(|k, _| known_backends.contains(k));
self.backend_connect_time_us.retain(|k, _| known_backends.contains(k));
self.backend_connect_count.retain(|k, _| known_backends.contains(k));
self.backend_pool_hits.retain(|k, _| known_backends.contains(k));
self.backend_pool_misses.retain(|k, _| known_backends.contains(k));
self.backend_h2_failures.retain(|k, _| known_backends.contains(k));
self.backend_protocol.retain(|k, _| known_backends.contains(k));
}
/// Remove per-route metrics for route IDs that are no longer active.

View File

@@ -10,7 +10,6 @@ pub mod forwarder;
pub mod proxy_protocol;
pub mod tls_handler;
pub mod connection_tracker;
pub mod socket_relay;
pub mod socket_opts;
pub mod udp_session;
pub mod udp_listener;
@@ -22,7 +21,6 @@ pub use forwarder::*;
pub use proxy_protocol::*;
pub use tls_handler::*;
pub use connection_tracker::*;
pub use socket_relay::*;
pub use socket_opts::*;
pub use udp_session::*;
pub use udp_listener::*;

View File

@@ -1,126 +1,4 @@
//! Socket handler relay for connecting client connections to a TypeScript handler
//! via a Unix domain socket.
//! Socket handler relay module.
//!
//! Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay.
use tokio::net::UnixStream;
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use tokio::net::TcpStream;
use serde::Serialize;
use tracing::debug;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct RelayMetadata {
connection_id: u64,
remote_ip: String,
remote_port: u16,
local_port: u16,
sni: Option<String>,
route_name: String,
initial_data_base64: Option<String>,
}
/// Relay a client connection to a TypeScript handler via Unix domain socket.
///
/// Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay.
pub async fn relay_to_handler(
client: TcpStream,
relay_socket_path: &str,
connection_id: u64,
remote_ip: String,
remote_port: u16,
local_port: u16,
sni: Option<String>,
route_name: String,
initial_data: Option<&[u8]>,
) -> std::io::Result<()> {
debug!(
"Relaying connection {} to handler socket {}",
connection_id, relay_socket_path
);
// Connect to TypeScript handler Unix socket
let mut handler = UnixStream::connect(relay_socket_path).await?;
// Build and send metadata header
let initial_data_base64 = initial_data.map(base64_encode);
let metadata = RelayMetadata {
connection_id,
remote_ip,
remote_port,
local_port,
sni,
route_name,
initial_data_base64,
};
let metadata_json = serde_json::to_string(&metadata)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
handler.write_all(metadata_json.as_bytes()).await?;
handler.write_all(b"\n").await?;
// Bidirectional relay between client and handler
let (mut client_read, mut client_write) = client.into_split();
let (mut handler_read, mut handler_write) = handler.into_split();
let c2h = tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
loop {
let n = match client_read.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => n,
};
if handler_write.write_all(&buf[..n]).await.is_err() {
break;
}
}
let _ = handler_write.shutdown().await;
});
let h2c = tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
loop {
let n = match handler_read.read(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(n) => n,
};
if client_write.write_all(&buf[..n]).await.is_err() {
break;
}
}
let _ = client_write.shutdown().await;
});
let _ = tokio::join!(c2h, h2c);
debug!("Relay connection {} completed", connection_id);
Ok(())
}
/// Simple base64 encoding without external dependency.
fn base64_encode(data: &[u8]) -> String {
const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut result = String::new();
for chunk in data.chunks(3) {
let b0 = chunk[0] as u32;
let b1 = if chunk.len() > 1 { chunk[1] as u32 } else { 0 };
let b2 = if chunk.len() > 2 { chunk[2] as u32 } else { 0 };
let n = (b0 << 16) | (b1 << 8) | b2;
result.push(CHARS[((n >> 18) & 0x3F) as usize] as char);
result.push(CHARS[((n >> 12) & 0x3F) as usize] as char);
if chunk.len() > 1 {
result.push(CHARS[((n >> 6) & 0x3F) as usize] as char);
} else {
result.push('=');
}
if chunk.len() > 2 {
result.push(CHARS[(n & 0x3F) as usize] as char);
} else {
result.push('=');
}
}
result
}
//! Note: The actual relay logic lives in `tcp_listener::relay_to_socket_handler()`
//! which has proper timeouts, cancellation, and metrics integration.

View File

@@ -182,6 +182,7 @@ impl TcpListenerManager {
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
);
@@ -220,6 +221,7 @@ impl TcpListenerManager {
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
);
@@ -263,6 +265,7 @@ impl TcpListenerManager {
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
http_proxy_svc.set_connection_timeouts(
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
std::time::Duration::from_millis(config.socket_timeout_ms),
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
);

View File

@@ -363,6 +363,7 @@ impl RustProxy {
// Start the throughput sampling task with cooperative cancellation
let metrics = Arc::clone(&self.metrics);
let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone();
let http_proxy = self.listener_manager.as_ref().unwrap().http_proxy().clone();
let interval_ms = self.options.metrics.as_ref()
.and_then(|m| m.sample_interval_ms)
.unwrap_or(1000);
@@ -378,6 +379,9 @@ impl RustProxy {
metrics.sample_all();
// Periodically clean up stale rate-limit timestamp entries
conn_tracker.cleanup_stale_timestamps();
// Clean up expired rate limiter entries to prevent unbounded
// growth from unique IPs after traffic stops
http_proxy.cleanup_all_rate_limiters();
}
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '26.2.2',
version: '26.2.4',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}

View File

@@ -26,6 +26,8 @@ interface IDatagramRelayMessage {
* - TS→Rust: { type: "reply", sourceIp, sourcePort, destPort, payloadBase64 }
*/
export class DatagramHandlerServer {
private static readonly MAX_BUFFER_SIZE = 50 * 1024 * 1024; // 50 MB
private server: plugins.net.Server | null = null;
private connection: plugins.net.Socket | null = null;
private socketPath: string;
@@ -100,6 +102,11 @@ export class DatagramHandlerServer {
socket.on('data', (chunk: Buffer) => {
this.readBuffer = Buffer.concat([this.readBuffer, chunk]);
if (this.readBuffer.length > DatagramHandlerServer.MAX_BUFFER_SIZE) {
logger.log('error', `DatagramHandlerServer: buffer exceeded ${DatagramHandlerServer.MAX_BUFFER_SIZE} bytes, resetting`);
this.readBuffer = Buffer.alloc(0);
return;
}
this.processFrames();
});

View File

@@ -128,6 +128,7 @@ export class SmartProxy extends plugins.EventEmitter {
}
// Handle unexpected exit (only emits error if not intentionally stopping)
this.bridge.removeAllListeners('exit');
this.bridge.on('exit', (code: number | null, signal: string | null) => {
if (this.stopping) return;
logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });