use std::time::Duration; use tokio::sync::{mpsc, watch}; use tokio::time::{interval, timeout}; use tracing::{debug, info, warn}; use crate::telemetry::{ConnectionQuality, RttTracker}; /// Default keepalive interval (30 seconds — used for Degraded state). pub const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30); /// Default keepalive ACK timeout (5 seconds). pub const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(5); /// Link health states for adaptive keepalive. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum LinkHealth { /// RTT stable, jitter low, no loss. Interval: 60s. Healthy, /// Elevated jitter or occasional loss. Interval: 30s. Degraded, /// High loss or sustained jitter spike. Interval: 10s. Critical, } impl std::fmt::Display for LinkHealth { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Healthy => write!(f, "healthy"), Self::Degraded => write!(f, "degraded"), Self::Critical => write!(f, "critical"), } } } /// Configuration for the adaptive keepalive state machine. #[derive(Debug, Clone)] pub struct AdaptiveKeepaliveConfig { /// Interval when link health is Healthy. pub healthy_interval: Duration, /// Interval when link health is Degraded. pub degraded_interval: Duration, /// Interval when link health is Critical. pub critical_interval: Duration, /// ACK timeout (how long to wait for ACK before declaring timeout). pub ack_timeout: Duration, /// Jitter threshold (ms) to enter Degraded from Healthy. pub jitter_degraded_ms: f64, /// Jitter threshold (ms) to return to Healthy from Degraded. pub jitter_healthy_ms: f64, /// Loss ratio threshold to enter Degraded. pub loss_degraded: f64, /// Loss ratio threshold to enter Critical. pub loss_critical: f64, /// Loss ratio threshold to return from Critical to Degraded. pub loss_recover: f64, /// Loss ratio threshold to return from Degraded to Healthy. pub loss_healthy: f64, /// Consecutive checks required for upward state transitions (hysteresis). pub upgrade_checks: u32, /// Consecutive timeouts to declare peer dead in Critical state. pub dead_peer_timeouts: u32, } impl Default for AdaptiveKeepaliveConfig { fn default() -> Self { Self { healthy_interval: Duration::from_secs(60), degraded_interval: Duration::from_secs(30), critical_interval: Duration::from_secs(10), ack_timeout: Duration::from_secs(5), jitter_degraded_ms: 50.0, jitter_healthy_ms: 30.0, loss_degraded: 0.05, loss_critical: 0.20, loss_recover: 0.10, loss_healthy: 0.02, upgrade_checks: 3, dead_peer_timeouts: 3, } } } /// Signals from the keepalive monitor. #[derive(Debug, Clone)] pub enum KeepaliveSignal { /// Time to send a keepalive ping. Contains the timestamp (ms since epoch) to embed in payload. SendPing(u64), /// Peer is considered dead (no ACK received within timeout repeatedly). PeerDead, /// Link health state changed. LinkHealthChanged(LinkHealth), } /// A keepalive monitor with adaptive interval and RTT tracking. pub struct KeepaliveMonitor { config: AdaptiveKeepaliveConfig, health: LinkHealth, rtt_tracker: RttTracker, signal_tx: mpsc::Sender, ack_rx: mpsc::Receiver<()>, quality_tx: watch::Sender, consecutive_upgrade_checks: u32, } /// Handle returned to the caller to send ACKs and receive signals. pub struct KeepaliveHandle { pub signal_rx: mpsc::Receiver, pub ack_tx: mpsc::Sender<()>, pub quality_rx: watch::Receiver, } /// Create an adaptive keepalive monitor and its handle. pub fn create_keepalive( config: Option, ) -> (KeepaliveMonitor, KeepaliveHandle) { let config = config.unwrap_or_default(); let (signal_tx, signal_rx) = mpsc::channel(8); let (ack_tx, ack_rx) = mpsc::channel(8); let (quality_tx, quality_rx) = watch::channel(ConnectionQuality::default()); let monitor = KeepaliveMonitor { config, health: LinkHealth::Degraded, // start in Degraded, earn Healthy rtt_tracker: RttTracker::new(30), signal_tx, ack_rx, quality_tx, consecutive_upgrade_checks: 0, }; let handle = KeepaliveHandle { signal_rx, ack_tx, quality_rx, }; (monitor, handle) } impl KeepaliveMonitor { fn current_interval(&self) -> Duration { match self.health { LinkHealth::Healthy => self.config.healthy_interval, LinkHealth::Degraded => self.config.degraded_interval, LinkHealth::Critical => self.config.critical_interval, } } /// Run the keepalive loop. Blocks until the peer is dead or channels close. pub async fn run(mut self) { let mut ticker = interval(self.current_interval()); ticker.tick().await; // skip first immediate tick loop { ticker.tick().await; // Record ping sent, get timestamp for payload let timestamp_ms = self.rtt_tracker.mark_ping_sent(); debug!("Sending keepalive ping (ts={})", timestamp_ms); if self .signal_tx .send(KeepaliveSignal::SendPing(timestamp_ms)) .await .is_err() { break; // channel closed } // Wait for ACK within timeout match timeout(self.config.ack_timeout, self.ack_rx.recv()).await { Ok(Some(())) => { if let Some(rtt) = self.rtt_tracker.record_ack(timestamp_ms) { debug!("Keepalive ACK received, RTT: {:?}", rtt); } } Ok(None) => { break; // channel closed } Err(_) => { self.rtt_tracker.record_timeout(); warn!( "Keepalive ACK timeout (consecutive: {})", self.rtt_tracker.consecutive_timeouts ); } } // Publish quality snapshot let quality = self.rtt_tracker.snapshot(); let _ = self.quality_tx.send(quality.clone()); // Evaluate state transition let new_health = self.evaluate_health(&quality); if new_health != self.health { info!("Link health: {} -> {}", self.health, new_health); self.health = new_health; self.consecutive_upgrade_checks = 0; // Reset ticker to new interval ticker = interval(self.current_interval()); ticker.tick().await; // skip first immediate tick let _ = self .signal_tx .send(KeepaliveSignal::LinkHealthChanged(new_health)) .await; } // Check for dead peer in Critical state if self.health == LinkHealth::Critical && self.rtt_tracker.consecutive_timeouts >= self.config.dead_peer_timeouts { warn!("Peer considered dead after {} consecutive timeouts in Critical state", self.rtt_tracker.consecutive_timeouts); let _ = self.signal_tx.send(KeepaliveSignal::PeerDead).await; break; } } } fn evaluate_health(&mut self, quality: &ConnectionQuality) -> LinkHealth { match self.health { LinkHealth::Healthy => { // Downgrade conditions if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical { self.consecutive_upgrade_checks = 0; return LinkHealth::Critical; } if quality.jitter_ms > self.config.jitter_degraded_ms || quality.loss_ratio > self.config.loss_degraded || quality.consecutive_timeouts >= 1 { self.consecutive_upgrade_checks = 0; return LinkHealth::Degraded; } LinkHealth::Healthy } LinkHealth::Degraded => { // Downgrade to Critical if quality.consecutive_timeouts >= 2 || quality.loss_ratio > self.config.loss_critical { self.consecutive_upgrade_checks = 0; return LinkHealth::Critical; } // Upgrade to Healthy (with hysteresis) if quality.jitter_ms < self.config.jitter_healthy_ms && quality.loss_ratio < self.config.loss_healthy && quality.consecutive_timeouts == 0 { self.consecutive_upgrade_checks += 1; if self.consecutive_upgrade_checks >= self.config.upgrade_checks { self.consecutive_upgrade_checks = 0; return LinkHealth::Healthy; } } else { self.consecutive_upgrade_checks = 0; } LinkHealth::Degraded } LinkHealth::Critical => { // Upgrade to Degraded (with hysteresis), never directly to Healthy if quality.loss_ratio < self.config.loss_recover && quality.consecutive_timeouts == 0 { self.consecutive_upgrade_checks += 1; if self.consecutive_upgrade_checks >= 2 { self.consecutive_upgrade_checks = 0; return LinkHealth::Degraded; } } else { self.consecutive_upgrade_checks = 0; } LinkHealth::Critical } } } } #[cfg(test)] mod tests { use super::*; #[test] fn default_config_values() { let config = AdaptiveKeepaliveConfig::default(); assert_eq!(config.healthy_interval, Duration::from_secs(60)); assert_eq!(config.degraded_interval, Duration::from_secs(30)); assert_eq!(config.critical_interval, Duration::from_secs(10)); assert_eq!(config.ack_timeout, Duration::from_secs(5)); assert_eq!(config.dead_peer_timeouts, 3); } #[test] fn link_health_display() { assert_eq!(format!("{}", LinkHealth::Healthy), "healthy"); assert_eq!(format!("{}", LinkHealth::Degraded), "degraded"); assert_eq!(format!("{}", LinkHealth::Critical), "critical"); } // Helper to create a monitor for unit-testing evaluate_health fn make_test_monitor() -> KeepaliveMonitor { let (signal_tx, _signal_rx) = mpsc::channel(8); let (_ack_tx, ack_rx) = mpsc::channel(8); let (quality_tx, _quality_rx) = watch::channel(ConnectionQuality::default()); KeepaliveMonitor { config: AdaptiveKeepaliveConfig::default(), health: LinkHealth::Degraded, rtt_tracker: RttTracker::new(30), signal_tx, ack_rx, quality_tx, consecutive_upgrade_checks: 0, } } #[test] fn healthy_to_degraded_on_jitter() { let mut m = make_test_monitor(); m.health = LinkHealth::Healthy; let q = ConnectionQuality { jitter_ms: 60.0, // > 50ms threshold ..Default::default() }; let result = m.evaluate_health(&q); assert_eq!(result, LinkHealth::Degraded); } #[test] fn healthy_to_degraded_on_loss() { let mut m = make_test_monitor(); m.health = LinkHealth::Healthy; let q = ConnectionQuality { loss_ratio: 0.06, // > 5% threshold ..Default::default() }; let result = m.evaluate_health(&q); assert_eq!(result, LinkHealth::Degraded); } #[test] fn healthy_to_critical_on_high_loss() { let mut m = make_test_monitor(); m.health = LinkHealth::Healthy; let q = ConnectionQuality { loss_ratio: 0.25, // > 20% threshold ..Default::default() }; let result = m.evaluate_health(&q); assert_eq!(result, LinkHealth::Critical); } #[test] fn healthy_to_critical_on_consecutive_timeouts() { let mut m = make_test_monitor(); m.health = LinkHealth::Healthy; let q = ConnectionQuality { consecutive_timeouts: 2, ..Default::default() }; let result = m.evaluate_health(&q); assert_eq!(result, LinkHealth::Critical); } #[test] fn degraded_to_healthy_requires_hysteresis() { let mut m = make_test_monitor(); m.health = LinkHealth::Degraded; let good_quality = ConnectionQuality { jitter_ms: 10.0, loss_ratio: 0.0, consecutive_timeouts: 0, srtt_ms: 20.0, ..Default::default() }; // Should require 3 consecutive good checks (default upgrade_checks) assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Degraded); assert_eq!(m.consecutive_upgrade_checks, 1); assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Degraded); assert_eq!(m.consecutive_upgrade_checks, 2); assert_eq!(m.evaluate_health(&good_quality), LinkHealth::Healthy); } #[test] fn degraded_to_healthy_resets_on_bad_check() { let mut m = make_test_monitor(); m.health = LinkHealth::Degraded; let good = ConnectionQuality { jitter_ms: 10.0, loss_ratio: 0.0, consecutive_timeouts: 0, ..Default::default() }; let bad = ConnectionQuality { jitter_ms: 60.0, // too high loss_ratio: 0.0, consecutive_timeouts: 0, ..Default::default() }; m.evaluate_health(&good); // 1 check m.evaluate_health(&good); // 2 checks m.evaluate_health(&bad); // resets assert_eq!(m.consecutive_upgrade_checks, 0); } #[test] fn critical_to_degraded_requires_hysteresis() { let mut m = make_test_monitor(); m.health = LinkHealth::Critical; let recovering = ConnectionQuality { loss_ratio: 0.05, // < 10% recover threshold consecutive_timeouts: 0, ..Default::default() }; assert_eq!(m.evaluate_health(&recovering), LinkHealth::Critical); assert_eq!(m.consecutive_upgrade_checks, 1); assert_eq!(m.evaluate_health(&recovering), LinkHealth::Degraded); } #[test] fn critical_never_directly_to_healthy() { let mut m = make_test_monitor(); m.health = LinkHealth::Critical; let perfect = ConnectionQuality { jitter_ms: 1.0, loss_ratio: 0.0, consecutive_timeouts: 0, srtt_ms: 10.0, ..Default::default() }; // Even with perfect quality, must go through Degraded first m.evaluate_health(&perfect); // 1 let result = m.evaluate_health(&perfect); // 2 → Degraded assert_eq!(result, LinkHealth::Degraded); // Not Healthy yet } #[test] fn degraded_to_critical_on_high_loss() { let mut m = make_test_monitor(); m.health = LinkHealth::Degraded; let q = ConnectionQuality { loss_ratio: 0.25, ..Default::default() }; assert_eq!(m.evaluate_health(&q), LinkHealth::Critical); } #[test] fn interval_matches_health() { let mut m = make_test_monitor(); m.health = LinkHealth::Healthy; assert_eq!(m.current_interval(), Duration::from_secs(60)); m.health = LinkHealth::Degraded; assert_eq!(m.current_interval(), Duration::from_secs(30)); m.health = LinkHealth::Critical; assert_eq!(m.current_interval(), Duration::from_secs(10)); } }