fix(metrics): centralize connection-closed reporting via ConnectionGuard and remove duplicate explicit metrics.connection_closed calls
This commit is contained in:
@@ -1,5 +1,13 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-16 - 25.7.3 - fix(metrics)
|
||||
centralize connection-closed reporting via ConnectionGuard and remove duplicate explicit metrics.connection_closed calls
|
||||
|
||||
- Removed numerous explicit metrics.connection_closed calls from rust/crates/rustproxy-http/src/proxy_service.rs so connection teardown and byte counting are handled by the connection guard / counting body instead of ad-hoc calls.
|
||||
- Simplified ConnectionGuard in rust/crates/rustproxy-passthrough/src/tcp_listener.rs: removed the disarm flag and disarm() method so Drop always reports connection_closed.
|
||||
- Stopped disarming the TCP-level guard when handing connections off to HTTP proxy paths (HTTP/WebSocket/streaming flows) to avoid missing or double-reporting metrics.
|
||||
- Fixes incorrect/duplicate connection-closed metric emission and ensures consistent byte/connection accounting during streaming and WebSocket upgrades.
|
||||
|
||||
## 2026-02-16 - 25.7.2 - fix(rustproxy-http)
|
||||
preserve original Host header when proxying and add X-Forwarded-* headers; add TLS WebSocket echo backend helper and integration test for terminate-and-reencrypt websocket
|
||||
|
||||
|
||||
@@ -309,12 +309,10 @@ impl HttpProxyService {
|
||||
let route_id = route_match.route.id.as_deref();
|
||||
let ip_str = peer_addr.ip().to_string();
|
||||
self.metrics.record_http_request();
|
||||
self.metrics.connection_opened(route_id, Some(&ip_str));
|
||||
|
||||
// Apply request filters (IP check, rate limiting, auth)
|
||||
if let Some(ref security) = route_match.route.security {
|
||||
if let Some(response) = RequestFilter::apply(security, &req, &peer_addr) {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
@@ -322,7 +320,6 @@ impl HttpProxyService {
|
||||
// Check for test response (returns immediately, no upstream needed)
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref test_response) = advanced.test_response {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::build_test_response(test_response));
|
||||
}
|
||||
}
|
||||
@@ -330,7 +327,6 @@ impl HttpProxyService {
|
||||
// Check for static file serving
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref static_files) = advanced.static_files {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::serve_static_file(&path, static_files));
|
||||
}
|
||||
}
|
||||
@@ -339,7 +335,6 @@ impl HttpProxyService {
|
||||
let target = match route_match.target {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "No target available"));
|
||||
}
|
||||
};
|
||||
@@ -459,13 +454,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Upstream TLS connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -481,13 +474,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -523,7 +514,6 @@ impl HttpProxyService {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("Upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -559,7 +549,6 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("Upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed"));
|
||||
}
|
||||
};
|
||||
@@ -585,7 +574,6 @@ impl HttpProxyService {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -620,7 +608,6 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed"));
|
||||
}
|
||||
};
|
||||
@@ -631,8 +618,7 @@ impl HttpProxyService {
|
||||
/// Build the client-facing response from an upstream response, streaming the body.
|
||||
///
|
||||
/// The response body is wrapped in a `CountingBody` that counts bytes as they
|
||||
/// stream from upstream to client. When the body is fully consumed (or dropped),
|
||||
/// it reports byte counts to the metrics collector and calls `connection_closed`.
|
||||
/// stream from upstream to client.
|
||||
async fn build_streaming_response(
|
||||
&self,
|
||||
upstream_response: Response<Incoming>,
|
||||
@@ -661,11 +647,6 @@ impl HttpProxyService {
|
||||
Direction::Out,
|
||||
);
|
||||
|
||||
// Close the connection metric now — the HTTP request/response cycle is done
|
||||
// from the proxy's perspective once we hand the streaming body to hyper.
|
||||
// Bytes will still be counted as they flow.
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
Ok(response.body(body).unwrap())
|
||||
@@ -697,7 +678,6 @@ impl HttpProxyService {
|
||||
.unwrap_or("");
|
||||
if !allowed_origins.is_empty() && !allowed_origins.iter().any(|o| o == "*" || o == origin) {
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::FORBIDDEN, "Origin not allowed"));
|
||||
}
|
||||
}
|
||||
@@ -715,13 +695,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("WebSocket: failed TLS connect upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("WebSocket: upstream TLS connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -737,13 +715,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -836,7 +812,6 @@ impl HttpProxyService {
|
||||
if let Err(e) = upstream_stream.write_all(raw_request.as_bytes()).await {
|
||||
error!("WebSocket: failed to send upgrade request to upstream: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend write failed"));
|
||||
}
|
||||
|
||||
@@ -847,7 +822,6 @@ impl HttpProxyService {
|
||||
Ok(0) => {
|
||||
error!("WebSocket: upstream closed before completing handshake");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed"));
|
||||
}
|
||||
Ok(_) => {
|
||||
@@ -861,14 +835,12 @@ impl HttpProxyService {
|
||||
if response_buf.len() > 8192 {
|
||||
error!("WebSocket: upstream response headers too large");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend response too large"));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("WebSocket: failed to read upstream response: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend read failed"));
|
||||
}
|
||||
}
|
||||
@@ -886,7 +858,6 @@ impl HttpProxyService {
|
||||
if status_code != 101 {
|
||||
debug!("WebSocket: upstream rejected upgrade with status {}", status_code);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(
|
||||
StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY),
|
||||
"WebSocket upgrade rejected by backend",
|
||||
@@ -930,9 +901,6 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
debug!("WebSocket: client upgrade failed: {}", e);
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -1037,7 +1005,6 @@ impl HttpProxyService {
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()), Some(&source_ip_owned));
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ struct ConnectionGuard {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
route_id: Option<String>,
|
||||
source_ip: Option<String>,
|
||||
disarmed: bool,
|
||||
}
|
||||
|
||||
impl ConnectionGuard {
|
||||
@@ -31,24 +30,15 @@ impl ConnectionGuard {
|
||||
metrics,
|
||||
route_id: route_id.map(|s| s.to_string()),
|
||||
source_ip: source_ip.map(|s| s.to_string()),
|
||||
disarmed: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Disarm the guard — prevents the Drop from running.
|
||||
/// Use when handing off to a path that manages its own cleanup (e.g., HTTP proxy).
|
||||
fn disarm(mut self) {
|
||||
self.disarmed = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectionGuard {
|
||||
fn drop(&mut self) {
|
||||
if !self.disarmed {
|
||||
self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ListenerError {
|
||||
@@ -844,8 +834,6 @@ impl TcpListenerManager {
|
||||
"TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})",
|
||||
peer_addr, target_host, target_port, domain
|
||||
);
|
||||
// HTTP proxy manages its own per-request metrics — disarm TCP-level guard
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
|
||||
} else {
|
||||
debug!(
|
||||
@@ -917,7 +905,6 @@ impl TcpListenerManager {
|
||||
"TLS Terminate+Reencrypt + HTTP: {} (domain: {:?})",
|
||||
peer_addr, domain
|
||||
);
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
|
||||
} else {
|
||||
// Non-HTTP: TLS-to-TLS tunnel (existing behavior for raw TCP protocols)
|
||||
@@ -937,8 +924,6 @@ impl TcpListenerManager {
|
||||
if is_http {
|
||||
// Plain HTTP - use HTTP proxy for request-level routing
|
||||
debug!("HTTP proxy: {} on port {}", peer_addr, port);
|
||||
// HTTP proxy manages its own per-request metrics — disarm TCP-level guard
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_connection(stream, peer_addr, port, cancel.clone()).await;
|
||||
Ok(())
|
||||
} else {
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '25.7.2',
|
||||
version: '25.7.3',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user