fix(rustproxy-http,rustproxy-metrics): reduce per-frame metrics overhead by batching body byte accounting
This commit is contained in:
@@ -9,10 +9,17 @@ use bytes::Bytes;
|
||||
use http_body::Frame;
|
||||
use rustproxy_metrics::MetricsCollector;
|
||||
|
||||
/// Flush accumulated bytes to the metrics collector every 64 KB.
|
||||
/// This reduces per-frame DashMap shard-locked reads from ~15 to ~1 per 4 frames
|
||||
/// (assuming typical 16 KB upload frames). The 1 Hz throughput sampler still sees
|
||||
/// data within one sampling period even at low transfer rates.
|
||||
const BYTE_FLUSH_THRESHOLD: u64 = 65_536;
|
||||
|
||||
/// Wraps any `http_body::Body` and counts data bytes passing through.
|
||||
///
|
||||
/// Each chunk is reported to the `MetricsCollector` immediately so that
|
||||
/// the throughput tracker (sampled at 1 Hz) reflects real-time data flow.
|
||||
/// Bytes are accumulated and flushed to the `MetricsCollector` every
|
||||
/// [`BYTE_FLUSH_THRESHOLD`] bytes (and on Drop) so the throughput tracker
|
||||
/// (sampled at 1 Hz) reflects real-time data flow without per-frame overhead.
|
||||
///
|
||||
/// The inner body is pinned on the heap to support `!Unpin` types like `hyper::body::Incoming`.
|
||||
pub struct CountingBody<B> {
|
||||
@@ -22,6 +29,8 @@ pub struct CountingBody<B> {
|
||||
source_ip: Option<String>,
|
||||
/// Whether we count bytes as "in" (request body) or "out" (response body).
|
||||
direction: Direction,
|
||||
/// Accumulated bytes not yet flushed to the metrics collector.
|
||||
pending_bytes: u64,
|
||||
/// Optional connection-level activity tracker. When set, poll_frame updates this
|
||||
/// to keep the idle watchdog alive during active body streaming (uploads/downloads).
|
||||
connection_activity: Option<Arc<AtomicU64>>,
|
||||
@@ -57,6 +66,7 @@ impl<B> CountingBody<B> {
|
||||
route_id,
|
||||
source_ip,
|
||||
direction,
|
||||
pending_bytes: 0,
|
||||
connection_activity: None,
|
||||
activity_start: None,
|
||||
active_requests: None,
|
||||
@@ -81,14 +91,19 @@ impl<B> CountingBody<B> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Report a chunk of bytes immediately to the metrics collector.
|
||||
/// Flush accumulated bytes to the metrics collector.
|
||||
#[inline]
|
||||
fn report_chunk(&self, len: u64) {
|
||||
fn flush_pending(&mut self) {
|
||||
if self.pending_bytes == 0 {
|
||||
return;
|
||||
}
|
||||
let bytes = self.pending_bytes;
|
||||
self.pending_bytes = 0;
|
||||
let route_id = self.route_id.as_deref();
|
||||
let source_ip = self.source_ip.as_deref();
|
||||
match self.direction {
|
||||
Direction::In => self.metrics.record_bytes(len, 0, route_id, source_ip),
|
||||
Direction::Out => self.metrics.record_bytes(0, len, route_id, source_ip),
|
||||
Direction::In => self.metrics.record_bytes(bytes, 0, route_id, source_ip),
|
||||
Direction::Out => self.metrics.record_bytes(0, bytes, route_id, source_ip),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -113,9 +128,12 @@ where
|
||||
Poll::Ready(Some(Ok(frame))) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
let len = data.len() as u64;
|
||||
// Report bytes immediately so the 1 Hz throughput sampler sees them
|
||||
this.report_chunk(len);
|
||||
// Keep the connection-level idle watchdog alive during body streaming
|
||||
this.pending_bytes += len;
|
||||
if this.pending_bytes >= BYTE_FLUSH_THRESHOLD {
|
||||
this.flush_pending();
|
||||
}
|
||||
// Keep the connection-level idle watchdog alive on every frame
|
||||
// (this is just one atomic store — cheap enough per-frame)
|
||||
if let (Some(activity), Some(start)) = (&this.connection_activity, &this.activity_start) {
|
||||
activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
||||
}
|
||||
@@ -123,7 +141,11 @@ where
|
||||
Poll::Ready(Some(Ok(frame)))
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Ready(None) => {
|
||||
// End of stream — flush any remaining bytes
|
||||
this.flush_pending();
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
@@ -139,6 +161,8 @@ where
|
||||
|
||||
impl<B> Drop for CountingBody<B> {
|
||||
fn drop(&mut self) {
|
||||
// Flush any remaining accumulated bytes so totals stay accurate
|
||||
self.flush_pending();
|
||||
// Decrement the active-request counter so the HTTP idle watchdog
|
||||
// knows this response body is no longer streaming.
|
||||
if let Some(ref counter) = self.active_requests {
|
||||
|
||||
Reference in New Issue
Block a user