Files
smartproxy/rust/crates/rustproxy-metrics/src/throughput.rs

233 lines
7.1 KiB
Rust
Raw Normal View History

use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
/// A single throughput sample.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ThroughputSample {
pub timestamp_ms: u64,
pub bytes_in: u64,
pub bytes_out: u64,
}
/// Circular buffer for 1Hz throughput sampling.
/// Matches smartproxy's ThroughputTracker.
pub struct ThroughputTracker {
/// Circular buffer of samples
samples: Vec<ThroughputSample>,
/// Current write index
write_index: usize,
/// Number of valid samples
count: usize,
/// Maximum number of samples to retain
capacity: usize,
/// Accumulated bytes since last sample
pending_bytes_in: AtomicU64,
pending_bytes_out: AtomicU64,
/// When the tracker was created
created_at: Instant,
}
impl ThroughputTracker {
/// Create a new tracker with the given capacity (seconds of retention).
pub fn new(retention_seconds: usize) -> Self {
Self {
samples: Vec::with_capacity(retention_seconds),
write_index: 0,
count: 0,
capacity: retention_seconds,
pending_bytes_in: AtomicU64::new(0),
pending_bytes_out: AtomicU64::new(0),
created_at: Instant::now(),
}
}
/// Record bytes (called from data flow callbacks).
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.pending_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
}
/// Take a sample (called at 1Hz).
pub fn sample(&mut self) {
let bytes_in = self.pending_bytes_in.swap(0, Ordering::Relaxed);
let bytes_out = self.pending_bytes_out.swap(0, Ordering::Relaxed);
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let sample = ThroughputSample {
timestamp_ms,
bytes_in,
bytes_out,
};
if self.samples.len() < self.capacity {
self.samples.push(sample);
} else {
self.samples[self.write_index] = sample;
}
self.write_index = (self.write_index + 1) % self.capacity;
self.count = (self.count + 1).min(self.capacity);
}
/// Get throughput over the last N seconds.
pub fn throughput(&self, window_seconds: usize) -> (u64, u64) {
let window = window_seconds.min(self.count);
if window == 0 {
return (0, 0);
}
let mut total_in = 0u64;
let mut total_out = 0u64;
for i in 0..window {
let idx = if self.write_index >= i + 1 {
self.write_index - i - 1
} else {
self.capacity - (i + 1 - self.write_index)
};
if idx < self.samples.len() {
total_in += self.samples[idx].bytes_in;
total_out += self.samples[idx].bytes_out;
}
}
(total_in / window as u64, total_out / window as u64)
}
/// Get instant throughput (last 1 second).
pub fn instant(&self) -> (u64, u64) {
self.throughput(1)
}
/// Get recent throughput (last 10 seconds).
pub fn recent(&self) -> (u64, u64) {
self.throughput(10)
}
/// Return the last N samples in chronological order (oldest first).
pub fn history(&self, window_seconds: usize) -> Vec<ThroughputSample> {
let window = window_seconds.min(self.count);
if window == 0 {
return Vec::new();
}
let mut result = Vec::with_capacity(window);
for i in 0..window {
let idx = if self.write_index >= i + 1 {
self.write_index - i - 1
} else {
self.capacity - (i + 1 - self.write_index)
};
if idx < self.samples.len() {
result.push(self.samples[idx]);
}
}
result.reverse(); // Return oldest-first (chronological)
result
}
/// How long this tracker has been alive.
pub fn uptime(&self) -> std::time::Duration {
self.created_at.elapsed()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_empty_throughput() {
let tracker = ThroughputTracker::new(60);
let (bytes_in, bytes_out) = tracker.throughput(10);
assert_eq!(bytes_in, 0);
assert_eq!(bytes_out, 0);
}
#[test]
fn test_single_sample() {
let mut tracker = ThroughputTracker::new(60);
tracker.record_bytes(1000, 2000);
tracker.sample();
let (bytes_in, bytes_out) = tracker.instant();
assert_eq!(bytes_in, 1000);
assert_eq!(bytes_out, 2000);
}
#[test]
fn test_circular_buffer_wrap() {
let mut tracker = ThroughputTracker::new(3); // Small capacity
for i in 0..5 {
tracker.record_bytes(i * 100, i * 200);
tracker.sample();
}
// Should still work after wrapping
let (bytes_in, bytes_out) = tracker.throughput(3);
assert!(bytes_in > 0);
assert!(bytes_out > 0);
}
#[test]
fn test_window_averaging() {
let mut tracker = ThroughputTracker::new(60);
// Record 3 samples of different sizes
tracker.record_bytes(100, 200);
tracker.sample();
tracker.record_bytes(200, 400);
tracker.sample();
tracker.record_bytes(300, 600);
tracker.sample();
// Average over 3 samples: (100+200+300)/3 = 200, (200+400+600)/3 = 400
let (avg_in, avg_out) = tracker.throughput(3);
assert_eq!(avg_in, 200);
assert_eq!(avg_out, 400);
}
#[test]
fn test_uptime_positive() {
let tracker = ThroughputTracker::new(60);
std::thread::sleep(std::time::Duration::from_millis(10));
assert!(tracker.uptime().as_millis() >= 10);
}
#[test]
fn test_history_returns_chronological() {
let mut tracker = ThroughputTracker::new(60);
for i in 1..=5 {
tracker.record_bytes(i * 100, i * 200);
tracker.sample();
}
let history = tracker.history(5);
assert_eq!(history.len(), 5);
// First sample should have 100 bytes_in, last should have 500
assert_eq!(history[0].bytes_in, 100);
assert_eq!(history[4].bytes_in, 500);
}
#[test]
fn test_history_wraps_around() {
let mut tracker = ThroughputTracker::new(3); // Small capacity
for i in 1..=5 {
tracker.record_bytes(i * 100, i * 200);
tracker.sample();
}
// Only last 3 should be retained
let history = tracker.history(10); // Ask for more than available
assert_eq!(history.len(), 3);
assert_eq!(history[0].bytes_in, 300);
assert_eq!(history[1].bytes_in, 400);
assert_eq!(history[2].bytes_in, 500);
}
#[test]
fn test_history_empty() {
let tracker = ThroughputTracker::new(60);
let history = tracker.history(10);
assert!(history.is_empty());
}
}