feat(metrics): add per-IP and HTTP-request metrics, propagate source IP through proxy paths, and expose new metrics to the TS adapter
This commit is contained in:
@@ -20,6 +20,7 @@ pub struct CountingBody<B> {
|
||||
counted_bytes: AtomicU64,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
route_id: Option<String>,
|
||||
source_ip: Option<String>,
|
||||
/// Whether we count bytes as "in" (request body) or "out" (response body).
|
||||
direction: Direction,
|
||||
/// Whether we've already reported the bytes (to avoid double-reporting on drop).
|
||||
@@ -41,6 +42,7 @@ impl<B> CountingBody<B> {
|
||||
inner: B,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
route_id: Option<String>,
|
||||
source_ip: Option<String>,
|
||||
direction: Direction,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -48,6 +50,7 @@ impl<B> CountingBody<B> {
|
||||
counted_bytes: AtomicU64::new(0),
|
||||
metrics,
|
||||
route_id,
|
||||
source_ip,
|
||||
direction,
|
||||
reported: false,
|
||||
}
|
||||
@@ -66,9 +69,10 @@ impl<B> CountingBody<B> {
|
||||
}
|
||||
|
||||
let route_id = self.route_id.as_deref();
|
||||
let source_ip = self.source_ip.as_deref();
|
||||
match self.direction {
|
||||
Direction::In => self.metrics.record_bytes(bytes, 0, route_id),
|
||||
Direction::Out => self.metrics.record_bytes(0, bytes, route_id),
|
||||
Direction::In => self.metrics.record_bytes(bytes, 0, route_id, source_ip),
|
||||
Direction::Out => self.metrics.record_bytes(0, bytes, route_id, source_ip),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,12 +184,14 @@ impl HttpProxyService {
|
||||
};
|
||||
|
||||
let route_id = route_match.route.id.as_deref();
|
||||
self.metrics.connection_opened(route_id);
|
||||
let ip_str = peer_addr.ip().to_string();
|
||||
self.metrics.record_http_request();
|
||||
self.metrics.connection_opened(route_id, Some(&ip_str));
|
||||
|
||||
// Apply request filters (IP check, rate limiting, auth)
|
||||
if let Some(ref security) = route_match.route.security {
|
||||
if let Some(response) = RequestFilter::apply(security, &req, &peer_addr) {
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
@@ -197,7 +199,7 @@ impl HttpProxyService {
|
||||
// Check for test response (returns immediately, no upstream needed)
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref test_response) = advanced.test_response {
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::build_test_response(test_response));
|
||||
}
|
||||
}
|
||||
@@ -205,7 +207,7 @@ impl HttpProxyService {
|
||||
// Check for static file serving
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref static_files) = advanced.static_files {
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::serve_static_file(&path, static_files));
|
||||
}
|
||||
}
|
||||
@@ -214,7 +216,7 @@ impl HttpProxyService {
|
||||
let target = match route_match.target {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "No target available"));
|
||||
}
|
||||
};
|
||||
@@ -232,7 +234,7 @@ impl HttpProxyService {
|
||||
|
||||
if is_websocket {
|
||||
let result = self.handle_websocket_upgrade(
|
||||
req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel,
|
||||
req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str,
|
||||
).await;
|
||||
// Note: for WebSocket, connection_ended is called inside
|
||||
// the spawned tunnel task when the connection closes.
|
||||
@@ -280,13 +282,13 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
};
|
||||
@@ -296,10 +298,10 @@ impl HttpProxyService {
|
||||
|
||||
let result = if use_h2 {
|
||||
// HTTP/2 backend
|
||||
self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id).await
|
||||
self.forward_h2(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await
|
||||
} else {
|
||||
// HTTP/1.1 backend (default)
|
||||
self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id).await
|
||||
self.forward_h1(io, parts, body, upstream_headers, &upstream_path, &upstream, route_match.route, route_id, &ip_str).await
|
||||
};
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
result
|
||||
@@ -316,12 +318,13 @@ impl HttpProxyService {
|
||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let (mut sender, conn) = match hyper::client::conn::http1::handshake(io).await {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("Upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -351,6 +354,7 @@ impl HttpProxyService {
|
||||
body,
|
||||
Arc::clone(&self.metrics),
|
||||
route_id.map(|s| s.to_string()),
|
||||
Some(source_ip.to_string()),
|
||||
Direction::In,
|
||||
);
|
||||
|
||||
@@ -361,12 +365,12 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("Upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed"));
|
||||
}
|
||||
};
|
||||
|
||||
self.build_streaming_response(upstream_response, route, route_id).await
|
||||
self.build_streaming_response(upstream_response, route, route_id, source_ip).await
|
||||
}
|
||||
|
||||
/// Forward request to backend via HTTP/2 with body streaming.
|
||||
@@ -380,13 +384,14 @@ impl HttpProxyService {
|
||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let exec = hyper_util::rt::TokioExecutor::new();
|
||||
let (mut sender, conn) = match hyper::client::conn::http2::handshake(exec, io).await {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -415,6 +420,7 @@ impl HttpProxyService {
|
||||
body,
|
||||
Arc::clone(&self.metrics),
|
||||
route_id.map(|s| s.to_string()),
|
||||
Some(source_ip.to_string()),
|
||||
Direction::In,
|
||||
);
|
||||
|
||||
@@ -425,12 +431,12 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed"));
|
||||
}
|
||||
};
|
||||
|
||||
self.build_streaming_response(upstream_response, route, route_id).await
|
||||
self.build_streaming_response(upstream_response, route, route_id, source_ip).await
|
||||
}
|
||||
|
||||
/// Build the client-facing response from an upstream response, streaming the body.
|
||||
@@ -443,6 +449,7 @@ impl HttpProxyService {
|
||||
upstream_response: Response<Incoming>,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let (resp_parts, resp_body) = upstream_response.into_parts();
|
||||
|
||||
@@ -461,13 +468,14 @@ impl HttpProxyService {
|
||||
resp_body,
|
||||
Arc::clone(&self.metrics),
|
||||
route_id.map(|s| s.to_string()),
|
||||
Some(source_ip.to_string()),
|
||||
Direction::Out,
|
||||
);
|
||||
|
||||
// Close the connection metric now — the HTTP request/response cycle is done
|
||||
// from the proxy's perspective once we hand the streaming body to hyper.
|
||||
// Bytes will still be counted as they flow.
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
@@ -484,6 +492,7 @@ impl HttpProxyService {
|
||||
route_id: Option<&str>,
|
||||
upstream_key: &str,
|
||||
cancel: CancellationToken,
|
||||
source_ip: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
@@ -499,7 +508,7 @@ impl HttpProxyService {
|
||||
.unwrap_or("");
|
||||
if !allowed_origins.is_empty() && !allowed_origins.iter().any(|o| o == "*" || o == origin) {
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::FORBIDDEN, "Origin not allowed"));
|
||||
}
|
||||
}
|
||||
@@ -516,13 +525,13 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
};
|
||||
@@ -584,7 +593,7 @@ impl HttpProxyService {
|
||||
if let Err(e) = upstream_stream.write_all(raw_request.as_bytes()).await {
|
||||
error!("WebSocket: failed to send upgrade request to upstream: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend write failed"));
|
||||
}
|
||||
|
||||
@@ -595,7 +604,7 @@ impl HttpProxyService {
|
||||
Ok(0) => {
|
||||
error!("WebSocket: upstream closed before completing handshake");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed"));
|
||||
}
|
||||
Ok(_) => {
|
||||
@@ -609,14 +618,14 @@ impl HttpProxyService {
|
||||
if response_buf.len() > 8192 {
|
||||
error!("WebSocket: upstream response headers too large");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend response too large"));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("WebSocket: failed to read upstream response: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend read failed"));
|
||||
}
|
||||
}
|
||||
@@ -634,7 +643,7 @@ impl HttpProxyService {
|
||||
if status_code != 101 {
|
||||
debug!("WebSocket: upstream rejected upgrade with status {}", status_code);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(
|
||||
StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY),
|
||||
"WebSocket upgrade rejected by backend",
|
||||
@@ -668,6 +677,7 @@ impl HttpProxyService {
|
||||
|
||||
let metrics = Arc::clone(&self.metrics);
|
||||
let route_id_owned = route_id.map(|s| s.to_string());
|
||||
let source_ip_owned = source_ip.to_string();
|
||||
let upstream_selector = self.upstream_selector.clone();
|
||||
let upstream_key_owned = upstream_key.to_string();
|
||||
|
||||
@@ -678,7 +688,7 @@ impl HttpProxyService {
|
||||
debug!("WebSocket: client upgrade failed: {}", e);
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.connection_closed(Some(rid.as_str()));
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -783,8 +793,8 @@ impl HttpProxyService {
|
||||
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()));
|
||||
metrics.connection_closed(Some(rid.as_str()));
|
||||
metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()), Some(&source_ip_owned));
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user