initial
This commit is contained in:
87
rust/src/keepalive.rs
Normal file
87
rust/src/keepalive.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{interval, timeout};
|
||||
use tracing::{debug, warn};
|
||||
|
||||
/// Default keepalive interval (30 seconds).
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Default keepalive ACK timeout (10 seconds).
|
||||
pub const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Signals from the keepalive monitor.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum KeepaliveSignal {
|
||||
/// Time to send a keepalive ping.
|
||||
SendPing,
|
||||
/// Peer is considered dead (no ACK received within timeout).
|
||||
PeerDead,
|
||||
}
|
||||
|
||||
/// A keepalive monitor that emits signals on a channel.
|
||||
pub struct KeepaliveMonitor {
|
||||
interval: Duration,
|
||||
timeout_duration: Duration,
|
||||
signal_tx: mpsc::Sender<KeepaliveSignal>,
|
||||
ack_rx: mpsc::Receiver<()>,
|
||||
}
|
||||
|
||||
/// Handle returned to the caller to send ACKs and receive signals.
|
||||
pub struct KeepaliveHandle {
|
||||
pub signal_rx: mpsc::Receiver<KeepaliveSignal>,
|
||||
pub ack_tx: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
/// Create a keepalive monitor and its handle.
|
||||
pub fn create_keepalive(
|
||||
keepalive_interval: Option<Duration>,
|
||||
keepalive_timeout: Option<Duration>,
|
||||
) -> (KeepaliveMonitor, KeepaliveHandle) {
|
||||
let (signal_tx, signal_rx) = mpsc::channel(8);
|
||||
let (ack_tx, ack_rx) = mpsc::channel(8);
|
||||
|
||||
let monitor = KeepaliveMonitor {
|
||||
interval: keepalive_interval.unwrap_or(DEFAULT_KEEPALIVE_INTERVAL),
|
||||
timeout_duration: keepalive_timeout.unwrap_or(DEFAULT_KEEPALIVE_TIMEOUT),
|
||||
signal_tx,
|
||||
ack_rx,
|
||||
};
|
||||
|
||||
let handle = KeepaliveHandle { signal_rx, ack_tx };
|
||||
|
||||
(monitor, handle)
|
||||
}
|
||||
|
||||
impl KeepaliveMonitor {
|
||||
/// Run the keepalive loop. Blocks until the peer is dead or channels close.
|
||||
pub async fn run(mut self) {
|
||||
let mut ticker = interval(self.interval);
|
||||
ticker.tick().await; // skip first immediate tick
|
||||
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
debug!("Sending keepalive ping signal");
|
||||
|
||||
if self.signal_tx.send(KeepaliveSignal::SendPing).await.is_err() {
|
||||
// Channel closed
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait for ACK within timeout
|
||||
match timeout(self.timeout_duration, self.ack_rx.recv()).await {
|
||||
Ok(Some(())) => {
|
||||
debug!("Keepalive ACK received");
|
||||
}
|
||||
Ok(None) => {
|
||||
// Channel closed
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("Keepalive ACK timeout — peer considered dead");
|
||||
let _ = self.signal_tx.send(KeepaliveSignal::PeerDead).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user