//! A body wrapper that counts the bytes flowing through it and reports them
//! to the `MetricsCollector`.

use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
use std::task::{Context, Poll};
|
|
|
|
use bytes::Bytes;
|
|
use http_body::Frame;
|
|
use rustproxy_metrics::MetricsCollector;
|
|
|
|
/// Wraps any `http_body::Body` and counts data bytes passing through.
|
|
///
|
|
/// Each chunk is reported to the `MetricsCollector` immediately so that
|
|
/// the throughput tracker (sampled at 1 Hz) reflects real-time data flow.
|
|
///
|
|
/// The inner body is pinned on the heap to support `!Unpin` types like `hyper::body::Incoming`.
|
|
pub struct CountingBody<B> {
|
|
inner: Pin<Box<B>>,
|
|
metrics: Arc<MetricsCollector>,
|
|
route_id: Option<String>,
|
|
source_ip: Option<String>,
|
|
/// Whether we count bytes as "in" (request body) or "out" (response body).
|
|
direction: Direction,
|
|
/// Optional connection-level activity tracker. When set, poll_frame updates this
|
|
/// to keep the idle watchdog alive during active body streaming (uploads/downloads).
|
|
connection_activity: Option<Arc<AtomicU64>>,
|
|
/// Start instant for computing elapsed ms for connection_activity.
|
|
activity_start: Option<std::time::Instant>,
|
|
/// Optional active-request counter. When set, CountingBody increments on creation
|
|
/// and decrements on Drop, keeping the HTTP idle watchdog aware that a response
|
|
/// body is still streaming (even after the request handler has returned).
|
|
active_requests: Option<Arc<AtomicU64>>,
|
|
}
|
|
|
|
/// Which direction the bytes flow.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so the
/// direction can be logged and compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Request body: bytes flowing from client → upstream (counted as bytes_in).
    In,
    /// Response body: bytes flowing from upstream → client (counted as bytes_out).
    Out,
}
|
|
|
|
impl<B> CountingBody<B> {
|
|
/// Create a new CountingBody wrapping an inner body.
|
|
pub fn new(
|
|
inner: B,
|
|
metrics: Arc<MetricsCollector>,
|
|
route_id: Option<String>,
|
|
source_ip: Option<String>,
|
|
direction: Direction,
|
|
) -> Self {
|
|
Self {
|
|
inner: Box::pin(inner),
|
|
metrics,
|
|
route_id,
|
|
source_ip,
|
|
direction,
|
|
connection_activity: None,
|
|
activity_start: None,
|
|
active_requests: None,
|
|
}
|
|
}
|
|
|
|
/// Set the connection-level activity tracker. When set, each data frame
|
|
/// updates this timestamp to prevent the idle watchdog from killing the
|
|
/// connection during active body streaming.
|
|
pub fn with_connection_activity(mut self, activity: Arc<AtomicU64>, start: std::time::Instant) -> Self {
|
|
self.connection_activity = Some(activity);
|
|
self.activity_start = Some(start);
|
|
self
|
|
}
|
|
|
|
/// Set the active-request counter for the HTTP idle watchdog.
|
|
/// CountingBody increments on creation and decrements on Drop, ensuring the
|
|
/// idle watchdog sees an "active request" while the response body streams.
|
|
pub fn with_active_requests(mut self, counter: Arc<AtomicU64>) -> Self {
|
|
counter.fetch_add(1, Ordering::Relaxed);
|
|
self.active_requests = Some(counter);
|
|
self
|
|
}
|
|
|
|
/// Report a chunk of bytes immediately to the metrics collector.
|
|
#[inline]
|
|
fn report_chunk(&self, len: u64) {
|
|
let route_id = self.route_id.as_deref();
|
|
let source_ip = self.source_ip.as_deref();
|
|
match self.direction {
|
|
Direction::In => self.metrics.record_bytes(len, 0, route_id, source_ip),
|
|
Direction::Out => self.metrics.record_bytes(0, len, route_id, source_ip),
|
|
}
|
|
}
|
|
}
|
|
|
|
// CountingBody is Unpin because inner is Pin<Box<B>> (always Unpin).
|
|
impl<B> Unpin for CountingBody<B> {}
|
|
|
|
impl<B> http_body::Body for CountingBody<B>
|
|
where
|
|
B: http_body::Body<Data = Bytes>,
|
|
{
|
|
type Data = Bytes;
|
|
type Error = B::Error;
|
|
|
|
fn poll_frame(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
|
let this = self.get_mut();
|
|
|
|
match this.inner.as_mut().poll_frame(cx) {
|
|
Poll::Ready(Some(Ok(frame))) => {
|
|
if let Some(data) = frame.data_ref() {
|
|
let len = data.len() as u64;
|
|
// Report bytes immediately so the 1 Hz throughput sampler sees them
|
|
this.report_chunk(len);
|
|
// Keep the connection-level idle watchdog alive during body streaming
|
|
if let (Some(activity), Some(start)) = (&this.connection_activity, &this.activity_start) {
|
|
activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
}
|
|
}
|
|
Poll::Ready(Some(Ok(frame)))
|
|
}
|
|
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
|
Poll::Ready(None) => Poll::Ready(None),
|
|
Poll::Pending => Poll::Pending,
|
|
}
|
|
}
|
|
|
|
fn is_end_stream(&self) -> bool {
|
|
self.inner.is_end_stream()
|
|
}
|
|
|
|
fn size_hint(&self) -> http_body::SizeHint {
|
|
self.inner.size_hint()
|
|
}
|
|
}
|
|
|
|
impl<B> Drop for CountingBody<B> {
|
|
fn drop(&mut self) {
|
|
// Decrement the active-request counter so the HTTP idle watchdog
|
|
// knows this response body is no longer streaming.
|
|
if let Some(ref counter) = self.active_requests {
|
|
counter.fetch_sub(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|