//! Adaptive keepalive monitor: a link-health state machine that adjusts
//! the keepalive ping interval based on RTT, jitter, and loss telemetry.
use std::time::Duration;
|
|
use tokio::sync::{mpsc, watch};
|
|
use tokio::time::{interval, timeout};
|
|
use tracing::{debug, info, warn};
|
|
|
|
use crate::telemetry::{ConnectionQuality, RttTracker};
|
|
|
|
/// Default keepalive interval (30 seconds — used for Degraded state).
// NOTE(review): `AdaptiveKeepaliveConfig::default()` hard-codes the same
// 30s/5s values as literals instead of referencing these constants — keep
// the two in sync if either changes.
pub const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);

/// Default keepalive ACK timeout (5 seconds).
pub const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(5);
|
|
|
|
/// Link health states for adaptive keepalive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkHealth {
    /// RTT stable, jitter low, no loss. Interval: 60s.
    Healthy,
    /// Elevated jitter or occasional loss. Interval: 30s.
    Degraded,
    /// High loss or sustained jitter spike. Interval: 10s.
    Critical,
}

impl std::fmt::Display for LinkHealth {
    /// Renders the lowercase state name: "healthy", "degraded", or "critical".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Healthy => "healthy",
            Self::Degraded => "degraded",
            Self::Critical => "critical",
        };
        f.write_str(label)
    }
}
|
|
|
|
/// Configuration for the adaptive keepalive state machine.
///
/// Thresholds come in asymmetric pairs (e.g. `loss_degraded` to enter a
/// worse state vs. `loss_healthy` to leave it) so transitions have
/// hysteresis and the state does not flap around a single cutoff.
#[derive(Debug, Clone)]
pub struct AdaptiveKeepaliveConfig {
    /// Ping interval while the link is Healthy.
    pub healthy_interval: Duration,
    /// Ping interval while the link is Degraded.
    pub degraded_interval: Duration,
    /// Ping interval while the link is Critical.
    pub critical_interval: Duration,
    /// How long to wait for an ACK before counting a timeout.
    pub ack_timeout: Duration,
    /// Jitter (ms) above which Healthy drops to Degraded.
    pub jitter_degraded_ms: f64,
    /// Jitter (ms) below which Degraded may return to Healthy.
    pub jitter_healthy_ms: f64,
    /// Loss ratio above which Healthy drops to Degraded.
    pub loss_degraded: f64,
    /// Loss ratio above which any state drops to Critical.
    pub loss_critical: f64,
    /// Loss ratio below which Critical may recover to Degraded.
    pub loss_recover: f64,
    /// Loss ratio below which Degraded may return to Healthy.
    pub loss_healthy: f64,
    /// Consecutive good checks required for an upward transition.
    pub upgrade_checks: u32,
    /// Consecutive ACK timeouts in Critical that declare the peer dead.
    pub dead_peer_timeouts: u32,
}

impl Default for AdaptiveKeepaliveConfig {
    /// Defaults: 60s/30s/10s intervals, 5s ACK timeout, 50ms/30ms jitter
    /// thresholds, 5%/20%/10%/2% loss thresholds, 3-check hysteresis,
    /// and 3 timeouts to declare the peer dead.
    fn default() -> Self {
        Self {
            ack_timeout: Duration::from_secs(5),
            healthy_interval: Duration::from_secs(60),
            degraded_interval: Duration::from_secs(30),
            critical_interval: Duration::from_secs(10),
            jitter_degraded_ms: 50.0,
            jitter_healthy_ms: 30.0,
            loss_degraded: 0.05,
            loss_healthy: 0.02,
            loss_recover: 0.10,
            loss_critical: 0.20,
            upgrade_checks: 3,
            dead_peer_timeouts: 3,
        }
    }
}
|
|
|
|
/// Signals from the keepalive monitor.
///
/// Emitted on the `signal_rx` channel of [`KeepaliveHandle`]; the caller is
/// responsible for actually transmitting pings and reacting to peer death.
#[derive(Debug, Clone)]
pub enum KeepaliveSignal {
    /// Time to send a keepalive ping. Contains the timestamp (ms since epoch) to embed in payload.
    SendPing(u64),
    /// Peer is considered dead (no ACK received within timeout repeatedly).
    PeerDead,
    /// Link health state changed.
    LinkHealthChanged(LinkHealth),
}
|
|
|
|
/// A keepalive monitor with adaptive interval and RTT tracking.
///
/// Built via [`create_keepalive`]; drive it with [`KeepaliveMonitor::run`].
pub struct KeepaliveMonitor {
    // Thresholds and intervals governing the state machine.
    config: AdaptiveKeepaliveConfig,
    // Current link health; starts Degraded and must earn Healthy.
    health: LinkHealth,
    // Tracks ping/ACK timestamps, RTT, jitter, loss, timeouts.
    rtt_tracker: RttTracker,
    // Outbound signals (SendPing / PeerDead / LinkHealthChanged).
    signal_tx: mpsc::Sender<KeepaliveSignal>,
    // Inbound ACK notifications from the caller.
    ack_rx: mpsc::Receiver<()>,
    // Latest quality snapshot, published each cycle.
    quality_tx: watch::Sender<ConnectionQuality>,
    // Hysteresis counter for upward (recovery) transitions.
    consecutive_upgrade_checks: u32,
}
|
|
|
|
/// Handle returned to the caller to send ACKs and receive signals.
pub struct KeepaliveHandle {
    /// Receives [`KeepaliveSignal`]s emitted by the monitor loop.
    pub signal_rx: mpsc::Receiver<KeepaliveSignal>,
    /// Send `()` here when a keepalive ACK arrives from the peer.
    pub ack_tx: mpsc::Sender<()>,
    /// Watch channel carrying the latest [`ConnectionQuality`] snapshot.
    pub quality_rx: watch::Receiver<ConnectionQuality>,
}
|
|
|
|
/// Create an adaptive keepalive monitor and its handle.
|
|
pub fn create_keepalive(
|
|
config: Option<AdaptiveKeepaliveConfig>,
|
|
) -> (KeepaliveMonitor, KeepaliveHandle) {
|
|
let config = config.unwrap_or_default();
|
|
let (signal_tx, signal_rx) = mpsc::channel(8);
|
|
let (ack_tx, ack_rx) = mpsc::channel(8);
|
|
let (quality_tx, quality_rx) = watch::channel(ConnectionQuality::default());
|
|
|
|
let monitor = KeepaliveMonitor {
|
|
config,
|
|
health: LinkHealth::Degraded, // start in Degraded, earn Healthy
|
|
rtt_tracker: RttTracker::new(30),
|
|
signal_tx,
|
|
ack_rx,
|
|
quality_tx,
|
|
consecutive_upgrade_checks: 0,
|
|
};
|
|
|
|
let handle = KeepaliveHandle {
|
|
signal_rx,
|
|
ack_tx,
|
|
quality_rx,
|
|
};
|
|
|
|
(monitor, handle)
|
|
}
|
|
|
|
impl KeepaliveMonitor {
    /// Map the current link health to its configured keepalive interval.
    fn current_interval(&self) -> Duration {
        match self.health {
            LinkHealth::Healthy => self.config.healthy_interval,
            LinkHealth::Degraded => self.config.degraded_interval,
            LinkHealth::Critical => self.config.critical_interval,
        }
    }

    /// Run the keepalive loop. Blocks until the peer is dead or channels close.
    ///
    /// Each cycle: send a ping signal, wait (bounded) for an ACK, publish a
    /// quality snapshot, re-evaluate link health, and — in Critical — check
    /// for a dead peer. Exits when any channel closes or the peer is
    /// declared dead.
    pub async fn run(mut self) {
        let mut ticker = interval(self.current_interval());
        ticker.tick().await; // skip first immediate tick

        loop {
            ticker.tick().await;

            // Record ping sent, get timestamp for payload
            let timestamp_ms = self.rtt_tracker.mark_ping_sent();
            debug!("Sending keepalive ping (ts={})", timestamp_ms);

            if self
                .signal_tx
                .send(KeepaliveSignal::SendPing(timestamp_ms))
                .await
                .is_err()
            {
                break; // channel closed
            }

            // Wait for ACK within timeout
            // NOTE(review): a late ACK belonging to a previously timed-out
            // ping would be consumed here and attributed to the current
            // ping — confirm `RttTracker::record_ack` guards against stale
            // ACKs (it receives `timestamp_ms`, so it may).
            match timeout(self.config.ack_timeout, self.ack_rx.recv()).await {
                Ok(Some(())) => {
                    // record_ack returns the measured RTT when it accepts the ACK.
                    if let Some(rtt) = self.rtt_tracker.record_ack(timestamp_ms) {
                        debug!("Keepalive ACK received, RTT: {:?}", rtt);
                    }
                }
                Ok(None) => {
                    break; // channel closed
                }
                Err(_) => {
                    // timeout() elapsed with no ACK: count it against the link.
                    self.rtt_tracker.record_timeout();
                    warn!(
                        "Keepalive ACK timeout (consecutive: {})",
                        self.rtt_tracker.consecutive_timeouts
                    );
                }
            }

            // Publish quality snapshot
            // (send error means no watch receivers remain — not fatal here).
            let quality = self.rtt_tracker.snapshot();
            let _ = self.quality_tx.send(quality.clone());

            // Evaluate state transition
            let new_health = self.evaluate_health(&quality);

            if new_health != self.health {
                info!("Link health: {} -> {}", self.health, new_health);
                self.health = new_health;
                self.consecutive_upgrade_checks = 0;

                // Reset ticker to new interval
                // (a fresh interval fires immediately once, so the extra
                // tick below re-establishes the cadence without delay).
                ticker = interval(self.current_interval());
                ticker.tick().await; // skip first immediate tick

                let _ = self
                    .signal_tx
                    .send(KeepaliveSignal::LinkHealthChanged(new_health))
                    .await;
            }

            // Check for dead peer in Critical state
            if self.health == LinkHealth::Critical
                && self.rtt_tracker.consecutive_timeouts >= self.config.dead_peer_timeouts
            {
                warn!("Peer considered dead after {} consecutive timeouts in Critical state",
                    self.rtt_tracker.consecutive_timeouts);
                let _ = self.signal_tx.send(KeepaliveSignal::PeerDead).await;
                break;
            }
        }
    }

    /// Evaluate the health state transition for one cycle.
    ///
    /// Downgrades (to Degraded/Critical) are immediate; upgrades require
    /// `consecutive_upgrade_checks` consecutive good cycles (hysteresis),
    /// and Critical can only recover to Degraded, never straight to
    /// Healthy. Any transition or bad cycle resets the hysteresis counter.
    fn evaluate_health(&mut self, quality: &ConnectionQuality) -> LinkHealth {
        match self.health {
            LinkHealth::Healthy => {
                // Downgrade conditions
                if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical {
                    self.consecutive_upgrade_checks = 0;
                    return LinkHealth::Critical;
                }
                // A single timeout, elevated jitter, or moderate loss
                // already demotes Healthy to Degraded.
                if quality.jitter_ms > self.config.jitter_degraded_ms
                    || quality.loss_ratio > self.config.loss_degraded
                    || quality.consecutive_timeouts >= 1
                {
                    self.consecutive_upgrade_checks = 0;
                    return LinkHealth::Degraded;
                }
                LinkHealth::Healthy
            }
            LinkHealth::Degraded => {
                // Downgrade to Critical
                if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical {
                    self.consecutive_upgrade_checks = 0;
                    return LinkHealth::Critical;
                }
                // Upgrade to Healthy (with hysteresis)
                if quality.jitter_ms < self.config.jitter_healthy_ms
                    && quality.loss_ratio < self.config.loss_healthy
                    && quality.consecutive_timeouts == 0
                {
                    self.consecutive_upgrade_checks += 1;
                    if self.consecutive_upgrade_checks >= self.config.upgrade_checks {
                        self.consecutive_upgrade_checks = 0;
                        return LinkHealth::Healthy;
                    }
                } else {
                    // Any bad cycle restarts the upgrade streak.
                    self.consecutive_upgrade_checks = 0;
                }
                LinkHealth::Degraded
            }
            LinkHealth::Critical => {
                // Upgrade to Degraded (with hysteresis), never directly to Healthy
                // NOTE(review): the required streak here is a hard-coded 2,
                // not `config.upgrade_checks` — the in-file tests pin this,
                // so it appears intentional.
                if quality.loss_ratio < self.config.loss_recover
                    && quality.consecutive_timeouts == 0
                {
                    self.consecutive_upgrade_checks += 1;
                    if self.consecutive_upgrade_checks >= 2 {
                        self.consecutive_upgrade_checks = 0;
                        return LinkHealth::Degraded;
                    }
                } else {
                    self.consecutive_upgrade_checks = 0;
                }
                LinkHealth::Critical
            }
        }
    }
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_values() {
        let config = AdaptiveKeepaliveConfig::default();
        assert_eq!(config.healthy_interval, Duration::from_secs(60));
        assert_eq!(config.degraded_interval, Duration::from_secs(30));
        assert_eq!(config.critical_interval, Duration::from_secs(10));
        assert_eq!(config.ack_timeout, Duration::from_secs(5));
        assert_eq!(config.dead_peer_timeouts, 3);
    }

    #[test]
    fn link_health_display() {
        assert_eq!(format!("{}", LinkHealth::Healthy), "healthy");
        assert_eq!(format!("{}", LinkHealth::Degraded), "degraded");
        assert_eq!(format!("{}", LinkHealth::Critical), "critical");
    }

    // Helper to create a monitor for unit-testing evaluate_health
    // (channels are created but the receiving/sending ends are dropped,
    // which is fine — evaluate_health never touches them).
    fn make_test_monitor() -> KeepaliveMonitor {
        let (signal_tx, _signal_rx) = mpsc::channel(8);
        let (_ack_tx, ack_rx) = mpsc::channel(8);
        let (quality_tx, _quality_rx) = watch::channel(ConnectionQuality::default());

        KeepaliveMonitor {
            config: AdaptiveKeepaliveConfig::default(),
            health: LinkHealth::Degraded,
            rtt_tracker: RttTracker::new(30),
            signal_tx,
            ack_rx,
            quality_tx,
            consecutive_upgrade_checks: 0,
        }
    }

    #[test]
    fn healthy_to_degraded_on_jitter() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Healthy;
        let q = ConnectionQuality {
            jitter_ms: 60.0, // > 50ms threshold
            ..Default::default()
        };
        let result = m.evaluate_health(&q);
        assert_eq!(result, LinkHealth::Degraded);
    }

    #[test]
    fn healthy_to_degraded_on_loss() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Healthy;
        let q = ConnectionQuality {
            loss_ratio: 0.06, // > 5% threshold
            ..Default::default()
        };
        let result = m.evaluate_health(&q);
        assert_eq!(result, LinkHealth::Degraded);
    }

    #[test]
    fn healthy_to_critical_on_high_loss() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Healthy;
        let q = ConnectionQuality {
            loss_ratio: 0.25, // > 20% threshold
            ..Default::default()
        };
        let result = m.evaluate_health(&q);
        assert_eq!(result, LinkHealth::Critical);
    }

    #[test]
    fn healthy_to_critical_on_consecutive_timeouts() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Healthy;
        let q = ConnectionQuality {
            consecutive_timeouts: 2,
            ..Default::default()
        };
        let result = m.evaluate_health(&q);
        assert_eq!(result, LinkHealth::Critical);
    }

    #[test]
    fn degraded_to_healthy_requires_hysteresis() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Degraded;
        let good_quality = ConnectionQuality {
            jitter_ms: 10.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            srtt_ms: 20.0,
            ..Default::default()
        };

        // Should require 3 consecutive good checks (default upgrade_checks)
        assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Degraded);
        assert_eq!(m.consecutive_upgrade_checks, 1);
        assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Degraded);
        assert_eq!(m.consecutive_upgrade_checks, 2);
        assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Healthy);
    }

    #[test]
    fn degraded_to_healthy_resets_on_bad_check() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Degraded;
        let good = ConnectionQuality {
            jitter_ms: 10.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            ..Default::default()
        };
        let bad = ConnectionQuality {
            jitter_ms: 60.0, // too high
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            ..Default::default()
        };

        m.evaluate_health(&good); // 1 check
        m.evaluate_health(&good); // 2 checks
        m.evaluate_health(&bad); // resets
        assert_eq!(m.consecutive_upgrade_checks, 0);
    }

    #[test]
    fn critical_to_degraded_requires_hysteresis() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Critical;
        let recovering = ConnectionQuality {
            loss_ratio: 0.05, // < 10% recover threshold
            consecutive_timeouts: 0,
            ..Default::default()
        };

        // Critical recovery uses a fixed 2-check streak (see evaluate_health).
        assert_eq!(m.evaluate_health(&recovering), LinkHealth::Critical);
        assert_eq!(m.consecutive_upgrade_checks, 1);
        assert_eq!(m.evaluate_health(&recovering), LinkHealth::Degraded);
    }

    #[test]
    fn critical_never_directly_to_healthy() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Critical;
        let perfect = ConnectionQuality {
            jitter_ms: 1.0,
            loss_ratio: 0.0,
            consecutive_timeouts: 0,
            srtt_ms: 10.0,
            ..Default::default()
        };

        // Even with perfect quality, must go through Degraded first
        m.evaluate_health(&perfect); // 1
        let result = m.evaluate_health(&perfect); // 2 → Degraded
        assert_eq!(result, LinkHealth::Degraded);
        // Not Healthy yet
    }

    #[test]
    fn degraded_to_critical_on_high_loss() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Degraded;
        let q = ConnectionQuality {
            loss_ratio: 0.25,
            ..Default::default()
        };
        assert_eq!(m.evaluate_health(&q), LinkHealth::Critical);
    }

    #[test]
    fn interval_matches_health() {
        let mut m = make_test_monitor();
        m.health = LinkHealth::Healthy;
        assert_eq!(m.current_interval(), Duration::from_secs(60));
        m.health = LinkHealth::Degraded;
        assert_eq!(m.current_interval(), Duration::from_secs(30));
        m.health = LinkHealth::Critical;
        assert_eq!(m.current_interval(), Duration::from_secs(10));
    }
}
|