feat(rust-core): add adaptive keepalive telemetry, MTU handling, and per-client rate limiting APIs
This commit is contained in:
@@ -1,87 +1,464 @@
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::time::{interval, timeout};
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Default keepalive interval (30 seconds).
|
||||
use crate::telemetry::{ConnectionQuality, RttTracker};
|
||||
|
||||
/// Default keepalive interval (30 seconds — used for the Degraded state).
pub const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);

/// Default keepalive ACK timeout (5 seconds).
pub const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Health classification of the link, driving the adaptive keepalive interval.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkHealth {
    /// RTT stable, jitter low, no loss. Interval: 60s.
    Healthy,
    /// Elevated jitter or occasional loss. Interval: 30s.
    Degraded,
    /// High loss or sustained jitter spike. Interval: 10s.
    Critical,
}

impl std::fmt::Display for LinkHealth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Lowercase labels, suitable for structured log fields.
        let label = match self {
            LinkHealth::Healthy => "healthy",
            LinkHealth::Degraded => "degraded",
            LinkHealth::Critical => "critical",
        };
        f.write_str(label)
    }
}
|
||||
|
||||
/// Tuning knobs for the adaptive keepalive state machine.
#[derive(Debug, Clone)]
pub struct AdaptiveKeepaliveConfig {
    /// Ping interval while the link is Healthy.
    pub healthy_interval: Duration,
    /// Ping interval while the link is Degraded.
    pub degraded_interval: Duration,
    /// Ping interval while the link is Critical.
    pub critical_interval: Duration,
    /// How long to wait for an ACK before counting a timeout.
    pub ack_timeout: Duration,
    /// Jitter (ms) above which Healthy drops to Degraded.
    pub jitter_degraded_ms: f64,
    /// Jitter (ms) below which Degraded may return to Healthy.
    pub jitter_healthy_ms: f64,
    /// Loss ratio above which the link enters Degraded.
    pub loss_degraded: f64,
    /// Loss ratio above which the link enters Critical.
    pub loss_critical: f64,
    /// Loss ratio below which Critical may recover to Degraded.
    pub loss_recover: f64,
    /// Loss ratio below which Degraded may return to Healthy.
    pub loss_healthy: f64,
    /// Consecutive good checks needed before any upward transition (hysteresis).
    pub upgrade_checks: u32,
    /// Consecutive timeouts in Critical before the peer is declared dead.
    pub dead_peer_timeouts: u32,
}

impl Default for AdaptiveKeepaliveConfig {
    fn default() -> Self {
        AdaptiveKeepaliveConfig {
            // Intervals per health state.
            healthy_interval: Duration::from_secs(60),
            degraded_interval: Duration::from_secs(30),
            critical_interval: Duration::from_secs(10),
            ack_timeout: Duration::from_secs(5),
            // Jitter thresholds (entry / recovery), in milliseconds.
            jitter_degraded_ms: 50.0,
            jitter_healthy_ms: 30.0,
            // Loss-ratio thresholds: degrade at 5%, critical at 20%,
            // recover from Critical under 10%, healthy under 2%.
            loss_degraded: 0.05,
            loss_critical: 0.20,
            loss_recover: 0.10,
            loss_healthy: 0.02,
            // Hysteresis / dead-peer counters.
            upgrade_checks: 3,
            dead_peer_timeouts: 3,
        }
    }
}
|
||||
|
||||
/// Signals from the keepalive monitor.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum KeepaliveSignal {
|
||||
/// Time to send a keepalive ping.
|
||||
SendPing,
|
||||
/// Peer is considered dead (no ACK received within timeout).
|
||||
/// Time to send a keepalive ping. Contains the timestamp (ms since epoch) to embed in payload.
|
||||
SendPing(u64),
|
||||
/// Peer is considered dead (no ACK received within timeout repeatedly).
|
||||
PeerDead,
|
||||
/// Link health state changed.
|
||||
LinkHealthChanged(LinkHealth),
|
||||
}
|
||||
|
||||
/// A keepalive monitor that emits signals on a channel.
|
||||
/// A keepalive monitor with adaptive interval and RTT tracking.
|
||||
pub struct KeepaliveMonitor {
|
||||
interval: Duration,
|
||||
timeout_duration: Duration,
|
||||
config: AdaptiveKeepaliveConfig,
|
||||
health: LinkHealth,
|
||||
rtt_tracker: RttTracker,
|
||||
signal_tx: mpsc::Sender<KeepaliveSignal>,
|
||||
ack_rx: mpsc::Receiver<()>,
|
||||
quality_tx: watch::Sender<ConnectionQuality>,
|
||||
consecutive_upgrade_checks: u32,
|
||||
}
|
||||
|
||||
/// Handle returned to the caller to send ACKs and receive signals.
|
||||
pub struct KeepaliveHandle {
|
||||
pub signal_rx: mpsc::Receiver<KeepaliveSignal>,
|
||||
pub ack_tx: mpsc::Sender<()>,
|
||||
pub quality_rx: watch::Receiver<ConnectionQuality>,
|
||||
}
|
||||
|
||||
/// Create a keepalive monitor and its handle.
|
||||
/// Create an adaptive keepalive monitor and its handle.
|
||||
pub fn create_keepalive(
|
||||
keepalive_interval: Option<Duration>,
|
||||
keepalive_timeout: Option<Duration>,
|
||||
config: Option<AdaptiveKeepaliveConfig>,
|
||||
) -> (KeepaliveMonitor, KeepaliveHandle) {
|
||||
let config = config.unwrap_or_default();
|
||||
let (signal_tx, signal_rx) = mpsc::channel(8);
|
||||
let (ack_tx, ack_rx) = mpsc::channel(8);
|
||||
let (quality_tx, quality_rx) = watch::channel(ConnectionQuality::default());
|
||||
|
||||
let monitor = KeepaliveMonitor {
|
||||
interval: keepalive_interval.unwrap_or(DEFAULT_KEEPALIVE_INTERVAL),
|
||||
timeout_duration: keepalive_timeout.unwrap_or(DEFAULT_KEEPALIVE_TIMEOUT),
|
||||
config,
|
||||
health: LinkHealth::Degraded, // start in Degraded, earn Healthy
|
||||
rtt_tracker: RttTracker::new(30),
|
||||
signal_tx,
|
||||
ack_rx,
|
||||
quality_tx,
|
||||
consecutive_upgrade_checks: 0,
|
||||
};
|
||||
|
||||
let handle = KeepaliveHandle { signal_rx, ack_tx };
|
||||
let handle = KeepaliveHandle {
|
||||
signal_rx,
|
||||
ack_tx,
|
||||
quality_rx,
|
||||
};
|
||||
|
||||
(monitor, handle)
|
||||
}
|
||||
|
||||
impl KeepaliveMonitor {
|
||||
fn current_interval(&self) -> Duration {
|
||||
match self.health {
|
||||
LinkHealth::Healthy => self.config.healthy_interval,
|
||||
LinkHealth::Degraded => self.config.degraded_interval,
|
||||
LinkHealth::Critical => self.config.critical_interval,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the keepalive loop. Blocks until the peer is dead or channels close.
|
||||
pub async fn run(mut self) {
|
||||
let mut ticker = interval(self.interval);
|
||||
let mut ticker = interval(self.current_interval());
|
||||
ticker.tick().await; // skip first immediate tick
|
||||
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
debug!("Sending keepalive ping signal");
|
||||
|
||||
if self.signal_tx.send(KeepaliveSignal::SendPing).await.is_err() {
|
||||
// Channel closed
|
||||
break;
|
||||
// Record ping sent, get timestamp for payload
|
||||
let timestamp_ms = self.rtt_tracker.mark_ping_sent();
|
||||
debug!("Sending keepalive ping (ts={})", timestamp_ms);
|
||||
|
||||
if self
|
||||
.signal_tx
|
||||
.send(KeepaliveSignal::SendPing(timestamp_ms))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break; // channel closed
|
||||
}
|
||||
|
||||
// Wait for ACK within timeout
|
||||
match timeout(self.timeout_duration, self.ack_rx.recv()).await {
|
||||
match timeout(self.config.ack_timeout, self.ack_rx.recv()).await {
|
||||
Ok(Some(())) => {
|
||||
debug!("Keepalive ACK received");
|
||||
if let Some(rtt) = self.rtt_tracker.record_ack(timestamp_ms) {
|
||||
debug!("Keepalive ACK received, RTT: {:?}", rtt);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// Channel closed
|
||||
break;
|
||||
break; // channel closed
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Keepalive ACK timeout — peer considered dead");
|
||||
let _ = self.signal_tx.send(KeepaliveSignal::PeerDead).await;
|
||||
break;
|
||||
self.rtt_tracker.record_timeout();
|
||||
warn!(
|
||||
"Keepalive ACK timeout (consecutive: {})",
|
||||
self.rtt_tracker.consecutive_timeouts
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Publish quality snapshot
|
||||
let quality = self.rtt_tracker.snapshot();
|
||||
let _ = self.quality_tx.send(quality.clone());
|
||||
|
||||
// Evaluate state transition
|
||||
let new_health = self.evaluate_health(&quality);
|
||||
|
||||
if new_health != self.health {
|
||||
info!("Link health: {} -> {}", self.health, new_health);
|
||||
self.health = new_health;
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
|
||||
// Reset ticker to new interval
|
||||
ticker = interval(self.current_interval());
|
||||
ticker.tick().await; // skip first immediate tick
|
||||
|
||||
let _ = self
|
||||
.signal_tx
|
||||
.send(KeepaliveSignal::LinkHealthChanged(new_health))
|
||||
.await;
|
||||
}
|
||||
|
||||
// Check for dead peer in Critical state
|
||||
if self.health == LinkHealth::Critical
|
||||
&& self.rtt_tracker.consecutive_timeouts >= self.config.dead_peer_timeouts
|
||||
{
|
||||
warn!("Peer considered dead after {} consecutive timeouts in Critical state",
|
||||
self.rtt_tracker.consecutive_timeouts);
|
||||
let _ = self.signal_tx.send(KeepaliveSignal::PeerDead).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_health(&mut self, quality: &ConnectionQuality) -> LinkHealth {
|
||||
match self.health {
|
||||
LinkHealth::Healthy => {
|
||||
// Downgrade conditions
|
||||
if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
return LinkHealth::Critical;
|
||||
}
|
||||
if quality.jitter_ms > self.config.jitter_degraded_ms
|
||||
|| quality.loss_ratio > self.config.loss_degraded
|
||||
|| quality.consecutive_timeouts >= 1
|
||||
{
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
return LinkHealth::Degraded;
|
||||
}
|
||||
LinkHealth::Healthy
|
||||
}
|
||||
LinkHealth::Degraded => {
|
||||
// Downgrade to Critical
|
||||
if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
return LinkHealth::Critical;
|
||||
}
|
||||
// Upgrade to Healthy (with hysteresis)
|
||||
if quality.jitter_ms < self.config.jitter_healthy_ms
|
||||
&& quality.loss_ratio < self.config.loss_healthy
|
||||
&& quality.consecutive_timeouts == 0
|
||||
{
|
||||
self.consecutive_upgrade_checks += 1;
|
||||
if self.consecutive_upgrade_checks >= self.config.upgrade_checks {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
return LinkHealth::Healthy;
|
||||
}
|
||||
} else {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
}
|
||||
LinkHealth::Degraded
|
||||
}
|
||||
LinkHealth::Critical => {
|
||||
// Upgrade to Degraded (with hysteresis), never directly to Healthy
|
||||
if quality.loss_ratio < self.config.loss_recover
|
||||
&& quality.consecutive_timeouts == 0
|
||||
{
|
||||
self.consecutive_upgrade_checks += 1;
|
||||
if self.consecutive_upgrade_checks >= 2 {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
return LinkHealth::Degraded;
|
||||
}
|
||||
} else {
|
||||
self.consecutive_upgrade_checks = 0;
|
||||
}
|
||||
LinkHealth::Critical
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_values() {
        let cfg = AdaptiveKeepaliveConfig::default();
        assert_eq!(cfg.healthy_interval, Duration::from_secs(60));
        assert_eq!(cfg.degraded_interval, Duration::from_secs(30));
        assert_eq!(cfg.critical_interval, Duration::from_secs(10));
        assert_eq!(cfg.ack_timeout, Duration::from_secs(5));
        assert_eq!(cfg.dead_peer_timeouts, 3);
    }

    #[test]
    fn link_health_display() {
        assert_eq!(LinkHealth::Healthy.to_string(), "healthy");
        assert_eq!(LinkHealth::Degraded.to_string(), "degraded");
        assert_eq!(LinkHealth::Critical.to_string(), "critical");
    }

    /// Build a monitor with dangling channels for unit-testing `evaluate_health`.
    fn make_test_monitor() -> KeepaliveMonitor {
        let (signal_tx, _signal_rx) = mpsc::channel(8);
        let (_ack_tx, ack_rx) = mpsc::channel(8);
        let (quality_tx, _quality_rx) = watch::channel(ConnectionQuality::default());

        KeepaliveMonitor {
            config: AdaptiveKeepaliveConfig::default(),
            health: LinkHealth::Degraded,
            rtt_tracker: RttTracker::new(30),
            signal_tx,
            ack_rx,
            quality_tx,
            consecutive_upgrade_checks: 0,
        }
    }

    #[test]
    fn healthy_to_degraded_on_jitter() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Healthy;
        let quality = ConnectionQuality {
            jitter_ms: 60.0, // above the 50ms degrade threshold
            ..Default::default()
        };
        assert_eq!(monitor.evaluate_health(&quality), LinkHealth::Degraded);
    }

    #[test]
    fn healthy_to_degraded_on_loss() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Healthy;
        let quality = ConnectionQuality {
            loss_ratio: 0.06, // above the 5% degrade threshold
            ..Default::default()
        };
        assert_eq!(monitor.evaluate_health(&quality), LinkHealth::Degraded);
    }

    #[test]
    fn healthy_to_critical_on_high_loss() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Healthy;
        let quality = ConnectionQuality {
            loss_ratio: 0.25, // above the 20% critical threshold
            ..Default::default()
        };
        assert_eq!(monitor.evaluate_health(&quality), LinkHealth::Critical);
    }

    #[test]
    fn healthy_to_critical_on_consecutive_timeouts() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Healthy;
        let quality = ConnectionQuality {
            consecutive_timeouts: 2,
            ..Default::default()
        };
        assert_eq!(monitor.evaluate_health(&quality), LinkHealth::Critical);
    }

    #[test]
    fn degraded_to_healthy_requires_hysteresis() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Degraded;
        let good_quality = ConnectionQuality {
            jitter_ms: 10.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            srtt_ms: 20.0,
            ..Default::default()
        };

        // Three consecutive good checks (default upgrade_checks) are required.
        assert_eq!(monitor.evaluate_health(&good_quality), LinkHealth::Degraded);
        assert_eq!(monitor.consecutive_upgrade_checks, 1);
        assert_eq!(monitor.evaluate_health(&good_quality), LinkHealth::Degraded);
        assert_eq!(monitor.consecutive_upgrade_checks, 2);
        assert_eq!(monitor.evaluate_health(&good_quality), LinkHealth::Healthy);
    }

    #[test]
    fn degraded_to_healthy_resets_on_bad_check() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Degraded;
        let good = ConnectionQuality {
            jitter_ms: 10.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            ..Default::default()
        };
        let bad = ConnectionQuality {
            jitter_ms: 60.0, // too high
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            ..Default::default()
        };

        monitor.evaluate_health(&good); // 1 check
        monitor.evaluate_health(&good); // 2 checks
        monitor.evaluate_health(&bad); // resets the streak
        assert_eq!(monitor.consecutive_upgrade_checks, 0);
    }

    #[test]
    fn critical_to_degraded_requires_hysteresis() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Critical;
        let recovering = ConnectionQuality {
            loss_ratio: 0.05, // under the 10% recover threshold
            consecutive_timeouts: 0,
            ..Default::default()
        };

        assert_eq!(monitor.evaluate_health(&recovering), LinkHealth::Critical);
        assert_eq!(monitor.consecutive_upgrade_checks, 1);
        assert_eq!(monitor.evaluate_health(&recovering), LinkHealth::Degraded);
    }

    #[test]
    fn critical_never_directly_to_healthy() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Critical;
        let perfect = ConnectionQuality {
            jitter_ms: 1.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            srtt_ms: 10.0,
            ..Default::default()
        };

        // Even with perfect quality, recovery must pass through Degraded.
        monitor.evaluate_health(&perfect); // 1
        let result = monitor.evaluate_health(&perfect); // 2 → Degraded
        assert_eq!(result, LinkHealth::Degraded);
        // Not Healthy yet
    }

    #[test]
    fn degraded_to_critical_on_high_loss() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Degraded;
        let quality = ConnectionQuality {
            loss_ratio: 0.25,
            ..Default::default()
        };
        assert_eq!(monitor.evaluate_health(&quality), LinkHealth::Critical);
    }

    #[test]
    fn interval_matches_health() {
        let mut monitor = make_test_monitor();
        monitor.health = LinkHealth::Healthy;
        assert_eq!(monitor.current_interval(), Duration::from_secs(60));
        monitor.health = LinkHealth::Degraded;
        assert_eq!(monitor.current_interval(), Duration::from_secs(30));
        monitor.health = LinkHealth::Critical;
        assert_eq!(monitor.current_interval(), Duration::from_secs(10));
    }
}
|
||||
|
||||
Reference in New Issue
Block a user