Compare commits

...

6 Commits

Author SHA1 Message Date
9ba101c59b v27.3.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-04 18:54:05 +00:00
1ad3e61c15 fix(metrics): correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic 2026-04-04 18:54:05 +00:00
3bfa451341 v27.3.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-04 17:59:04 +00:00
7b3ab7378b feat(test): add end-to-end WebSocket proxy test coverage 2026-04-04 17:59:04 +00:00
527c616cd4 v27.2.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-04 16:52:25 +00:00
b04eb0ab17 feat(metrics): add frontend and backend protocol distribution metrics 2026-04-04 16:52:25 +00:00
12 changed files with 814 additions and 36 deletions

View File

@@ -1,5 +1,26 @@
# Changelog
## 2026-04-04 - 27.3.1 - fix(metrics)
correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic
- move frontend protocol accounting from per-request to connection lifetime tracking for HTTP/1, HTTP/2, and HTTP/3
- add backend protocol guards to connection drivers so active protocol metrics reflect live upstream connections
- prevent protocol counter underflow by using atomic saturating decrements in the metrics collector
- read backend protocol distribution directly from cached aggregate counters in the Rust metrics adapter
## 2026-04-04 - 27.3.0 - feat(test)
add end-to-end WebSocket proxy test coverage
- add comprehensive WebSocket e2e tests for upgrade handling, bidirectional messaging, header forwarding, close propagation, and large payloads
- add ws and @types/ws as development dependencies to support the new test suite
## 2026-04-04 - 27.2.0 - feat(metrics)
add frontend and backend protocol distribution metrics
- track active and total frontend protocol counts for h1, h2, h3, websocket, and other traffic
- add backend protocol counters with RAII guards to ensure metrics are decremented on all exit paths
- expose protocol distribution through the TypeScript metrics interfaces and Rust metrics adapter
## 2026-03-27 - 27.1.0 - feat(rustproxy-passthrough)
add selective connection recycling for route, security, and certificate updates

8
deno.lock generated
View File

@@ -12,9 +12,11 @@
"npm:@push.rocks/smartserve@^2.0.3": "2.0.3",
"npm:@tsclass/tsclass@^9.5.0": "9.5.0",
"npm:@types/node@^25.5.0": "25.5.0",
"npm:@types/ws@^8.18.1": "8.18.1",
"npm:minimatch@^10.2.4": "10.2.4",
"npm:typescript@^6.0.2": "6.0.2",
"npm:why-is-node-running@^3.2.2": "3.2.2"
"npm:why-is-node-running@^3.2.2": "3.2.2",
"npm:ws@^8.20.0": "8.20.0"
},
"npm": {
"@api.global/typedrequest-interfaces@2.0.2": {
@@ -6743,9 +6745,11 @@
"npm:@push.rocks/smartserve@^2.0.3",
"npm:@tsclass/tsclass@^9.5.0",
"npm:@types/node@^25.5.0",
"npm:@types/ws@^8.18.1",
"npm:minimatch@^10.2.4",
"npm:typescript@^6.0.2",
"npm:why-is-node-running@^3.2.2"
"npm:why-is-node-running@^3.2.2",
"npm:ws@^8.20.0"
]
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "27.1.0",
"version": "27.3.1",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
@@ -22,8 +22,10 @@
"@git.zone/tstest": "^3.6.0",
"@push.rocks/smartserve": "^2.0.3",
"@types/node": "^25.5.0",
"@types/ws": "^8.18.1",
"typescript": "^6.0.2",
"why-is-node-running": "^3.2.2"
"why-is-node-running": "^3.2.2",
"ws": "^8.20.0"
},
"dependencies": {
"@push.rocks/smartcrypto": "^2.0.4",

22
pnpm-lock.yaml generated
View File

@@ -45,12 +45,18 @@ importers:
'@types/node':
specifier: ^25.5.0
version: 25.5.0
'@types/ws':
specifier: ^8.18.1
version: 8.18.1
typescript:
specifier: ^6.0.2
version: 6.0.2
why-is-node-running:
specifier: ^3.2.2
version: 3.2.2
ws:
specifier: ^8.20.0
version: 8.20.0
packages:
@@ -3304,18 +3310,6 @@ packages:
wrappy@1.0.2:
resolution: {integrity: sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=}
ws@8.19.0:
resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==}
engines: {node: '>=10.0.0'}
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: '>=5.0.2'
peerDependenciesMeta:
bufferutil:
optional: true
utf-8-validate:
optional: true
ws@8.20.0:
resolution: {integrity: sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==}
engines: {node: '>=10.0.0'}
@@ -5296,7 +5290,7 @@ snapshots:
'@push.rocks/smartenv': 6.0.0
'@push.rocks/smartlog': 3.2.1
'@push.rocks/smartpath': 6.0.0
ws: 8.19.0
ws: 8.20.0
transitivePeerDependencies:
- bufferutil
- utf-8-validate
@@ -8033,8 +8027,6 @@ snapshots:
wrappy@1.0.2: {}
ws@8.19.0: {}
ws@8.20.0: {}
xml-parse-from-string@1.0.1: {}

View File

@@ -18,7 +18,7 @@ use tracing::{debug, warn};
use rustproxy_config::RouteConfig;
use tokio_util::sync::CancellationToken;
use crate::proxy_service::{ConnActivity, HttpProxyService};
use crate::proxy_service::{ConnActivity, HttpProxyService, ProtocolGuard};
/// HTTP/3 proxy service.
///
@@ -48,6 +48,9 @@ impl H3ProxyService {
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
// Track frontend H3 connection for the QUIC connection's lifetime.
let _frontend_h3_guard = ProtocolGuard::frontend(Arc::clone(self.http_proxy.metrics()), "h3");
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
h3::server::builder()
.send_grease(false)

View File

@@ -110,6 +110,68 @@ impl Drop for ActiveRequestGuard {
}
}
/// RAII guard that calls frontend_protocol_closed or backend_protocol_closed on drop.
/// Ensures active protocol counters are decremented on all exit paths.
pub(crate) struct ProtocolGuard {
metrics: Arc<MetricsCollector>,
// Static protocol label ("h1", "h2", "h3", "ws", "other") passed through to the collector.
version: &'static str,
// Selects which counter family (frontend vs backend) the Drop impl closes.
is_frontend: bool,
}
impl ProtocolGuard {
/// Open the frontend counter for `version`; the matching close fires on Drop.
pub fn frontend(metrics: Arc<MetricsCollector>, version: &'static str) -> Self {
metrics.frontend_protocol_opened(version);
Self { metrics, version, is_frontend: true }
}
/// Open the backend counter for `version`; the matching close fires on Drop.
pub fn backend(metrics: Arc<MetricsCollector>, version: &'static str) -> Self {
metrics.backend_protocol_opened(version);
Self { metrics, version, is_frontend: false }
}
}
impl Drop for ProtocolGuard {
fn drop(&mut self) {
// Exactly one close per guard, mirroring the single open in the constructor.
if self.is_frontend {
self.metrics.frontend_protocol_closed(self.version);
} else {
self.metrics.backend_protocol_closed(self.version);
}
}
}
/// Connection-level frontend protocol tracker.
///
/// In `handle_io`, the HTTP protocol (h1 vs h2) is unknown until the first request
/// arrives. This struct uses `OnceLock` so the first request detects the protocol
/// and opens the counter; subsequent requests on the same connection are no-ops.
/// On Drop (when the connection ends), the counter is closed.
pub(crate) struct FrontendProtocolTracker {
metrics: Arc<MetricsCollector>,
// Set exactly once by the first request; None until then.
proto: std::sync::OnceLock<&'static str>,
}
impl FrontendProtocolTracker {
fn new(metrics: Arc<MetricsCollector>) -> Self {
Self { metrics, proto: std::sync::OnceLock::new() }
}
/// Set the frontend protocol. Only the first call opens the counter.
fn set(&self, proto: &'static str) {
// OnceLock::set returns Err on every call after the first, so the
// open fires at most once per connection regardless of request count.
if self.proto.set(proto).is_ok() {
self.metrics.frontend_protocol_opened(proto);
}
}
}
impl Drop for FrontendProtocolTracker {
fn drop(&mut self) {
// If no request ever arrived, proto was never set and nothing was
// opened — so there is (correctly) nothing to close here.
if let Some(proto) = self.proto.get() {
self.metrics.frontend_protocol_closed(proto);
}
}
}
/// Backend stream that can be either plain TCP or TLS-wrapped.
/// Used for `terminate-and-reencrypt` mode where the backend requires TLS.
pub(crate) enum BackendStream {
@@ -335,6 +397,11 @@ impl HttpProxyService {
self.protocol_cache.snapshot()
}
/// Access the shared metrics collector (used by H3ProxyService for protocol tracking).
/// Returns a borrowed `Arc` so callers can cheaply clone it into spawned tasks.
pub fn metrics(&self) -> &Arc<MetricsCollector> {
&self.metrics
}
/// Handle an incoming HTTP connection on a plain TCP stream.
pub async fn handle_connection(
self: Arc<Self>,
@@ -379,10 +446,24 @@ impl HttpProxyService {
let active_requests = Arc::new(AtomicU64::new(0));
let start = std::time::Instant::now();
// Connection-level frontend protocol tracker: the first request detects
// h1 vs h2 from req.version() and opens the counter. On connection close
// (when handle_io returns), Drop closes the counter.
let frontend_tracker = Arc::new(FrontendProtocolTracker::new(Arc::clone(&self.metrics)));
let ft_inner = Arc::clone(&frontend_tracker);
let la_inner = Arc::clone(&last_activity);
let ar_inner = Arc::clone(&active_requests);
let cancel_inner = cancel.clone();
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
// Detect frontend protocol from the first request on this connection.
// OnceLock ensures only the first call opens the counter.
let proto: &'static str = match req.version() {
hyper::Version::HTTP_2 => "h2",
_ => "h1",
};
ft_inner.set(proto);
// Mark request start — RAII guard decrements on drop (panic-safe)
la_inner.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
let req_guard = ActiveRequestGuard::new(Arc::clone(&ar_inner));
@@ -625,6 +706,9 @@ impl HttpProxyService {
.map(|p| p.as_str().eq_ignore_ascii_case("websocket"))
.unwrap_or(false);
// Frontend protocol is tracked at the connection level (handle_io / h3_service).
// WebSocket tunnels additionally get their own "ws" guards in the spawned task.
if is_h1_websocket || is_h2_websocket {
let result = self.handle_websocket_upgrade(
req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str, is_h2_websocket,
@@ -1233,13 +1317,18 @@ impl HttpProxyService {
}
};
tokio::spawn(async move {
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
Ok(Err(e)) => debug!("Upstream connection error: {}", e),
Err(_) => debug!("H1 connection driver timed out after 300s"),
_ => {}
}
});
{
let driver_metrics = Arc::clone(&self.metrics);
tokio::spawn(async move {
// Track backend H1 connection for the driver's lifetime
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1");
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
Ok(Err(e)) => debug!("Upstream connection error: {}", e),
Err(_) => debug!("H1 connection driver timed out after 300s"),
_ => {}
}
});
}
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await
}
@@ -1360,7 +1449,10 @@ impl HttpProxyService {
let pool = Arc::clone(&self.connection_pool);
let key = pool_key.clone();
let gen = Arc::clone(&gen_holder);
let driver_metrics = Arc::clone(&self.metrics);
tokio::spawn(async move {
// Track backend H2 connection for the driver's lifetime
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2");
if let Err(e) = conn.await {
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
}
@@ -1659,7 +1751,10 @@ impl HttpProxyService {
let pool = Arc::clone(&self.connection_pool);
let key = pool_key.clone();
let gen = Arc::clone(&gen_holder);
let driver_metrics = Arc::clone(&self.metrics);
tokio::spawn(async move {
// Track backend H2 connection for the driver's lifetime
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2");
if let Err(e) = conn.await {
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
}
@@ -1829,13 +1924,18 @@ impl HttpProxyService {
}
};
tokio::spawn(async move {
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e),
Err(_) => debug!("H1 fallback: connection driver timed out after 300s"),
_ => {}
}
});
{
let driver_metrics = Arc::clone(&self.metrics);
tokio::spawn(async move {
// Track backend H1 connection for the driver's lifetime
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1");
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e),
Err(_) => debug!("H1 fallback: connection driver timed out after 300s"),
_ => {}
}
});
}
let mut upstream_req = Request::builder()
.method(method)
@@ -2383,6 +2483,11 @@ impl HttpProxyService {
selector: upstream_selector,
key: upstream_key_owned.clone(),
};
// Track WebSocket tunnel as "ws" on both frontend and backend.
// Frontend h1/h2 is tracked at the connection level (handle_io); this
// additional "ws" guard captures the tunnel's lifetime independently.
let _frontend_ws_guard = ProtocolGuard::frontend(Arc::clone(&metrics), "ws");
let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws");
let client_upgraded = match on_client_upgrade.await {
Ok(upgraded) => upgraded,
@@ -2845,7 +2950,10 @@ impl HttpProxyService {
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let driver_gen = Arc::clone(&gen_holder);
let driver_metrics = Arc::clone(&self.metrics);
tokio::spawn(async move {
// Track backend H3 connection for the driver's lifetime
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h3");
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);

View File

@@ -33,6 +33,9 @@ pub struct Metrics {
pub total_datagrams_out: u64,
// Protocol detection cache snapshot (populated by RustProxy from HttpProxyService)
pub detected_protocols: Vec<ProtocolCacheEntryMetric>,
// Protocol distribution for frontend (client→proxy) and backend (proxy→upstream)
pub frontend_protocols: ProtocolMetrics,
pub backend_protocols: ProtocolMetrics,
}
/// Per-route metrics.
@@ -99,6 +102,23 @@ pub struct ProtocolCacheEntryMetric {
pub h3_consecutive_failures: Option<u32>,
}
/// Protocol distribution metrics for frontend (client→proxy) and backend (proxy→upstream).
/// Tracks active and total counts for each protocol category.
/// `*_active` is a point-in-time gauge; `*_total` only ever increases.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ProtocolMetrics {
// HTTP/1.x connections
pub h1_active: u64,
pub h1_total: u64,
// HTTP/2 connections
pub h2_active: u64,
pub h2_total: u64,
// HTTP/3 (QUIC) connections
pub h3_active: u64,
pub h3_total: u64,
// WebSocket tunnels
pub ws_active: u64,
pub ws_total: u64,
// Non-HTTP / passthrough traffic
pub other_active: u64,
pub other_total: u64,
}
/// Statistics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -170,6 +190,30 @@ pub struct MetricsCollector {
total_datagrams_in: AtomicU64,
total_datagrams_out: AtomicU64,
// ── Frontend protocol tracking (h1/h2/h3/ws/other) ──
frontend_h1_active: AtomicU64,
frontend_h1_total: AtomicU64,
frontend_h2_active: AtomicU64,
frontend_h2_total: AtomicU64,
frontend_h3_active: AtomicU64,
frontend_h3_total: AtomicU64,
frontend_ws_active: AtomicU64,
frontend_ws_total: AtomicU64,
frontend_other_active: AtomicU64,
frontend_other_total: AtomicU64,
// ── Backend protocol tracking (h1/h2/h3/ws/other) ──
backend_h1_active: AtomicU64,
backend_h1_total: AtomicU64,
backend_h2_active: AtomicU64,
backend_h2_total: AtomicU64,
backend_h3_active: AtomicU64,
backend_h3_total: AtomicU64,
backend_ws_active: AtomicU64,
backend_ws_total: AtomicU64,
backend_other_active: AtomicU64,
backend_other_total: AtomicU64,
// ── Lock-free pending throughput counters (hot path) ──
global_pending_tp_in: AtomicU64,
global_pending_tp_out: AtomicU64,
@@ -221,6 +265,26 @@ impl MetricsCollector {
total_http_requests: AtomicU64::new(0),
pending_http_requests: AtomicU64::new(0),
http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
frontend_h1_active: AtomicU64::new(0),
frontend_h1_total: AtomicU64::new(0),
frontend_h2_active: AtomicU64::new(0),
frontend_h2_total: AtomicU64::new(0),
frontend_h3_active: AtomicU64::new(0),
frontend_h3_total: AtomicU64::new(0),
frontend_ws_active: AtomicU64::new(0),
frontend_ws_total: AtomicU64::new(0),
frontend_other_active: AtomicU64::new(0),
frontend_other_total: AtomicU64::new(0),
backend_h1_active: AtomicU64::new(0),
backend_h1_total: AtomicU64::new(0),
backend_h2_active: AtomicU64::new(0),
backend_h2_total: AtomicU64::new(0),
backend_h3_active: AtomicU64::new(0),
backend_h3_total: AtomicU64::new(0),
backend_ws_active: AtomicU64::new(0),
backend_ws_total: AtomicU64::new(0),
backend_other_active: AtomicU64::new(0),
backend_other_total: AtomicU64::new(0),
global_pending_tp_in: AtomicU64::new(0),
global_pending_tp_out: AtomicU64::new(0),
route_pending_tp: DashMap::new(),
@@ -411,6 +475,63 @@ impl MetricsCollector {
self.total_datagrams_out.fetch_add(1, Ordering::Relaxed);
}
// ── Frontend/backend protocol distribution tracking ──
/// Get the (active, total) counter pair for a frontend protocol.
/// Any string other than "h2"/"h3"/"ws"/"other" maps to the h1 counters —
/// visible callers only pass the five known labels, so the fallback is h1 itself.
fn frontend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) {
match proto {
"h2" => (&self.frontend_h2_active, &self.frontend_h2_total),
"h3" => (&self.frontend_h3_active, &self.frontend_h3_total),
"ws" => (&self.frontend_ws_active, &self.frontend_ws_total),
"other" => (&self.frontend_other_active, &self.frontend_other_total),
_ => (&self.frontend_h1_active, &self.frontend_h1_total), // h1 + default
}
}
/// Get the (active, total) counter pair for a backend protocol.
/// Mirrors `frontend_proto_counters`: unknown labels fall through to h1.
fn backend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) {
match proto {
"h2" => (&self.backend_h2_active, &self.backend_h2_total),
"h3" => (&self.backend_h3_active, &self.backend_h3_total),
"ws" => (&self.backend_ws_active, &self.backend_ws_total),
"other" => (&self.backend_other_active, &self.backend_other_total),
_ => (&self.backend_h1_active, &self.backend_h1_total), // h1 + default
}
}
/// Record a frontend request/connection opened with a given protocol.
/// Bumps both the active gauge (undone by `frontend_protocol_closed`) and
/// the monotonic total.
pub fn frontend_protocol_opened(&self, proto: &str) {
let (active, total) = self.frontend_proto_counters(proto);
active.fetch_add(1, Ordering::Relaxed);
total.fetch_add(1, Ordering::Relaxed);
}
/// Record a frontend connection closed with a given protocol.
/// Saturates at zero: an unmatched close (double-drop, missed open) leaves
/// the gauge at 0 instead of wrapping.
pub fn frontend_protocol_closed(&self, proto: &str) {
let (active, _) = self.frontend_proto_counters(proto);
// Atomic saturating decrement — avoids TOCTOU race where concurrent
// closes could both read val=1, both subtract, wrapping to u64::MAX.
// fetch_update retries on contention; `.ok()` discards the Err returned
// when the closure yields None (counter already 0 — nothing to decrement).
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 { Some(v - 1) } else { None }
}).ok();
}
/// Record a backend connection opened with a given protocol.
/// Bumps both the active gauge (undone by `backend_protocol_closed`) and
/// the monotonic total.
pub fn backend_protocol_opened(&self, proto: &str) {
let (active, total) = self.backend_proto_counters(proto);
active.fetch_add(1, Ordering::Relaxed);
total.fetch_add(1, Ordering::Relaxed);
}
/// Record a backend connection closed with a given protocol.
/// Saturates at zero, same as the frontend variant.
pub fn backend_protocol_closed(&self, proto: &str) {
let (active, _) = self.backend_proto_counters(proto);
// Atomic saturating decrement — see frontend_protocol_closed for rationale.
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v > 0 { Some(v - 1) } else { None }
}).ok();
}
// ── Per-backend recording methods ──
/// Record a successful backend connection with its connect duration.
@@ -866,6 +987,30 @@ impl MetricsCollector {
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed),
detected_protocols: vec![],
frontend_protocols: ProtocolMetrics {
h1_active: self.frontend_h1_active.load(Ordering::Relaxed),
h1_total: self.frontend_h1_total.load(Ordering::Relaxed),
h2_active: self.frontend_h2_active.load(Ordering::Relaxed),
h2_total: self.frontend_h2_total.load(Ordering::Relaxed),
h3_active: self.frontend_h3_active.load(Ordering::Relaxed),
h3_total: self.frontend_h3_total.load(Ordering::Relaxed),
ws_active: self.frontend_ws_active.load(Ordering::Relaxed),
ws_total: self.frontend_ws_total.load(Ordering::Relaxed),
other_active: self.frontend_other_active.load(Ordering::Relaxed),
other_total: self.frontend_other_total.load(Ordering::Relaxed),
},
backend_protocols: ProtocolMetrics {
h1_active: self.backend_h1_active.load(Ordering::Relaxed),
h1_total: self.backend_h1_total.load(Ordering::Relaxed),
h2_active: self.backend_h2_active.load(Ordering::Relaxed),
h2_total: self.backend_h2_total.load(Ordering::Relaxed),
h3_active: self.backend_h3_active.load(Ordering::Relaxed),
h3_total: self.backend_h3_total.load(Ordering::Relaxed),
ws_active: self.backend_ws_active.load(Ordering::Relaxed),
ws_total: self.backend_ws_total.load(Ordering::Relaxed),
other_active: self.backend_other_active.load(Ordering::Relaxed),
other_total: self.backend_other_total.load(Ordering::Relaxed),
},
}
}
}

View File

@@ -43,6 +43,33 @@ impl Drop for ConnectionGuard {
}
}
/// RAII guard for frontend+backend protocol distribution tracking.
/// Calls the appropriate _closed methods on drop for both frontend and backend.
struct ProtocolGuard {
metrics: Arc<MetricsCollector>,
// NOTE(review): as constructed by `new`, both fields are always Some; the
// Option presumably leaves room for one-sided tracking — confirm before simplifying.
frontend_proto: Option<&'static str>,
backend_proto: Option<&'static str>,
}
impl ProtocolGuard {
/// Open both the frontend and backend counters; Drop closes them on every
/// exit path (normal return, error, or panic unwind).
fn new(metrics: Arc<MetricsCollector>, frontend: &'static str, backend: &'static str) -> Self {
metrics.frontend_protocol_opened(frontend);
metrics.backend_protocol_opened(backend);
Self { metrics, frontend_proto: Some(frontend), backend_proto: Some(backend) }
}
}
impl Drop for ProtocolGuard {
fn drop(&mut self) {
if let Some(proto) = self.frontend_proto {
self.metrics.frontend_protocol_closed(proto);
}
if let Some(proto) = self.backend_proto {
self.metrics.backend_protocol_closed(proto);
}
}
}
/// RAII guard that calls ConnectionTracker::connection_closed on drop.
/// Ensures per-IP tracking is cleaned up on ALL exit paths — normal, error, or panic.
struct ConnectionTrackerGuard {
@@ -1024,6 +1051,9 @@ impl TcpListenerManager {
peer_addr, target_host, target_port, domain
);
// Track as "other" protocol (non-HTTP passthrough)
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
let mut actual_buf = vec![0u8; n];
stream.read_exact(&mut actual_buf).await?;
@@ -1090,6 +1120,8 @@ impl TcpListenerManager {
"TLS Terminate + TCP: {} -> {}:{} (domain: {:?})",
peer_addr, target_host, target_port, domain
);
// Track as "other" protocol (TLS-terminated non-HTTP)
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
// Raw TCP forwarding of decrypted stream
let backend = match tokio::time::timeout(
connect_timeout,
@@ -1176,6 +1208,8 @@ impl TcpListenerManager {
"TLS Terminate+Reencrypt + TCP: {} -> {}:{}",
peer_addr, target_host, target_port
);
// Track as "other" protocol (TLS-terminated non-HTTP, re-encrypted)
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
Self::handle_tls_reencrypt_tunnel(
buf_stream, &target_host, target_port,
peer_addr, Arc::clone(&metrics), route_id,
@@ -1192,6 +1226,8 @@ impl TcpListenerManager {
Ok(())
} else {
// Plain TCP forwarding (non-HTTP)
// Track as "other" protocol (plain TCP, non-HTTP)
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
let mut backend = match tokio::time::timeout(
connect_timeout,
tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)),

418
test/test.websocket-e2e.ts Normal file
View File

@@ -0,0 +1,418 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/index.js';
import * as http from 'http';
import WebSocket, { WebSocketServer } from 'ws';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
/**
 * Helper: create a WebSocket client that connects through the proxy.
 * Registers the message handler BEFORE awaiting open to avoid race conditions.
 */
function connectWs(
  url: string,
  headers: Record<string, string> = {},
  opts: WebSocket.ClientOptions = {},
): { ws: WebSocket; messages: string[]; opened: Promise<void> } {
  const collected: string[] = [];
  const socket = new WebSocket(url, { headers, ...opts });
  // Attach the message listener right away, before `open` can possibly fire.
  socket.on('message', (data) => collected.push(data.toString()));
  const opened = new Promise<void>((resolve, reject) => {
    const openTimer = setTimeout(() => reject(new Error('WebSocket open timeout')), 5000);
    socket.on('open', () => {
      clearTimeout(openTimer);
      resolve();
    });
    socket.on('error', (err) => {
      clearTimeout(openTimer);
      reject(err);
    });
  });
  return { ws: socket, messages: collected, opened };
}
/**
 * Wait until `predicate` returns true, polling every 30ms, with a hard timeout.
 *
 * Fix: once the deadline fires, polling STOPS. Previously the poll loop kept
 * rescheduling `setTimeout(check, 30)` forever after the rejection, which held
 * the node event loop open and could hang the test process on any timeout.
 */
function waitFor(predicate: () => boolean, timeoutMs = 5000): Promise<void> {
  return new Promise((resolve, reject) => {
    let timedOut = false;
    const deadline = setTimeout(() => {
      timedOut = true;
      reject(new Error('waitFor timeout'));
    }, timeoutMs);
    const check = () => {
      if (timedOut) return; // stop polling after rejection — lets the process exit
      if (predicate()) {
        clearTimeout(deadline);
        resolve();
      } else {
        setTimeout(check, 30);
      }
    };
    check();
  });
}
/**
 * Graceful close helper.
 *
 * Resolves when the `close` event fires, or after a 2s fallback so a stalled
 * close handshake can never hang the suite. Fix: the fallback timer is now
 * cleared on a clean close — previously it was left pending, holding the
 * event loop open for the full 2s after every connection closed.
 */
function closeWs(ws: WebSocket): Promise<void> {
  return new Promise((resolve) => {
    if (ws.readyState === WebSocket.CLOSED) return resolve();
    const fallback = setTimeout(resolve, 2000); // fallback if `close` never arrives
    ws.on('close', () => {
      clearTimeout(fallback); // don't keep the process alive after a clean close
      resolve();
    });
    ws.close();
  });
}
// ─── Test 1: Basic WebSocket upgrade and bidirectional messaging ───
tap.test('should proxy WebSocket connections with bidirectional messaging', async () => {
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
// Backend: echoes messages with prefix, sends greeting on connect
const backendServer = http.createServer();
const wss = new WebSocketServer({ server: backendServer });
const backendMessages: string[] = [];
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const msg = data.toString();
backendMessages.push(msg);
ws.send(`echo: ${msg}`);
});
// Unsolicited server-push right after upgrade — exercises backend→client direction.
ws.send('hello from backend');
});
await new Promise<void>((resolve) => {
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
});
const proxy = new SmartProxy({
routes: [{
name: 'ws-test-route',
match: { ports: PROXY_PORT },
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
websocket: { enabled: true },
},
}],
});
await proxy.start();
// Connect client — message handler registered before open
const { ws, messages, opened } = connectWs(
`ws://127.0.0.1:${PROXY_PORT}/`,
{ Host: 'test.local' },
);
await opened;
// Wait for the backend greeting
await waitFor(() => messages.length >= 1);
expect(messages[0]).toEqual('hello from backend');
// Send 3 messages, expect 3 echoes
ws.send('ping 1');
ws.send('ping 2');
ws.send('ping 3');
await waitFor(() => messages.length >= 4);
// NOTE(review): mixes toContain (below) with toInclude (further down) —
// confirm both matchers exist in tapbundle's expect, or unify on one.
expect(messages).toContain('echo: ping 1');
expect(messages).toContain('echo: ping 2');
expect(messages).toContain('echo: ping 3');
expect(backendMessages).toInclude('ping 1');
expect(backendMessages).toInclude('ping 2');
expect(backendMessages).toInclude('ping 3');
await closeWs(ws);
await proxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
// Grace period so the OS releases the listening sockets before the port check.
await new Promise((r) => setTimeout(r, 500));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// ─── Test 2: Multiple concurrent WebSocket connections ───
tap.test('should handle multiple concurrent WebSocket connections', async () => {
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
const backendServer = http.createServer();
const wss = new WebSocketServer({ server: backendServer });
let connectionCount = 0;
// Tag each backend connection with its arrival order so replies identify the socket.
wss.on('connection', (ws) => {
const id = ++connectionCount;
ws.on('message', (data) => {
ws.send(`conn${id}: ${data.toString()}`);
});
});
await new Promise<void>((resolve) => {
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
});
const proxy = new SmartProxy({
routes: [{
name: 'ws-multi-route',
match: { ports: PROXY_PORT },
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
websocket: { enabled: true },
},
}],
});
await proxy.start();
const NUM_CLIENTS = 5;
const clients: { ws: WebSocket; messages: string[] }[] = [];
// Clients are opened sequentially: each is fully connected before the next starts.
for (let i = 0; i < NUM_CLIENTS; i++) {
const c = connectWs(
`ws://127.0.0.1:${PROXY_PORT}/`,
{ Host: 'test.local' },
);
await c.opened;
clients.push(c);
}
// Each client sends a unique message
for (let i = 0; i < NUM_CLIENTS; i++) {
clients[i].ws.send(`hello from client ${i}`);
}
// Wait for all replies
await waitFor(() => clients.every((c) => c.messages.length >= 1));
// Each reply must carry the client's own payload — proves tunnels aren't cross-wired.
for (let i = 0; i < NUM_CLIENTS; i++) {
expect(clients[i].messages.length).toBeGreaterThanOrEqual(1);
expect(clients[i].messages[0]).toInclude(`hello from client ${i}`);
}
expect(connectionCount).toEqual(NUM_CLIENTS);
for (const c of clients) await closeWs(c.ws);
await proxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
// Grace period so the OS releases the listening sockets before the port check.
await new Promise((r) => setTimeout(r, 500));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// ─── Test 3: WebSocket with binary data ───
tap.test('should proxy binary WebSocket frames', async () => {
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
const backendServer = http.createServer();
const wss = new WebSocketServer({ server: backendServer });
// Backend echoes every frame back, explicitly marked binary.
wss.on('connection', (ws) => {
ws.on('message', (data) => {
ws.send(data, { binary: true });
});
});
await new Promise<void>((resolve) => {
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
});
const proxy = new SmartProxy({
routes: [{
name: 'ws-binary-route',
match: { ports: PROXY_PORT },
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
websocket: { enabled: true },
},
}],
});
await proxy.start();
const receivedBuffers: Buffer[] = [];
// Raw client (not connectWs) because connectWs stringifies incoming frames.
const ws = new WebSocket(`ws://127.0.0.1:${PROXY_PORT}/`, {
headers: { Host: 'test.local' },
});
ws.on('message', (data) => {
// NOTE(review): ws delivers Buffer by default; the ArrayBuffer cast is typing-only.
receivedBuffers.push(Buffer.from(data as ArrayBuffer));
});
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('timeout')), 5000);
ws.on('open', () => { clearTimeout(timeout); resolve(); });
ws.on('error', (err) => { clearTimeout(timeout); reject(err); });
});
// Send a 256-byte buffer with known content
const sentBuffer = Buffer.alloc(256);
for (let i = 0; i < 256; i++) sentBuffer[i] = i;
ws.send(sentBuffer);
await waitFor(() => receivedBuffers.length >= 1);
// Byte-for-byte round trip: both length and content must survive the proxy.
expect(receivedBuffers[0].length).toEqual(256);
expect(Buffer.compare(receivedBuffers[0], sentBuffer)).toEqual(0);
await closeWs(ws);
await proxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
// Grace period so the OS releases the listening sockets before the port check.
await new Promise((r) => setTimeout(r, 500));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// ─── Test 4: WebSocket path and query string preserved ───
tap.test('should preserve path and query string through proxy', async () => {
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
const backendServer = http.createServer();
const wss = new WebSocketServer({ server: backendServer });
let receivedUrl = '';
// Backend records the upgrade-request URL it actually saw and echoes it back.
wss.on('connection', (ws, req) => {
receivedUrl = req.url || '';
ws.send(`url: ${receivedUrl}`);
});
await new Promise<void>((resolve) => {
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
});
const proxy = new SmartProxy({
routes: [{
name: 'ws-path-route',
match: { ports: PROXY_PORT },
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
websocket: { enabled: true },
},
}],
});
await proxy.start();
const { ws, messages, opened } = connectWs(
`ws://127.0.0.1:${PROXY_PORT}/chat/room1?token=abc123`,
{ Host: 'test.local' },
);
await opened;
await waitFor(() => messages.length >= 1);
// Both the path and the query string must reach the backend unmodified.
expect(receivedUrl).toEqual('/chat/room1?token=abc123');
expect(messages[0]).toEqual('url: /chat/room1?token=abc123');
await closeWs(ws);
await proxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
// Grace period so the OS releases the listening sockets before the port check.
await new Promise((r) => setTimeout(r, 500));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// ─── Test 5: Clean close propagation ───
tap.test('should handle clean WebSocket close from client', async () => {
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
const backendServer = http.createServer();
const wss = new WebSocketServer({ server: backendServer });
let backendGotClose = false;
let backendCloseCode = 0;
wss.on('connection', (ws) => {
ws.on('close', (code) => {
backendGotClose = true;
backendCloseCode = code;
});
ws.on('message', (data) => {
ws.send(data);
});
});
await new Promise<void>((resolve) => {
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
});
const proxy = new SmartProxy({
routes: [{
name: 'ws-close-route',
match: { ports: PROXY_PORT },
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
websocket: { enabled: true },
},
}],
});
await proxy.start();
const { ws, messages, opened } = connectWs(
`ws://127.0.0.1:${PROXY_PORT}/`,
{ Host: 'test.local' },
);
await opened;
// Confirm connection works with a round-trip
ws.send('test');
await waitFor(() => messages.length >= 1);
// Close with code 1000
let clientCloseCode = 0;
const closed = new Promise<void>((resolve) => {
ws.on('close', (code) => {
clientCloseCode = code;
resolve();
});
// Fallback so the test cannot hang if the close event never fires.
setTimeout(resolve, 3000);
});
ws.close(1000, 'done');
await closed;
// Wait for backend to register
await waitFor(() => backendGotClose, 3000);
expect(backendGotClose).toBeTrue();
// NOTE(review): backendCloseCode is captured but never asserted —
// consider also checking the code the backend observed.
expect(clientCloseCode).toEqual(1000);
await proxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
// Grace period so the OS releases the listening sockets before the port check.
await new Promise((r) => setTimeout(r, 500));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// ─── Test 6: Large messages ───
tap.test('should handle large WebSocket messages', async () => {
  const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);

  // Backend replies with the byte length of whatever it receives; maxPayload
  // is raised on both ends so a 1MB frame is accepted.
  const backendServer = http.createServer();
  const wss = new WebSocketServer({ server: backendServer, maxPayload: 5 * 1024 * 1024 });
  wss.on('connection', (ws) => {
    ws.on('message', (data) => {
      // ws delivers RawData as Buffer | ArrayBuffer | Buffer[]. Normalize all
      // three variants instead of the old `Buffer.from(data as ArrayBuffer)`
      // cast, which only worked because ws hands over a Buffer by default,
      // would miscount for the Buffer[] variant, and forced a needless 1MB copy.
      const buf = Buffer.isBuffer(data)
        ? data
        : Array.isArray(data)
          ? Buffer.concat(data)
          : Buffer.from(data);
      ws.send(`received ${buf.length} bytes`);
    });
  });
  await new Promise<void>((resolve) => {
    backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
  });

  const proxy = new SmartProxy({
    routes: [{
      name: 'ws-large-route',
      match: { ports: PROXY_PORT },
      action: {
        type: 'forward',
        targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
        websocket: { enabled: true },
      },
    }],
  });
  await proxy.start();

  const { ws, messages, opened } = connectWs(
    `ws://127.0.0.1:${PROXY_PORT}/`,
    { Host: 'test.local' },
    { maxPayload: 5 * 1024 * 1024 },
  );
  await opened;

  // Send a 1MB message and expect the backend to report its exact size.
  const largePayload = Buffer.alloc(1024 * 1024, 0x42);
  ws.send(largePayload);
  await waitFor(() => messages.length >= 1);
  expect(messages[0]).toEqual(`received ${1024 * 1024} bytes`);

  // Teardown: close socket, stop proxy, close backend, verify ports released.
  await closeWs(ws);
  await proxy.stop();
  await new Promise<void>((resolve) => backendServer.close(() => resolve()));
  await new Promise((r) => setTimeout(r, 500));
  await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
// Kick off the tap runner; the resulting promise is this module's default export.
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '27.1.0',
version: '27.3.1',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}

View File

@@ -32,6 +32,23 @@ export interface IThroughputHistoryPoint {
/**
* Main metrics interface with clean, grouped API
*/
/**
 * Protocol distribution for frontend (client→proxy) or backend (proxy→upstream).
 * Tracks active and total counts for h1/h2/h3/ws/other.
 */
export interface IProtocolDistribution {
  // HTTP/1.x: currently open connections and cumulative total.
  h1Active: number;
  h1Total: number;
  // HTTP/2: currently open connections and cumulative total.
  h2Active: number;
  h2Total: number;
  // HTTP/3: currently open connections and cumulative total.
  h3Active: number;
  h3Total: number;
  // WebSocket: currently open connections and cumulative total.
  wsActive: number;
  wsTotal: number;
  // Anything not classified as h1/h2/h3/ws (e.g. raw TCP passthrough —
  // NOTE(review): exact classification lives in the collector; confirm there).
  otherActive: number;
  otherTotal: number;
}
export interface IMetrics {
// Connection metrics
connections: {
@@ -40,6 +57,8 @@ export interface IMetrics {
byRoute(): Map<string, number>;
byIP(): Map<string, number>;
topIPs(limit?: number): Array<{ ip: string; count: number }>;
frontendProtocols(): IProtocolDistribution;
backendProtocols(): IProtocolDistribution;
};
// Throughput metrics (bytes per second)

View File

@@ -1,4 +1,4 @@
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
/**
@@ -90,6 +90,36 @@ export class RustMetricsAdapter implements IMetrics {
result.sort((a, b) => b.count - a.count);
return result.slice(0, limit);
},
frontendProtocols: (): IProtocolDistribution => {
  // Snapshot of the cached frontend (client→proxy) protocol counters.
  // Before the first cache refresh there is nothing to read, so return
  // an all-zero distribution; otherwise default each missing field to 0.
  const snap = this.cache?.frontendProtocols;
  if (!snap) {
    return {
      h1Active: 0, h1Total: 0,
      h2Active: 0, h2Total: 0,
      h3Active: 0, h3Total: 0,
      wsActive: 0, wsTotal: 0,
      otherActive: 0, otherTotal: 0,
    };
  }
  return {
    h1Active: snap.h1Active ?? 0,
    h1Total: snap.h1Total ?? 0,
    h2Active: snap.h2Active ?? 0,
    h2Total: snap.h2Total ?? 0,
    h3Active: snap.h3Active ?? 0,
    h3Total: snap.h3Total ?? 0,
    wsActive: snap.wsActive ?? 0,
    wsTotal: snap.wsTotal ?? 0,
    otherActive: snap.otherActive ?? 0,
    otherTotal: snap.otherTotal ?? 0,
  };
},
backendProtocols: (): IProtocolDistribution => {
  // Snapshot of the cached backend (proxy→upstream) protocol counters.
  // Before the first cache refresh there is nothing to read, so return
  // an all-zero distribution; otherwise default each missing field to 0.
  const snap = this.cache?.backendProtocols;
  if (!snap) {
    return {
      h1Active: 0, h1Total: 0,
      h2Active: 0, h2Total: 0,
      h3Active: 0, h3Total: 0,
      wsActive: 0, wsTotal: 0,
      otherActive: 0, otherTotal: 0,
    };
  }
  return {
    h1Active: snap.h1Active ?? 0,
    h1Total: snap.h1Total ?? 0,
    h2Active: snap.h2Active ?? 0,
    h2Total: snap.h2Total ?? 0,
    h3Active: snap.h3Active ?? 0,
    h3Total: snap.h3Total ?? 0,
    wsActive: snap.wsActive ?? 0,
    wsTotal: snap.wsTotal ?? 0,
    otherActive: snap.otherActive ?? 0,
    otherTotal: snap.otherTotal ?? 0,
  };
},
};
public throughput = {