fix(rustproxy): Use cooperative cancellation for background tasks, prune stale caches and metric entries, and switch tests to dynamic port allocation to avoid port conflicts

This commit is contained in:
2026-02-24 20:56:37 +00:00
parent 755c81c042
commit 33cd5330c4
24 changed files with 535 additions and 560 deletions

View File

@@ -167,6 +167,14 @@ impl HttpProxyService {
self.backend_tls_config = config;
}
/// Prune caches for route IDs that are no longer active.
/// Call after route updates to prevent unbounded growth.
pub fn prune_stale_routes(&self, active_route_ids: &std::collections::HashSet<String>) {
self.route_rate_limiters.retain(|k, _| active_route_ids.contains(k));
self.regex_cache.clear();
self.upstream_selector.reset_round_robin();
}
/// Handle an incoming HTTP connection on a plain TCP stream.
pub async fn handle_connection(
self: Arc<Self>,

View File

@@ -131,6 +131,14 @@ impl UpstreamSelector {
}
}
/// Clear stale round-robin counters on route update.
/// Resetting is harmless — counters just restart cycling from index 0.
pub fn reset_round_robin(&self) {
if let Ok(mut counters) = self.round_robin.lock() {
counters.clear();
}
}
fn ip_hash(addr: &SocketAddr) -> usize {
let ip_str = addr.ip().to_string();
let mut hash: usize = 5381;

View File

@@ -239,21 +239,26 @@ impl MetricsCollector {
}
if let Some(ip) = source_ip {
self.ip_bytes_in
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
self.ip_bytes_out
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
// Only record per-IP stats if the IP still has active connections.
// This prevents orphaned entries when record_bytes races with
// connection_closed (which evicts all per-IP data on last close).
if self.ip_connections.contains_key(ip) {
self.ip_bytes_in
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_in, Ordering::Relaxed);
self.ip_bytes_out
.entry(ip.to_string())
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(bytes_out, Ordering::Relaxed);
// Accumulate into per-IP pending throughput counters (lock-free)
let entry = self.ip_pending_tp
.entry(ip.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
// Accumulate into per-IP pending throughput counters (lock-free)
let entry = self.ip_pending_tp
.entry(ip.to_string())
.or_insert_with(|| (AtomicU64::new(0), AtomicU64::new(0)));
entry.0.fetch_add(bytes_in, Ordering::Relaxed);
entry.1.fetch_add(bytes_out, Ordering::Relaxed);
}
}
}
@@ -347,6 +352,15 @@ impl MetricsCollector {
tracker.record_bytes(pending_reqs, 0);
tracker.sample();
}
// Safety-net: prune orphaned per-IP entries that have no corresponding
// ip_connections entry. This catches any entries created by a race between
// record_bytes and connection_closed.
self.ip_bytes_in.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_bytes_out.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
}
/// Remove per-route metrics for route IDs that are no longer active.
@@ -733,6 +747,49 @@ mod tests {
assert!(collector.route_total_connections.get("route-c").is_some());
}
#[test]
fn test_record_bytes_after_close_no_orphan() {
let collector = MetricsCollector::with_retention(60);
// Open a connection, record bytes, then close
collector.connection_opened(Some("route-a"), Some("10.0.0.1"));
collector.record_bytes(100, 200, Some("route-a"), Some("10.0.0.1"));
collector.connection_closed(Some("route-a"), Some("10.0.0.1"));
// IP should be fully evicted
assert!(collector.ip_connections.get("10.0.0.1").is_none());
// Now record_bytes arrives late (simulates race) — should NOT re-create entries
collector.record_bytes(50, 75, Some("route-a"), Some("10.0.0.1"));
assert!(collector.ip_bytes_in.get("10.0.0.1").is_none());
assert!(collector.ip_bytes_out.get("10.0.0.1").is_none());
assert!(collector.ip_pending_tp.get("10.0.0.1").is_none());
// Global bytes should still be counted
assert_eq!(collector.total_bytes_in.load(Ordering::Relaxed), 150);
assert_eq!(collector.total_bytes_out.load(Ordering::Relaxed), 275);
}
#[test]
fn test_sample_all_prunes_orphaned_ip_entries() {
let collector = MetricsCollector::with_retention(60);
// Manually insert orphaned entries (simulates the race before the guard)
collector.ip_bytes_in.insert("orphan-ip".to_string(), AtomicU64::new(100));
collector.ip_bytes_out.insert("orphan-ip".to_string(), AtomicU64::new(200));
collector.ip_pending_tp.insert("orphan-ip".to_string(), (AtomicU64::new(0), AtomicU64::new(0)));
// No ip_connections entry for "orphan-ip"
assert!(collector.ip_connections.get("orphan-ip").is_none());
// sample_all should prune the orphans
collector.sample_all();
assert!(collector.ip_bytes_in.get("orphan-ip").is_none());
assert!(collector.ip_bytes_out.get("orphan-ip").is_none());
assert!(collector.ip_pending_tp.get("orphan-ip").is_none());
}
#[test]
fn test_throughput_history_in_snapshot() {
let collector = MetricsCollector::with_retention(60);

View File

@@ -1,155 +0,0 @@
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Per-connection tracking record with atomics for lock-free updates.
///
/// Each field uses atomics so that the forwarding tasks can update
/// bytes_received / bytes_sent / last_activity without holding any lock,
/// while the zombie scanner reads them concurrently.
pub struct ConnectionRecord {
/// Unique connection ID assigned by the ConnectionTracker.
pub id: u64,
/// Wall-clock instant when this connection was created.
pub created_at: Instant,
/// Milliseconds since `created_at` when the last activity occurred.
/// Updated atomically by the forwarding loops.
pub last_activity: AtomicU64,
/// Total bytes received from the client (inbound).
pub bytes_received: AtomicU64,
/// Total bytes sent to the client (outbound / from backend).
pub bytes_sent: AtomicU64,
/// True once the client side of the connection has closed.
pub client_closed: AtomicBool,
/// True once the backend side of the connection has closed.
pub backend_closed: AtomicBool,
/// Whether this connection uses TLS (affects zombie thresholds).
pub is_tls: AtomicBool,
/// Whether this connection has keep-alive semantics.
pub has_keep_alive: AtomicBool,
}
impl ConnectionRecord {
/// Create a new connection record with the given ID.
/// All counters start at zero, all flags start as false.
pub fn new(id: u64) -> Self {
Self {
id,
created_at: Instant::now(),
last_activity: AtomicU64::new(0),
bytes_received: AtomicU64::new(0),
bytes_sent: AtomicU64::new(0),
client_closed: AtomicBool::new(false),
backend_closed: AtomicBool::new(false),
is_tls: AtomicBool::new(false),
has_keep_alive: AtomicBool::new(false),
}
}
/// Update `last_activity` to reflect the current elapsed time.
pub fn touch(&self) {
let elapsed_ms = self.created_at.elapsed().as_millis() as u64;
self.last_activity.store(elapsed_ms, Ordering::Relaxed);
}
/// Record `n` bytes received from the client (inbound).
pub fn record_bytes_in(&self, n: u64) {
self.bytes_received.fetch_add(n, Ordering::Relaxed);
self.touch();
}
/// Record `n` bytes sent to the client (outbound / from backend).
pub fn record_bytes_out(&self, n: u64) {
self.bytes_sent.fetch_add(n, Ordering::Relaxed);
self.touch();
}
/// How long since the last activity on this connection.
pub fn idle_duration(&self) -> Duration {
let last_ms = self.last_activity.load(Ordering::Relaxed);
let age_ms = self.created_at.elapsed().as_millis() as u64;
Duration::from_millis(age_ms.saturating_sub(last_ms))
}
/// Total age of this connection (time since creation).
pub fn age(&self) -> Duration {
self.created_at.elapsed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_new_record() {
let record = ConnectionRecord::new(42);
assert_eq!(record.id, 42);
assert_eq!(record.bytes_received.load(Ordering::Relaxed), 0);
assert_eq!(record.bytes_sent.load(Ordering::Relaxed), 0);
assert!(!record.client_closed.load(Ordering::Relaxed));
assert!(!record.backend_closed.load(Ordering::Relaxed));
assert!(!record.is_tls.load(Ordering::Relaxed));
assert!(!record.has_keep_alive.load(Ordering::Relaxed));
}
#[test]
fn test_record_bytes() {
let record = ConnectionRecord::new(1);
record.record_bytes_in(100);
record.record_bytes_in(200);
assert_eq!(record.bytes_received.load(Ordering::Relaxed), 300);
record.record_bytes_out(50);
record.record_bytes_out(75);
assert_eq!(record.bytes_sent.load(Ordering::Relaxed), 125);
}
#[test]
fn test_touch_updates_activity() {
let record = ConnectionRecord::new(1);
assert_eq!(record.last_activity.load(Ordering::Relaxed), 0);
// Sleep briefly so elapsed time is nonzero
thread::sleep(Duration::from_millis(10));
record.touch();
let activity = record.last_activity.load(Ordering::Relaxed);
assert!(activity >= 10, "last_activity should be at least 10ms, got {}", activity);
}
#[test]
fn test_idle_duration() {
let record = ConnectionRecord::new(1);
// Initially idle_duration ~ age since last_activity is 0
thread::sleep(Duration::from_millis(20));
let idle = record.idle_duration();
assert!(idle >= Duration::from_millis(20));
// After touch, idle should be near zero
record.touch();
let idle = record.idle_duration();
assert!(idle < Duration::from_millis(10));
}
#[test]
fn test_age() {
let record = ConnectionRecord::new(1);
thread::sleep(Duration::from_millis(20));
let age = record.age();
assert!(age >= Duration::from_millis(20));
}
#[test]
fn test_flags() {
let record = ConnectionRecord::new(1);
record.client_closed.store(true, Ordering::Relaxed);
record.is_tls.store(true, Ordering::Relaxed);
record.has_keep_alive.store(true, Ordering::Relaxed);
assert!(record.client_closed.load(Ordering::Relaxed));
assert!(!record.backend_closed.load(Ordering::Relaxed));
assert!(record.is_tls.load(Ordering::Relaxed));
assert!(record.has_keep_alive.load(Ordering::Relaxed));
}
}

View File

@@ -2,24 +2,9 @@ use dashmap::DashMap;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use super::connection_record::ConnectionRecord;
/// Thresholds for zombie detection (non-TLS connections).
const HALF_ZOMBIE_TIMEOUT_PLAIN: Duration = Duration::from_secs(30);
/// Thresholds for zombie detection (TLS connections).
const HALF_ZOMBIE_TIMEOUT_TLS: Duration = Duration::from_secs(300);
/// Stuck connection timeout (non-TLS): received data but never sent any.
const STUCK_TIMEOUT_PLAIN: Duration = Duration::from_secs(60);
/// Stuck connection timeout (TLS): received data but never sent any.
const STUCK_TIMEOUT_TLS: Duration = Duration::from_secs(300);
/// Tracks active connections per IP and enforces per-IP limits and rate limiting.
/// Also maintains per-connection records for zombie detection.
pub struct ConnectionTracker {
/// Active connection counts per IP
active: DashMap<IpAddr, AtomicU64>,
@@ -29,10 +14,6 @@ pub struct ConnectionTracker {
max_per_ip: Option<u64>,
/// Maximum new connections per minute per IP (None = unlimited)
rate_limit_per_minute: Option<u64>,
/// Per-connection tracking records for zombie detection
connections: DashMap<u64, Arc<ConnectionRecord>>,
/// Monotonically increasing connection ID counter
next_id: AtomicU64,
}
impl ConnectionTracker {
@@ -42,8 +23,6 @@ impl ConnectionTracker {
timestamps: DashMap::new(),
max_per_ip,
rate_limit_per_minute,
connections: DashMap::new(),
next_id: AtomicU64::new(1),
}
}
@@ -112,118 +91,27 @@ impl ConnectionTracker {
.unwrap_or(0)
}
/// Prune stale timestamp entries for IPs that have no active connections
/// and no recent timestamps. This cleans up entries left by rate-limited IPs
/// that never had connection_opened called.
pub fn cleanup_stale_timestamps(&self) {
if self.rate_limit_per_minute.is_none() {
return; // No rate limiting — timestamps map should be empty
}
let now = Instant::now();
let one_minute = Duration::from_secs(60);
self.timestamps.retain(|ip, timestamps| {
timestamps.retain(|t| now.duration_since(*t) < one_minute);
// Keep if there are active connections or recent timestamps
!timestamps.is_empty() || self.active.contains_key(ip)
});
}
/// Get the total number of tracked IPs.
pub fn tracked_ips(&self) -> usize {
self.active.len()
}
/// Register a new connection and return its tracking record.
///
/// The returned `Arc<ConnectionRecord>` should be passed to the forwarding
/// loop so it can update bytes / activity atomics in real time.
pub fn register_connection(&self, is_tls: bool) -> Arc<ConnectionRecord> {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let record = Arc::new(ConnectionRecord::new(id));
record.is_tls.store(is_tls, Ordering::Relaxed);
self.connections.insert(id, Arc::clone(&record));
record
}
/// Remove a connection record when the connection is fully closed.
pub fn unregister_connection(&self, id: u64) {
self.connections.remove(&id);
}
/// Scan all tracked connections and return IDs of zombie connections.
///
/// A connection is considered a zombie in any of these cases:
/// - **Full zombie**: both `client_closed` and `backend_closed` are true.
/// - **Half zombie**: one side closed for longer than the threshold
/// (5 min for TLS, 30s for non-TLS).
/// - **Stuck**: `bytes_received > 0` but `bytes_sent == 0` for longer
/// than the stuck threshold (5 min for TLS, 60s for non-TLS).
pub fn scan_zombies(&self) -> Vec<u64> {
let mut zombies = Vec::new();
for entry in self.connections.iter() {
let record = entry.value();
let id = *entry.key();
let is_tls = record.is_tls.load(Ordering::Relaxed);
let client_closed = record.client_closed.load(Ordering::Relaxed);
let backend_closed = record.backend_closed.load(Ordering::Relaxed);
let idle = record.idle_duration();
let bytes_in = record.bytes_received.load(Ordering::Relaxed);
let bytes_out = record.bytes_sent.load(Ordering::Relaxed);
// Full zombie: both sides closed
if client_closed && backend_closed {
zombies.push(id);
continue;
}
// Half zombie: one side closed for too long
let half_timeout = if is_tls {
HALF_ZOMBIE_TIMEOUT_TLS
} else {
HALF_ZOMBIE_TIMEOUT_PLAIN
};
if (client_closed || backend_closed) && idle >= half_timeout {
zombies.push(id);
continue;
}
// Stuck: received data but never sent anything for too long
let stuck_timeout = if is_tls {
STUCK_TIMEOUT_TLS
} else {
STUCK_TIMEOUT_PLAIN
};
if bytes_in > 0 && bytes_out == 0 && idle >= stuck_timeout {
zombies.push(id);
}
}
zombies
}
/// Start a background task that periodically scans for zombie connections.
///
/// The scanner runs every 10 seconds and logs any zombies it finds.
/// It stops when the provided `CancellationToken` is cancelled.
pub fn start_zombie_scanner(self: &Arc<Self>, cancel: CancellationToken) {
let tracker = Arc::clone(self);
tokio::spawn(async move {
let interval = Duration::from_secs(10);
loop {
tokio::select! {
_ = cancel.cancelled() => {
debug!("Zombie scanner shutting down");
break;
}
_ = tokio::time::sleep(interval) => {
let zombies = tracker.scan_zombies();
if !zombies.is_empty() {
warn!(
"Cleaning up {} zombie connection(s): {:?}",
zombies.len(),
zombies
);
for id in &zombies {
tracker.unregister_connection(*id);
}
}
}
}
}
});
}
/// Get the total number of tracked connections (with records).
pub fn total_connections(&self) -> usize {
self.connections.len()
}
}
#[cfg(test)]
@@ -333,98 +221,27 @@ mod tests {
}
#[test]
fn test_register_unregister_connection() {
let tracker = ConnectionTracker::new(None, None);
assert_eq!(tracker.total_connections(), 0);
fn test_cleanup_stale_timestamps() {
// Rate limit of 100/min so timestamps are tracked
let tracker = ConnectionTracker::new(None, Some(100));
let ip: IpAddr = "10.0.0.1".parse().unwrap();
let record1 = tracker.register_connection(false);
assert_eq!(tracker.total_connections(), 1);
assert!(!record1.is_tls.load(Ordering::Relaxed));
// try_accept adds a timestamp entry
assert!(tracker.try_accept(&ip));
let record2 = tracker.register_connection(true);
assert_eq!(tracker.total_connections(), 2);
assert!(record2.is_tls.load(Ordering::Relaxed));
// Simulate: connection was rate-limited and never accepted,
// so no connection_opened / connection_closed pair
assert!(tracker.timestamps.get(&ip).is_some());
assert!(tracker.active.get(&ip).is_none()); // never opened
// IDs should be unique
assert_ne!(record1.id, record2.id);
// Cleanup won't remove it yet because timestamp is recent
tracker.cleanup_stale_timestamps();
assert!(tracker.timestamps.get(&ip).is_some());
tracker.unregister_connection(record1.id);
assert_eq!(tracker.total_connections(), 1);
tracker.unregister_connection(record2.id);
assert_eq!(tracker.total_connections(), 0);
}
#[test]
fn test_full_zombie_detection() {
let tracker = ConnectionTracker::new(None, None);
let record = tracker.register_connection(false);
// Not a zombie initially
assert!(tracker.scan_zombies().is_empty());
// Set both sides closed -> full zombie
record.client_closed.store(true, Ordering::Relaxed);
record.backend_closed.store(true, Ordering::Relaxed);
let zombies = tracker.scan_zombies();
assert_eq!(zombies.len(), 1);
assert_eq!(zombies[0], record.id);
}
#[test]
fn test_half_zombie_not_triggered_immediately() {
let tracker = ConnectionTracker::new(None, None);
let record = tracker.register_connection(false);
record.touch(); // mark activity now
// Only one side closed, but just now -> not a zombie yet
record.client_closed.store(true, Ordering::Relaxed);
assert!(tracker.scan_zombies().is_empty());
}
#[test]
fn test_stuck_connection_not_triggered_immediately() {
let tracker = ConnectionTracker::new(None, None);
let record = tracker.register_connection(false);
record.touch(); // mark activity now
// Has received data but sent nothing -> but just started, not stuck yet
record.bytes_received.store(1000, Ordering::Relaxed);
assert!(tracker.scan_zombies().is_empty());
}
#[test]
fn test_unregister_removes_from_zombie_scan() {
let tracker = ConnectionTracker::new(None, None);
let record = tracker.register_connection(false);
let id = record.id;
// Make it a full zombie
record.client_closed.store(true, Ordering::Relaxed);
record.backend_closed.store(true, Ordering::Relaxed);
assert_eq!(tracker.scan_zombies().len(), 1);
// Unregister should remove it
tracker.unregister_connection(id);
assert!(tracker.scan_zombies().is_empty());
}
#[test]
fn test_total_connections() {
let tracker = ConnectionTracker::new(None, None);
assert_eq!(tracker.total_connections(), 0);
let r1 = tracker.register_connection(false);
let r2 = tracker.register_connection(true);
let r3 = tracker.register_connection(false);
assert_eq!(tracker.total_connections(), 3);
tracker.unregister_connection(r2.id);
assert_eq!(tracker.total_connections(), 2);
tracker.unregister_connection(r1.id);
tracker.unregister_connection(r3.id);
assert_eq!(tracker.total_connections(), 0);
// After expiry (use 0-second window trick: create tracker with 0 rate)
// Actually, we can't fast-forward time easily, so just verify the cleanup
// doesn't panic and handles the no-rate-limit case
let tracker2 = ConnectionTracker::new(None, None);
tracker2.cleanup_stale_timestamps(); // should be a no-op
}
}

View File

@@ -8,7 +8,6 @@ pub mod sni_parser;
pub mod forwarder;
pub mod proxy_protocol;
pub mod tls_handler;
pub mod connection_record;
pub mod connection_tracker;
pub mod socket_relay;
pub mod socket_opts;
@@ -18,7 +17,6 @@ pub use sni_parser::*;
pub use forwarder::*;
pub use proxy_protocol::*;
pub use tls_handler::*;
pub use connection_record::*;
pub use connection_tracker::*;
pub use socket_relay::*;
pub use socket_opts::*;

View File

@@ -41,6 +41,25 @@ impl Drop for ConnectionGuard {
}
}
/// RAII guard that calls ConnectionTracker::connection_closed on drop.
/// Ensures per-IP tracking is cleaned up on ALL exit paths — normal, error, or panic.
struct ConnectionTrackerGuard {
tracker: Arc<ConnectionTracker>,
ip: std::net::IpAddr,
}
impl ConnectionTrackerGuard {
fn new(tracker: Arc<ConnectionTracker>, ip: std::net::IpAddr) -> Self {
Self { tracker, ip }
}
}
impl Drop for ConnectionTrackerGuard {
fn drop(&mut self) {
self.tracker.connection_closed(&self.ip);
}
}
#[derive(Debug, Error)]
pub enum ListenerError {
#[error("Failed to bind port {port}: {source}")]
@@ -351,6 +370,16 @@ impl TcpListenerManager {
self.route_manager.store(route_manager);
}
/// Prune HTTP proxy caches for route IDs that are no longer active.
pub fn prune_http_proxy_caches(&self, active_route_ids: &std::collections::HashSet<String>) {
self.http_proxy.prune_stale_routes(active_route_ids);
}
/// Get a reference to the connection tracker.
pub fn conn_tracker(&self) -> &Arc<ConnectionTracker> {
&self.conn_tracker
}
/// Get a reference to the metrics collector.
pub fn metrics(&self) -> &Arc<MetricsCollector> {
&self.metrics
@@ -429,13 +458,14 @@ impl TcpListenerManager {
tokio::spawn(async move {
// Move permit into the task — auto-releases on drop
let _permit = permit;
// RAII guard ensures connection_closed is called on all paths
let _ct_guard = ConnectionTrackerGuard::new(ct, ip);
let result = Self::handle_connection(
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr,
).await;
if let Err(e) = result {
debug!("Connection error from {}: {}", peer_addr, e);
}
ct.connection_closed(&ip);
});
}
Err(e) => {
@@ -1372,3 +1402,13 @@ impl TcpListenerManager {
(bytes_in, bytes_out)
}
}
/// Safety net: cancel and abort all listener tasks if dropped without graceful_stop().
impl Drop for TcpListenerManager {
fn drop(&mut self) {
self.cancel_token.cancel();
for (_, handle) in self.listeners.drain() {
handle.abort();
}
}
}

View File

@@ -51,6 +51,7 @@ use rustproxy_passthrough::{TcpListenerManager, TlsCertConfig, ConnectionConfig}
use rustproxy_metrics::{MetricsCollector, Metrics, Statistics};
use rustproxy_tls::{CertManager, CertStore, CertBundle, CertMetadata, CertSource};
use rustproxy_nftables::{NftManager, rule_builder};
use tokio_util::sync::CancellationToken;
/// Certificate status.
#[derive(Debug, Clone)]
@@ -79,6 +80,8 @@ pub struct RustProxy {
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
/// Dynamically loaded certificates (via loadCertificate IPC), independent of CertManager.
loaded_certs: HashMap<String, TlsCertConfig>,
/// Cancellation token for cooperative shutdown of background tasks.
cancel_token: CancellationToken,
}
impl RustProxy {
@@ -121,6 +124,7 @@ impl RustProxy {
started_at: None,
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
loaded_certs: HashMap::new(),
cancel_token: CancellationToken::new(),
})
}
@@ -299,18 +303,26 @@ impl RustProxy {
self.started = true;
self.started_at = Some(Instant::now());
// Start the throughput sampling task
// Start the throughput sampling task with cooperative cancellation
let metrics = Arc::clone(&self.metrics);
let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone();
let interval_ms = self.options.metrics.as_ref()
.and_then(|m| m.sample_interval_ms)
.unwrap_or(1000);
let sampling_cancel = self.cancel_token.clone();
self.sampling_handle = Some(tokio::spawn(async move {
let mut interval = tokio::time::interval(
std::time::Duration::from_millis(interval_ms)
);
loop {
interval.tick().await;
metrics.sample_all();
tokio::select! {
_ = sampling_cancel.cancelled() => break,
_ = interval.tick() => {
metrics.sample_all();
// Periodically clean up stale rate-limit timestamp entries
conn_tracker.cleanup_stale_timestamps();
}
}
}
}));
@@ -457,51 +469,59 @@ impl RustProxy {
.unwrap_or(80);
let interval = std::time::Duration::from_secs(check_interval_hours as u64 * 3600);
let renewal_cancel = self.cancel_token.clone();
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
debug!("Certificate renewal check triggered (interval: {}h)", check_interval_hours);
tokio::select! {
_ = renewal_cancel.cancelled() => {
debug!("Renewal timer shutting down");
break;
}
_ = tokio::time::sleep(interval) => {
debug!("Certificate renewal check triggered (interval: {}h)", check_interval_hours);
// Check which domains need renewal
let domains = {
let cm = cm_arc.lock().await;
cm.check_renewals()
};
// Check which domains need renewal
let domains = {
let cm = cm_arc.lock().await;
cm.check_renewals()
};
if domains.is_empty() {
debug!("No certificates need renewal");
continue;
}
info!("Renewing {} certificate(s)", domains.len());
// Start challenge server for renewals
let mut cs = challenge_server::ChallengeServer::new();
if let Err(e) = cs.start(acme_port).await {
error!("Failed to start challenge server for renewal: {}", e);
continue;
}
for domain in &domains {
let cs_ref = &cs;
let mut cm = cm_arc.lock().await;
let result = cm.renew_domain(domain, |token, key_auth| {
cs_ref.set_challenge(token, key_auth);
async {}
}).await;
match result {
Ok(_bundle) => {
info!("Successfully renewed certificate for {}", domain);
if domains.is_empty() {
debug!("No certificates need renewal");
continue;
}
Err(e) => {
error!("Failed to renew certificate for {}: {}", domain, e);
info!("Renewing {} certificate(s)", domains.len());
// Start challenge server for renewals
let mut cs = challenge_server::ChallengeServer::new();
if let Err(e) = cs.start(acme_port).await {
error!("Failed to start challenge server for renewal: {}", e);
continue;
}
for domain in &domains {
let cs_ref = &cs;
let mut cm = cm_arc.lock().await;
let result = cm.renew_domain(domain, |token, key_auth| {
cs_ref.set_challenge(token, key_auth);
async {}
}).await;
match result {
Ok(_bundle) => {
info!("Successfully renewed certificate for {}", domain);
}
Err(e) => {
error!("Failed to renew certificate for {}: {}", domain, e);
}
}
}
cs.stop().await;
}
}
cs.stop().await;
}
});
@@ -516,14 +536,17 @@ impl RustProxy {
info!("Stopping RustProxy...");
// Stop sampling task
// Signal all background tasks to stop cooperatively
self.cancel_token.cancel();
// Await sampling task (cooperative shutdown)
if let Some(handle) = self.sampling_handle.take() {
handle.abort();
let _ = handle.await;
}
// Stop renewal timer
// Await renewal timer (cooperative shutdown)
if let Some(handle) = self.renewal_handle.take() {
handle.abort();
let _ = handle.await;
}
// Stop challenge server if running
@@ -545,6 +568,8 @@ impl RustProxy {
}
self.listener_manager = None;
self.started = false;
// Reset cancel token so proxy can be restarted
self.cancel_token = CancellationToken::new();
info!("RustProxy stopped");
Ok(())
@@ -585,6 +610,8 @@ impl RustProxy {
// Update listener manager
if let Some(ref mut listener) = self.listener_manager {
listener.update_route_manager(Arc::clone(&new_manager));
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
listener.prune_http_proxy_caches(&active_route_ids);
// Update TLS configs
let mut tls_configs = Self::extract_tls_configs(&routes);
@@ -983,3 +1010,21 @@ impl RustProxy {
configs
}
}
/// Safety net: abort background tasks if RustProxy is dropped without calling stop().
/// Normal shutdown should still use stop() for graceful behavior.
impl Drop for RustProxy {
fn drop(&mut self) {
self.cancel_token.cancel();
if let Some(handle) = self.sampling_handle.take() {
handle.abort();
}
if let Some(handle) = self.renewal_handle.take() {
handle.abort();
}
// Cancel the listener manager's token and abort accept loops
if let Some(ref mut listener) = self.listener_manager {
listener.stop_all();
}
}
}