fix(metrics): correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic
This commit is contained in:
@@ -1,5 +1,13 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-04-04 - 27.3.1 - fix(metrics)
|
||||||
|
correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic
|
||||||
|
|
||||||
|
- move frontend protocol accounting from per-request to connection lifetime tracking for HTTP/1, HTTP/2, and HTTP/3
|
||||||
|
- add backend protocol guards to connection drivers so active protocol metrics reflect live upstream connections
|
||||||
|
- prevent protocol counter underflow by using atomic saturating decrements in the metrics collector
|
||||||
|
- read backend protocol distribution directly from cached aggregate counters in the Rust metrics adapter
|
||||||
|
|
||||||
## 2026-04-04 - 27.3.0 - feat(test)
|
## 2026-04-04 - 27.3.0 - feat(test)
|
||||||
add end-to-end WebSocket proxy test coverage
|
add end-to-end WebSocket proxy test coverage
|
||||||
|
|
||||||
|
|||||||
8
deno.lock
generated
8
deno.lock
generated
@@ -12,9 +12,11 @@
|
|||||||
"npm:@push.rocks/smartserve@^2.0.3": "2.0.3",
|
"npm:@push.rocks/smartserve@^2.0.3": "2.0.3",
|
||||||
"npm:@tsclass/tsclass@^9.5.0": "9.5.0",
|
"npm:@tsclass/tsclass@^9.5.0": "9.5.0",
|
||||||
"npm:@types/node@^25.5.0": "25.5.0",
|
"npm:@types/node@^25.5.0": "25.5.0",
|
||||||
|
"npm:@types/ws@^8.18.1": "8.18.1",
|
||||||
"npm:minimatch@^10.2.4": "10.2.4",
|
"npm:minimatch@^10.2.4": "10.2.4",
|
||||||
"npm:typescript@^6.0.2": "6.0.2",
|
"npm:typescript@^6.0.2": "6.0.2",
|
||||||
"npm:why-is-node-running@^3.2.2": "3.2.2"
|
"npm:why-is-node-running@^3.2.2": "3.2.2",
|
||||||
|
"npm:ws@^8.20.0": "8.20.0"
|
||||||
},
|
},
|
||||||
"npm": {
|
"npm": {
|
||||||
"@api.global/typedrequest-interfaces@2.0.2": {
|
"@api.global/typedrequest-interfaces@2.0.2": {
|
||||||
@@ -6743,9 +6745,11 @@
|
|||||||
"npm:@push.rocks/smartserve@^2.0.3",
|
"npm:@push.rocks/smartserve@^2.0.3",
|
||||||
"npm:@tsclass/tsclass@^9.5.0",
|
"npm:@tsclass/tsclass@^9.5.0",
|
||||||
"npm:@types/node@^25.5.0",
|
"npm:@types/node@^25.5.0",
|
||||||
|
"npm:@types/ws@^8.18.1",
|
||||||
"npm:minimatch@^10.2.4",
|
"npm:minimatch@^10.2.4",
|
||||||
"npm:typescript@^6.0.2",
|
"npm:typescript@^6.0.2",
|
||||||
"npm:why-is-node-running@^3.2.2"
|
"npm:why-is-node-running@^3.2.2",
|
||||||
|
"npm:ws@^8.20.0"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use tracing::{debug, warn};
|
|||||||
use rustproxy_config::RouteConfig;
|
use rustproxy_config::RouteConfig;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use crate::proxy_service::{ConnActivity, HttpProxyService};
|
use crate::proxy_service::{ConnActivity, HttpProxyService, ProtocolGuard};
|
||||||
|
|
||||||
/// HTTP/3 proxy service.
|
/// HTTP/3 proxy service.
|
||||||
///
|
///
|
||||||
@@ -48,6 +48,9 @@ impl H3ProxyService {
|
|||||||
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
|
||||||
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
|
||||||
|
|
||||||
|
// Track frontend H3 connection for the QUIC connection's lifetime.
|
||||||
|
let _frontend_h3_guard = ProtocolGuard::frontend(Arc::clone(self.http_proxy.metrics()), "h3");
|
||||||
|
|
||||||
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
|
||||||
h3::server::builder()
|
h3::server::builder()
|
||||||
.send_grease(false)
|
.send_grease(false)
|
||||||
|
|||||||
@@ -140,6 +140,38 @@ impl Drop for ProtocolGuard {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Connection-level frontend protocol tracker.
|
||||||
|
///
|
||||||
|
/// In `handle_io`, the HTTP protocol (h1 vs h2) is unknown until the first request
|
||||||
|
/// arrives. This struct uses `OnceLock` so the first request detects the protocol
|
||||||
|
/// and opens the counter; subsequent requests on the same connection are no-ops.
|
||||||
|
/// On Drop (when the connection ends), the counter is closed.
|
||||||
|
pub(crate) struct FrontendProtocolTracker {
|
||||||
|
metrics: Arc<MetricsCollector>,
|
||||||
|
proto: std::sync::OnceLock<&'static str>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FrontendProtocolTracker {
|
||||||
|
fn new(metrics: Arc<MetricsCollector>) -> Self {
|
||||||
|
Self { metrics, proto: std::sync::OnceLock::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the frontend protocol. Only the first call opens the counter.
|
||||||
|
fn set(&self, proto: &'static str) {
|
||||||
|
if self.proto.set(proto).is_ok() {
|
||||||
|
self.metrics.frontend_protocol_opened(proto);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for FrontendProtocolTracker {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(proto) = self.proto.get() {
|
||||||
|
self.metrics.frontend_protocol_closed(proto);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Backend stream that can be either plain TCP or TLS-wrapped.
|
/// Backend stream that can be either plain TCP or TLS-wrapped.
|
||||||
/// Used for `terminate-and-reencrypt` mode where the backend requires TLS.
|
/// Used for `terminate-and-reencrypt` mode where the backend requires TLS.
|
||||||
pub(crate) enum BackendStream {
|
pub(crate) enum BackendStream {
|
||||||
@@ -365,6 +397,11 @@ impl HttpProxyService {
|
|||||||
self.protocol_cache.snapshot()
|
self.protocol_cache.snapshot()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Access the shared metrics collector (used by H3ProxyService for protocol tracking).
|
||||||
|
pub fn metrics(&self) -> &Arc<MetricsCollector> {
|
||||||
|
&self.metrics
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle an incoming HTTP connection on a plain TCP stream.
|
/// Handle an incoming HTTP connection on a plain TCP stream.
|
||||||
pub async fn handle_connection(
|
pub async fn handle_connection(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
@@ -409,10 +446,24 @@ impl HttpProxyService {
|
|||||||
let active_requests = Arc::new(AtomicU64::new(0));
|
let active_requests = Arc::new(AtomicU64::new(0));
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
// Connection-level frontend protocol tracker: the first request detects
|
||||||
|
// h1 vs h2 from req.version() and opens the counter. On connection close
|
||||||
|
// (when handle_io returns), Drop closes the counter.
|
||||||
|
let frontend_tracker = Arc::new(FrontendProtocolTracker::new(Arc::clone(&self.metrics)));
|
||||||
|
let ft_inner = Arc::clone(&frontend_tracker);
|
||||||
|
|
||||||
let la_inner = Arc::clone(&last_activity);
|
let la_inner = Arc::clone(&last_activity);
|
||||||
let ar_inner = Arc::clone(&active_requests);
|
let ar_inner = Arc::clone(&active_requests);
|
||||||
let cancel_inner = cancel.clone();
|
let cancel_inner = cancel.clone();
|
||||||
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
|
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
|
||||||
|
// Detect frontend protocol from the first request on this connection.
|
||||||
|
// OnceLock ensures only the first call opens the counter.
|
||||||
|
let proto: &'static str = match req.version() {
|
||||||
|
hyper::Version::HTTP_2 => "h2",
|
||||||
|
_ => "h1",
|
||||||
|
};
|
||||||
|
ft_inner.set(proto);
|
||||||
|
|
||||||
// Mark request start — RAII guard decrements on drop (panic-safe)
|
// Mark request start — RAII guard decrements on drop (panic-safe)
|
||||||
la_inner.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
la_inner.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||||
let req_guard = ActiveRequestGuard::new(Arc::clone(&ar_inner));
|
let req_guard = ActiveRequestGuard::new(Arc::clone(&ar_inner));
|
||||||
@@ -655,17 +706,8 @@ impl HttpProxyService {
|
|||||||
.map(|p| p.as_str().eq_ignore_ascii_case("websocket"))
|
.map(|p| p.as_str().eq_ignore_ascii_case("websocket"))
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
|
|
||||||
// Track frontend protocol for distribution metrics (h1/h2/h3/ws)
|
// Frontend protocol is tracked at the connection level (handle_io / h3_service).
|
||||||
let frontend_proto: &'static str = if is_h1_websocket || is_h2_websocket {
|
// WebSocket tunnels additionally get their own "ws" guards in the spawned task.
|
||||||
"ws"
|
|
||||||
} else {
|
|
||||||
match req.version() {
|
|
||||||
hyper::Version::HTTP_2 => "h2",
|
|
||||||
hyper::Version::HTTP_3 => "h3",
|
|
||||||
_ => "h1", // HTTP/1.0, HTTP/1.1
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let _frontend_proto_guard = ProtocolGuard::frontend(Arc::clone(&self.metrics), frontend_proto);
|
|
||||||
|
|
||||||
if is_h1_websocket || is_h2_websocket {
|
if is_h1_websocket || is_h2_websocket {
|
||||||
let result = self.handle_websocket_upgrade(
|
let result = self.handle_websocket_upgrade(
|
||||||
@@ -1275,13 +1317,18 @@ impl HttpProxyService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
{
|
||||||
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
let driver_metrics = Arc::clone(&self.metrics);
|
||||||
Ok(Err(e)) => debug!("Upstream connection error: {}", e),
|
tokio::spawn(async move {
|
||||||
Err(_) => debug!("H1 connection driver timed out after 300s"),
|
// Track backend H1 connection for the driver's lifetime
|
||||||
_ => {}
|
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1");
|
||||||
}
|
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
||||||
});
|
Ok(Err(e)) => debug!("Upstream connection error: {}", e),
|
||||||
|
Err(_) => debug!("H1 connection driver timed out after 300s"),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await
|
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await
|
||||||
}
|
}
|
||||||
@@ -1402,7 +1449,10 @@ impl HttpProxyService {
|
|||||||
let pool = Arc::clone(&self.connection_pool);
|
let pool = Arc::clone(&self.connection_pool);
|
||||||
let key = pool_key.clone();
|
let key = pool_key.clone();
|
||||||
let gen = Arc::clone(&gen_holder);
|
let gen = Arc::clone(&gen_holder);
|
||||||
|
let driver_metrics = Arc::clone(&self.metrics);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Track backend H2 connection for the driver's lifetime
|
||||||
|
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2");
|
||||||
if let Err(e) = conn.await {
|
if let Err(e) = conn.await {
|
||||||
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
||||||
}
|
}
|
||||||
@@ -1701,7 +1751,10 @@ impl HttpProxyService {
|
|||||||
let pool = Arc::clone(&self.connection_pool);
|
let pool = Arc::clone(&self.connection_pool);
|
||||||
let key = pool_key.clone();
|
let key = pool_key.clone();
|
||||||
let gen = Arc::clone(&gen_holder);
|
let gen = Arc::clone(&gen_holder);
|
||||||
|
let driver_metrics = Arc::clone(&self.metrics);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Track backend H2 connection for the driver's lifetime
|
||||||
|
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h2");
|
||||||
if let Err(e) = conn.await {
|
if let Err(e) = conn.await {
|
||||||
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
warn!("HTTP/2 upstream connection error: {} ({:?})", e, e);
|
||||||
}
|
}
|
||||||
@@ -1871,13 +1924,18 @@ impl HttpProxyService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
{
|
||||||
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
let driver_metrics = Arc::clone(&self.metrics);
|
||||||
Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e),
|
tokio::spawn(async move {
|
||||||
Err(_) => debug!("H1 fallback: connection driver timed out after 300s"),
|
// Track backend H1 connection for the driver's lifetime
|
||||||
_ => {}
|
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h1");
|
||||||
}
|
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
||||||
});
|
Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e),
|
||||||
|
Err(_) => debug!("H1 fallback: connection driver timed out after 300s"),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let mut upstream_req = Request::builder()
|
let mut upstream_req = Request::builder()
|
||||||
.method(method)
|
.method(method)
|
||||||
@@ -2425,7 +2483,10 @@ impl HttpProxyService {
|
|||||||
selector: upstream_selector,
|
selector: upstream_selector,
|
||||||
key: upstream_key_owned.clone(),
|
key: upstream_key_owned.clone(),
|
||||||
};
|
};
|
||||||
// Track backend WebSocket connection — guard decrements on tunnel close
|
// Track WebSocket tunnel as "ws" on both frontend and backend.
|
||||||
|
// Frontend h1/h2 is tracked at the connection level (handle_io); this
|
||||||
|
// additional "ws" guard captures the tunnel's lifetime independently.
|
||||||
|
let _frontend_ws_guard = ProtocolGuard::frontend(Arc::clone(&metrics), "ws");
|
||||||
let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws");
|
let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws");
|
||||||
|
|
||||||
let client_upgraded = match on_client_upgrade.await {
|
let client_upgraded = match on_client_upgrade.await {
|
||||||
@@ -2889,7 +2950,10 @@ impl HttpProxyService {
|
|||||||
let driver_pool = Arc::clone(&self.connection_pool);
|
let driver_pool = Arc::clone(&self.connection_pool);
|
||||||
let driver_pool_key = pool_key.clone();
|
let driver_pool_key = pool_key.clone();
|
||||||
let driver_gen = Arc::clone(&gen_holder);
|
let driver_gen = Arc::clone(&gen_holder);
|
||||||
|
let driver_metrics = Arc::clone(&self.metrics);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Track backend H3 connection for the driver's lifetime
|
||||||
|
let _proto_guard = ProtocolGuard::backend(driver_metrics, "h3");
|
||||||
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
||||||
debug!("H3 connection driver closed: {:?}", close_err);
|
debug!("H3 connection driver closed: {:?}", close_err);
|
||||||
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
|
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|||||||
@@ -506,13 +506,14 @@ impl MetricsCollector {
|
|||||||
total.fetch_add(1, Ordering::Relaxed);
|
total.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record a frontend request/connection closed with a given protocol.
|
/// Record a frontend connection closed with a given protocol.
|
||||||
pub fn frontend_protocol_closed(&self, proto: &str) {
|
pub fn frontend_protocol_closed(&self, proto: &str) {
|
||||||
let (active, _) = self.frontend_proto_counters(proto);
|
let (active, _) = self.frontend_proto_counters(proto);
|
||||||
let val = active.load(Ordering::Relaxed);
|
// Atomic saturating decrement — avoids TOCTOU race where concurrent
|
||||||
if val > 0 {
|
// closes could both read val=1, both subtract, wrapping to u64::MAX.
|
||||||
active.fetch_sub(1, Ordering::Relaxed);
|
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
|
||||||
}
|
if v > 0 { Some(v - 1) } else { None }
|
||||||
|
}).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record a backend connection opened with a given protocol.
|
/// Record a backend connection opened with a given protocol.
|
||||||
@@ -525,10 +526,10 @@ impl MetricsCollector {
|
|||||||
/// Record a backend connection closed with a given protocol.
|
/// Record a backend connection closed with a given protocol.
|
||||||
pub fn backend_protocol_closed(&self, proto: &str) {
|
pub fn backend_protocol_closed(&self, proto: &str) {
|
||||||
let (active, _) = self.backend_proto_counters(proto);
|
let (active, _) = self.backend_proto_counters(proto);
|
||||||
let val = active.load(Ordering::Relaxed);
|
// Atomic saturating decrement — see frontend_protocol_closed for rationale.
|
||||||
if val > 0 {
|
active.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
|
||||||
active.fetch_sub(1, Ordering::Relaxed);
|
if v > 0 { Some(v - 1) } else { None }
|
||||||
}
|
}).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Per-backend recording methods ──
|
// ── Per-backend recording methods ──
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartproxy',
|
name: '@push.rocks/smartproxy',
|
||||||
version: '27.3.0',
|
version: '27.3.1',
|
||||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,27 +106,14 @@ export class RustMetricsAdapter implements IMetrics {
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
backendProtocols: (): IProtocolDistribution => {
|
backendProtocols: (): IProtocolDistribution => {
|
||||||
// Merge per-backend h1/h2/h3 data with aggregate ws/other counters
|
|
||||||
const bp = this.cache?.backendProtocols;
|
const bp = this.cache?.backendProtocols;
|
||||||
let h1Active = 0, h1Total = 0;
|
|
||||||
let h2Active = 0, h2Total = 0;
|
|
||||||
let h3Active = 0, h3Total = 0;
|
|
||||||
if (this.cache?.backends) {
|
|
||||||
for (const bm of Object.values(this.cache.backends)) {
|
|
||||||
const m = bm as any;
|
|
||||||
const active = m.activeConnections ?? 0;
|
|
||||||
const total = m.totalConnections ?? 0;
|
|
||||||
switch (m.protocol) {
|
|
||||||
case 'h2': h2Active += active; h2Total += total; break;
|
|
||||||
case 'h3': h3Active += active; h3Total += total; break;
|
|
||||||
default: h1Active += active; h1Total += total; break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return {
|
return {
|
||||||
h1Active, h1Total,
|
h1Active: bp?.h1Active ?? 0,
|
||||||
h2Active, h2Total,
|
h1Total: bp?.h1Total ?? 0,
|
||||||
h3Active, h3Total,
|
h2Active: bp?.h2Active ?? 0,
|
||||||
|
h2Total: bp?.h2Total ?? 0,
|
||||||
|
h3Active: bp?.h3Active ?? 0,
|
||||||
|
h3Total: bp?.h3Total ?? 0,
|
||||||
wsActive: bp?.wsActive ?? 0,
|
wsActive: bp?.wsActive ?? 0,
|
||||||
wsTotal: bp?.wsTotal ?? 0,
|
wsTotal: bp?.wsTotal ?? 0,
|
||||||
otherActive: bp?.otherActive ?? 0,
|
otherActive: bp?.otherActive ?? 0,
|
||||||
|
|||||||
Reference in New Issue
Block a user