Files
smartvpn/rust/src/telemetry.rs

318 lines
9.7 KiB
Rust

use serde::Serialize;
use std::collections::VecDeque;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// A single RTT sample.
#[derive(Debug, Clone)]
struct RttSample {
_rtt: Duration,
_timestamp: Instant,
was_timeout: bool,
}
/// Snapshot of connection quality metrics.
#[derive(Debug, Clone, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionQuality {
/// Smoothed RTT in milliseconds (EMA, RFC 6298 style).
pub srtt_ms: f64,
/// Jitter in milliseconds (mean deviation of RTT).
pub jitter_ms: f64,
/// Minimum RTT observed in the sample window.
pub min_rtt_ms: f64,
/// Maximum RTT observed in the sample window.
pub max_rtt_ms: f64,
/// Packet loss ratio over the sample window (0.0 - 1.0).
pub loss_ratio: f64,
/// Number of consecutive keepalive timeouts (0 if last succeeded).
pub consecutive_timeouts: u32,
/// Total keepalives sent.
pub keepalives_sent: u64,
/// Total keepalive ACKs received.
pub keepalives_acked: u64,
}
/// Tracks connection quality from keepalive round-trips.
pub struct RttTracker {
/// Maximum number of samples to keep in the window.
max_samples: usize,
/// Recent RTT samples (including timeout markers).
samples: VecDeque<RttSample>,
/// When the last keepalive was sent (for computing RTT on ACK).
pending_ping_sent_at: Option<Instant>,
/// Number of consecutive keepalive timeouts.
pub consecutive_timeouts: u32,
/// Smoothed RTT (EMA).
srtt: Option<f64>,
/// Jitter (mean deviation).
jitter: f64,
/// Minimum RTT observed.
min_rtt: f64,
/// Maximum RTT observed.
max_rtt: f64,
/// Total keepalives sent.
keepalives_sent: u64,
/// Total keepalive ACKs received.
keepalives_acked: u64,
/// Previous RTT sample for jitter calculation.
last_rtt_ms: Option<f64>,
}
impl RttTracker {
/// Create a new tracker with the given window size.
pub fn new(max_samples: usize) -> Self {
Self {
max_samples,
samples: VecDeque::with_capacity(max_samples),
pending_ping_sent_at: None,
consecutive_timeouts: 0,
srtt: None,
jitter: 0.0,
min_rtt: f64::MAX,
max_rtt: 0.0,
keepalives_sent: 0,
keepalives_acked: 0,
last_rtt_ms: None,
}
}
/// Record that a keepalive was sent.
/// Returns a millisecond timestamp (since UNIX epoch) to embed in the keepalive payload.
pub fn mark_ping_sent(&mut self) -> u64 {
self.pending_ping_sent_at = Some(Instant::now());
self.keepalives_sent += 1;
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
/// Record that a keepalive ACK was received with the echoed timestamp.
/// Returns the computed RTT if a pending ping was recorded.
pub fn record_ack(&mut self, _echoed_timestamp_ms: u64) -> Option<Duration> {
let sent_at = self.pending_ping_sent_at.take()?;
let rtt = sent_at.elapsed();
let rtt_ms = rtt.as_secs_f64() * 1000.0;
self.keepalives_acked += 1;
self.consecutive_timeouts = 0;
// Update SRTT (RFC 6298: alpha = 1/8)
match self.srtt {
None => {
self.srtt = Some(rtt_ms);
self.jitter = rtt_ms / 2.0;
}
Some(prev_srtt) => {
// RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R| (beta = 1/4)
self.jitter = 0.75 * self.jitter + 0.25 * (prev_srtt - rtt_ms).abs();
// SRTT = (1 - alpha) * SRTT + alpha * R (alpha = 1/8)
self.srtt = Some(0.875 * prev_srtt + 0.125 * rtt_ms);
}
}
// Update min/max
if rtt_ms < self.min_rtt {
self.min_rtt = rtt_ms;
}
if rtt_ms > self.max_rtt {
self.max_rtt = rtt_ms;
}
self.last_rtt_ms = Some(rtt_ms);
// Push sample into window
if self.samples.len() >= self.max_samples {
self.samples.pop_front();
}
self.samples.push_back(RttSample {
_rtt: rtt,
_timestamp: Instant::now(),
was_timeout: false,
});
Some(rtt)
}
/// Record that a keepalive timed out (no ACK received).
pub fn record_timeout(&mut self) {
self.consecutive_timeouts += 1;
self.pending_ping_sent_at = None;
if self.samples.len() >= self.max_samples {
self.samples.pop_front();
}
self.samples.push_back(RttSample {
_rtt: Duration::ZERO,
_timestamp: Instant::now(),
was_timeout: true,
});
}
/// Get a snapshot of the current connection quality.
pub fn snapshot(&self) -> ConnectionQuality {
let loss_ratio = if self.samples.is_empty() {
0.0
} else {
let timeouts = self.samples.iter().filter(|s| s.was_timeout).count();
timeouts as f64 / self.samples.len() as f64
};
ConnectionQuality {
srtt_ms: self.srtt.unwrap_or(0.0),
jitter_ms: self.jitter,
min_rtt_ms: if self.min_rtt == f64::MAX { 0.0 } else { self.min_rtt },
max_rtt_ms: self.max_rtt,
loss_ratio,
consecutive_timeouts: self.consecutive_timeouts,
keepalives_sent: self.keepalives_sent,
keepalives_acked: self.keepalives_acked,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_tracker_has_zero_quality() {
let tracker = RttTracker::new(30);
let q = tracker.snapshot();
assert_eq!(q.srtt_ms, 0.0);
assert_eq!(q.jitter_ms, 0.0);
assert_eq!(q.loss_ratio, 0.0);
assert_eq!(q.consecutive_timeouts, 0);
assert_eq!(q.keepalives_sent, 0);
assert_eq!(q.keepalives_acked, 0);
}
#[test]
fn mark_ping_returns_timestamp() {
let mut tracker = RttTracker::new(30);
let ts = tracker.mark_ping_sent();
// Should be a reasonable epoch-ms value (after 2020)
assert!(ts > 1_577_836_800_000);
assert_eq!(tracker.keepalives_sent, 1);
}
#[test]
fn record_ack_computes_rtt() {
let mut tracker = RttTracker::new(30);
let ts = tracker.mark_ping_sent();
std::thread::sleep(Duration::from_millis(5));
let rtt = tracker.record_ack(ts);
assert!(rtt.is_some());
let rtt = rtt.unwrap();
assert!(rtt.as_millis() >= 4); // at least ~5ms minus scheduling jitter
assert_eq!(tracker.keepalives_acked, 1);
assert_eq!(tracker.consecutive_timeouts, 0);
}
#[test]
fn record_ack_without_pending_returns_none() {
let mut tracker = RttTracker::new(30);
assert!(tracker.record_ack(12345).is_none());
}
#[test]
fn srtt_converges() {
let mut tracker = RttTracker::new(30);
// Simulate several ping/ack cycles with ~10ms RTT
for _ in 0..10 {
let ts = tracker.mark_ping_sent();
std::thread::sleep(Duration::from_millis(10));
tracker.record_ack(ts);
}
let q = tracker.snapshot();
// SRTT should be roughly 10ms (allowing for scheduling variance)
assert!(q.srtt_ms > 5.0, "SRTT too low: {}", q.srtt_ms);
assert!(q.srtt_ms < 50.0, "SRTT too high: {}", q.srtt_ms);
}
#[test]
fn timeout_increments_counter_and_loss() {
let mut tracker = RttTracker::new(30);
tracker.mark_ping_sent();
tracker.record_timeout();
assert_eq!(tracker.consecutive_timeouts, 1);
tracker.mark_ping_sent();
tracker.record_timeout();
assert_eq!(tracker.consecutive_timeouts, 2);
let q = tracker.snapshot();
assert_eq!(q.loss_ratio, 1.0); // 2 timeouts out of 2 samples
}
#[test]
fn ack_resets_consecutive_timeouts() {
let mut tracker = RttTracker::new(30);
tracker.mark_ping_sent();
tracker.record_timeout();
assert_eq!(tracker.consecutive_timeouts, 1);
let ts = tracker.mark_ping_sent();
tracker.record_ack(ts);
assert_eq!(tracker.consecutive_timeouts, 0);
}
#[test]
fn loss_ratio_over_mixed_window() {
let mut tracker = RttTracker::new(30);
// 3 successful, 1 timeout, 1 successful = 1/5 = 0.2 loss
for _ in 0..3 {
let ts = tracker.mark_ping_sent();
tracker.record_ack(ts);
}
tracker.mark_ping_sent();
tracker.record_timeout();
let ts = tracker.mark_ping_sent();
tracker.record_ack(ts);
let q = tracker.snapshot();
assert!((q.loss_ratio - 0.2).abs() < 0.01);
}
#[test]
fn window_evicts_old_samples() {
let mut tracker = RttTracker::new(5);
// Fill window with 5 timeouts
for _ in 0..5 {
tracker.mark_ping_sent();
tracker.record_timeout();
}
assert_eq!(tracker.snapshot().loss_ratio, 1.0);
// Add 5 successes — should evict all timeouts
for _ in 0..5 {
let ts = tracker.mark_ping_sent();
tracker.record_ack(ts);
}
assert_eq!(tracker.snapshot().loss_ratio, 0.0);
}
#[test]
fn min_max_rtt_tracked() {
let mut tracker = RttTracker::new(30);
let ts = tracker.mark_ping_sent();
std::thread::sleep(Duration::from_millis(5));
tracker.record_ack(ts);
let ts = tracker.mark_ping_sent();
std::thread::sleep(Duration::from_millis(15));
tracker.record_ack(ts);
let q = tracker.snapshot();
assert!(q.min_rtt_ms < q.max_rtt_ms);
assert!(q.min_rtt_ms > 0.0);
}
}