//! A body wrapper that counts bytes flowing through and reports them to MetricsCollector. use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use bytes::Bytes; use http_body::Frame; use rustproxy_metrics::MetricsCollector; /// Flush accumulated bytes to the metrics collector every 64 KB. /// This reduces per-frame DashMap shard-locked reads from ~15 to ~1 per 4 frames /// (assuming typical 16 KB upload frames). The 1 Hz throughput sampler still sees /// data within one sampling period even at low transfer rates. const BYTE_FLUSH_THRESHOLD: u64 = 65_536; /// Wraps any `http_body::Body` and counts data bytes passing through. /// /// Bytes are accumulated and flushed to the `MetricsCollector` every /// [`BYTE_FLUSH_THRESHOLD`] bytes (and on Drop) so the throughput tracker /// (sampled at 1 Hz) reflects real-time data flow without per-frame overhead. /// /// The inner body is pinned on the heap to support `!Unpin` types like `hyper::body::Incoming`. pub struct CountingBody { inner: Pin>, metrics: Arc, route_id: Option>, source_ip: Option>, /// Whether we count bytes as "in" (request body) or "out" (response body). direction: Direction, /// Accumulated bytes not yet flushed to the metrics collector. pending_bytes: u64, /// Optional connection-level activity tracker. When set, poll_frame updates this /// to keep the idle watchdog alive during active body streaming (uploads/downloads). connection_activity: Option>, /// Start instant for computing elapsed ms for connection_activity. activity_start: Option, /// Optional active-request counter. When set, CountingBody increments on creation /// and decrements on Drop, keeping the HTTP idle watchdog aware that a response /// body is still streaming (even after the request handler has returned). active_requests: Option>, } /// Which direction the bytes flow. #[derive(Clone, Copy)] pub enum Direction { /// Request body: bytes flowing from client → upstream (counted as bytes_in) In, /// Response body: bytes flowing from upstream → client (counted as bytes_out) Out, } impl CountingBody { /// Create a new CountingBody wrapping an inner body. pub fn new( inner: B, metrics: Arc, route_id: Option>, source_ip: Option>, direction: Direction, ) -> Self { Self { inner: Box::pin(inner), metrics, route_id, source_ip, direction, pending_bytes: 0, connection_activity: None, activity_start: None, active_requests: None, } } /// Set the connection-level activity tracker. When set, each data frame /// updates this timestamp to prevent the idle watchdog from killing the /// connection during active body streaming. pub fn with_connection_activity(mut self, activity: Arc, start: std::time::Instant) -> Self { self.connection_activity = Some(activity); self.activity_start = Some(start); self } /// Set the active-request counter for the HTTP idle watchdog. /// CountingBody increments on creation and decrements on Drop, ensuring the /// idle watchdog sees an "active request" while the response body streams. pub fn with_active_requests(mut self, counter: Arc) -> Self { counter.fetch_add(1, Ordering::Relaxed); self.active_requests = Some(counter); self } /// Flush accumulated bytes to the metrics collector. #[inline] fn flush_pending(&mut self) { if self.pending_bytes == 0 { return; } let bytes = self.pending_bytes; self.pending_bytes = 0; let route_id = self.route_id.as_deref(); let source_ip = self.source_ip.as_deref(); match self.direction { Direction::In => self.metrics.record_bytes(bytes, 0, route_id, source_ip), Direction::Out => self.metrics.record_bytes(0, bytes, route_id, source_ip), } } } // CountingBody is Unpin because inner is Pin> (always Unpin). impl Unpin for CountingBody {} impl http_body::Body for CountingBody where B: http_body::Body, { type Data = Bytes; type Error = B::Error; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { let this = self.get_mut(); match this.inner.as_mut().poll_frame(cx) { Poll::Ready(Some(Ok(frame))) => { if let Some(data) = frame.data_ref() { let len = data.len() as u64; this.pending_bytes += len; if this.pending_bytes >= BYTE_FLUSH_THRESHOLD { this.flush_pending(); } // Keep the connection-level idle watchdog alive on every frame // (this is just one atomic store — cheap enough per-frame) if let (Some(activity), Some(start)) = (&this.connection_activity, &this.activity_start) { activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed); } } Poll::Ready(Some(Ok(frame))) } Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Poll::Ready(None) => { // End of stream — flush any remaining bytes this.flush_pending(); Poll::Ready(None) } Poll::Pending => Poll::Pending, } } fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } fn size_hint(&self) -> http_body::SizeHint { self.inner.size_hint() } } impl Drop for CountingBody { fn drop(&mut self) { // Flush any remaining accumulated bytes so totals stay accurate self.flush_pending(); // Decrement the active-request counter so the HTTP idle watchdog // knows this response body is no longer streaming. if let Some(ref counter) = self.active_requests { counter.fetch_sub(1, Ordering::Relaxed); } } }