From 3c24bf659bb8faf2a7a594fe0e2c6936fc998000 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Fri, 27 Mar 2026 22:34:13 +0000 Subject: [PATCH] feat(rustproxy-passthrough): add selective connection recycling for route, security, and certificate updates --- changelog.md | 8 + .../src/connection_registry.rs | 329 ++++++++++++++++++ rust/crates/rustproxy-passthrough/src/lib.rs | 2 + .../rustproxy-passthrough/src/quic_handler.rs | 38 +- .../rustproxy-passthrough/src/tcp_listener.rs | 51 ++- .../rustproxy-passthrough/src/udp_listener.rs | 18 + .../rustproxy-routing/src/route_manager.rs | 5 + rust/crates/rustproxy/src/lib.rs | 69 +++- ts/00_commitinfo_data.ts | 2 +- 9 files changed, 514 insertions(+), 8 deletions(-) create mode 100644 rust/crates/rustproxy-passthrough/src/connection_registry.rs diff --git a/changelog.md b/changelog.md index 22eaf95..ff13ff2 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-27 - 27.1.0 - feat(rustproxy-passthrough) +add selective connection recycling for route, security, and certificate updates + +- introduce a shared connection registry to track active TCP and QUIC connections by route, source IP, and domain +- recycle only affected connections when route actions or security rules change instead of broadly invalidating traffic +- gracefully recycle existing connections when TLS certificates change for a domain +- apply route-level IP security checks to QUIC connections and share route cancellation state with UDP listeners + ## 2026-03-26 - 27.0.0 - BREAKING CHANGE(smart-proxy) remove route helper APIs and standardize route configuration on plain route objects diff --git a/rust/crates/rustproxy-passthrough/src/connection_registry.rs b/rust/crates/rustproxy-passthrough/src/connection_registry.rs new file mode 100644 index 0000000..ee360ad --- /dev/null +++ b/rust/crates/rustproxy-passthrough/src/connection_registry.rs @@ -0,0 +1,329 @@ +//! Shared connection registry for selective connection recycling. +//! +//! Tracks active connections across both TCP and QUIC with metadata +//! (source IP, SNI domain, route ID, cancel token) so that connections +//! can be selectively recycled when certificates, security rules, or +//! route targets change. + +use std::collections::HashSet; +use std::net::IpAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use dashmap::DashMap; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use rustproxy_config::RouteSecurity; +use rustproxy_http::request_filter::RequestFilter; +use rustproxy_routing::matchers::domain_matches; + +/// Metadata about an active connection. +pub struct ConnectionEntry { + /// Per-connection cancel token (child of per-route token). + pub cancel: CancellationToken, + /// Client source IP. + pub source_ip: IpAddr, + /// SNI domain from TLS handshake (None for non-TLS connections). + pub domain: Option, + /// Route ID this connection was matched to (None if route has no ID). + pub route_id: Option, +} + +/// Transport-agnostic registry of active connections. +/// +/// Used by both `TcpListenerManager` and `UdpListenerManager` to track +/// connections and enable selective recycling on config changes. +pub struct ConnectionRegistry { + connections: DashMap, + next_id: AtomicU64, +} + +impl ConnectionRegistry { + pub fn new() -> Self { + Self { + connections: DashMap::new(), + next_id: AtomicU64::new(1), + } + } + + /// Register a connection and return its ID + RAII guard. + /// + /// The guard automatically removes the connection from the registry on drop. + pub fn register(self: &Arc, entry: ConnectionEntry) -> (u64, ConnectionRegistryGuard) { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + self.connections.insert(id, entry); + let guard = ConnectionRegistryGuard { + registry: Arc::clone(self), + conn_id: id, + }; + (id, guard) + } + + /// Number of tracked connections (for metrics/debugging). + pub fn len(&self) -> usize { + self.connections.len() + } + + /// Recycle connections whose SNI domain matches a renewed certificate domain. + /// + /// Uses bidirectional domain matching so that: + /// - Cert `*.example.com` recycles connections for `sub.example.com` + /// - Cert `sub.example.com` recycles connections on routes with `*.example.com` + pub fn recycle_for_cert_change(&self, cert_domain: &str) { + let mut recycled = 0u64; + self.connections.retain(|_, entry| { + let matches = entry.domain.as_deref() + .map(|d| domain_matches(cert_domain, d) || domain_matches(d, cert_domain)) + .unwrap_or(false); + if matches { + entry.cancel.cancel(); + recycled += 1; + false + } else { + true + } + }); + if recycled > 0 { + info!( + "Recycled {} connection(s) for cert change on domain '{}'", + recycled, cert_domain + ); + } + } + + /// Recycle connections on a route whose security config changed. + /// + /// Re-evaluates each connection's source IP against the new security rules. + /// Only connections from now-blocked IPs are terminated; allowed IPs are undisturbed. + pub fn recycle_for_security_change(&self, route_id: &str, new_security: &RouteSecurity) { + let mut recycled = 0u64; + self.connections.retain(|_, entry| { + if entry.route_id.as_deref() == Some(route_id) { + if !RequestFilter::check_ip_security(new_security, &entry.source_ip) { + info!( + "Terminating connection from {} — IP now blocked on route '{}'", + entry.source_ip, route_id + ); + entry.cancel.cancel(); + recycled += 1; + return false; + } + } + true + }); + if recycled > 0 { + info!( + "Recycled {} connection(s) for security change on route '{}'", + recycled, route_id + ); + } + } + + /// Recycle all connections on a route (e.g., when targets changed). + pub fn recycle_for_route_change(&self, route_id: &str) { + let mut recycled = 0u64; + self.connections.retain(|_, entry| { + if entry.route_id.as_deref() == Some(route_id) { + entry.cancel.cancel(); + recycled += 1; + false + } else { + true + } + }); + if recycled > 0 { + info!( + "Recycled {} connection(s) for config change on route '{}'", + recycled, route_id + ); + } + } + + /// Remove connections on routes that no longer exist. + /// + /// This complements per-route CancellationToken cancellation — + /// the token cascade handles graceful shutdown, this cleans up the registry. + pub fn cleanup_removed_routes(&self, active_route_ids: &HashSet) { + self.connections.retain(|_, entry| { + match &entry.route_id { + Some(id) => active_route_ids.contains(id), + None => true, // keep connections without a route ID + } + }); + } +} + +/// RAII guard that removes a connection from the registry on drop. +pub struct ConnectionRegistryGuard { + registry: Arc, + conn_id: u64, +} + +impl Drop for ConnectionRegistryGuard { + fn drop(&mut self) { + self.registry.connections.remove(&self.conn_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_registry() -> Arc { + Arc::new(ConnectionRegistry::new()) + } + + #[test] + fn test_register_and_guard_cleanup() { + let reg = make_registry(); + let token = CancellationToken::new(); + let entry = ConnectionEntry { + cancel: token.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: Some("example.com".to_string()), + route_id: Some("route-1".to_string()), + }; + let (id, guard) = reg.register(entry); + assert_eq!(reg.len(), 1); + assert!(reg.connections.contains_key(&id)); + + drop(guard); + assert_eq!(reg.len(), 0); + assert!(!token.is_cancelled()); + } + + #[test] + fn test_recycle_for_cert_change_exact() { + let reg = make_registry(); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let (_, _g1) = reg.register(ConnectionEntry { + cancel: t1.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: Some("api.example.com".to_string()), + route_id: Some("r1".to_string()), + }); + let (_, _g2) = reg.register(ConnectionEntry { + cancel: t2.clone(), + source_ip: "10.0.0.2".parse().unwrap(), + domain: Some("other.com".to_string()), + route_id: Some("r2".to_string()), + }); + + reg.recycle_for_cert_change("api.example.com"); + assert!(t1.is_cancelled()); + assert!(!t2.is_cancelled()); + // Registry retains unmatched entry (guard still alive keeps it too, + // but the retain removed the matched one before guard could) + } + + #[test] + fn test_recycle_for_cert_change_wildcard() { + let reg = make_registry(); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let (_, _g1) = reg.register(ConnectionEntry { + cancel: t1.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: Some("sub.example.com".to_string()), + route_id: Some("r1".to_string()), + }); + let (_, _g2) = reg.register(ConnectionEntry { + cancel: t2.clone(), + source_ip: "10.0.0.2".parse().unwrap(), + domain: Some("other.com".to_string()), + route_id: Some("r2".to_string()), + }); + + // Wildcard cert should match subdomain + reg.recycle_for_cert_change("*.example.com"); + assert!(t1.is_cancelled()); + assert!(!t2.is_cancelled()); + } + + #[test] + fn test_recycle_for_security_change() { + let reg = make_registry(); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let (_, _g1) = reg.register(ConnectionEntry { + cancel: t1.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: None, + route_id: Some("r1".to_string()), + }); + let (_, _g2) = reg.register(ConnectionEntry { + cancel: t2.clone(), + source_ip: "10.0.0.2".parse().unwrap(), + domain: None, + route_id: Some("r1".to_string()), + }); + + // Block 10.0.0.1, allow 10.0.0.2 + let security = RouteSecurity { + ip_allow_list: None, + ip_block_list: Some(vec!["10.0.0.1".to_string()]), + max_connections: None, + authentication: None, + rate_limit: None, + basic_auth: None, + jwt_auth: None, + }; + + reg.recycle_for_security_change("r1", &security); + assert!(t1.is_cancelled()); + assert!(!t2.is_cancelled()); + } + + #[test] + fn test_recycle_for_route_change() { + let reg = make_registry(); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let (_, _g1) = reg.register(ConnectionEntry { + cancel: t1.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: None, + route_id: Some("r1".to_string()), + }); + let (_, _g2) = reg.register(ConnectionEntry { + cancel: t2.clone(), + source_ip: "10.0.0.2".parse().unwrap(), + domain: None, + route_id: Some("r2".to_string()), + }); + + reg.recycle_for_route_change("r1"); + assert!(t1.is_cancelled()); + assert!(!t2.is_cancelled()); + } + + #[test] + fn test_cleanup_removed_routes() { + let reg = make_registry(); + let t1 = CancellationToken::new(); + let t2 = CancellationToken::new(); + let (_, _g1) = reg.register(ConnectionEntry { + cancel: t1.clone(), + source_ip: "10.0.0.1".parse().unwrap(), + domain: None, + route_id: Some("active".to_string()), + }); + let (_, _g2) = reg.register(ConnectionEntry { + cancel: t2.clone(), + source_ip: "10.0.0.2".parse().unwrap(), + domain: None, + route_id: Some("removed".to_string()), + }); + + let mut active = HashSet::new(); + active.insert("active".to_string()); + reg.cleanup_removed_routes(&active); + + // "removed" route entry was cleaned from registry + // (but guard is still alive so len may differ — the retain already removed it) + assert!(!t1.is_cancelled()); // not cancelled by cleanup, only by token cascade + assert!(!t2.is_cancelled()); // cleanup doesn't cancel, just removes from registry + } +} diff --git a/rust/crates/rustproxy-passthrough/src/lib.rs b/rust/crates/rustproxy-passthrough/src/lib.rs index 28424a7..7c10e9d 100644 --- a/rust/crates/rustproxy-passthrough/src/lib.rs +++ b/rust/crates/rustproxy-passthrough/src/lib.rs @@ -10,6 +10,7 @@ pub mod forwarder; pub mod proxy_protocol; pub mod tls_handler; pub mod connection_tracker; +pub mod connection_registry; pub mod socket_opts; pub mod udp_session; pub mod udp_listener; @@ -21,6 +22,7 @@ pub use forwarder::*; pub use proxy_protocol::*; pub use tls_handler::*; pub use connection_tracker::*; +pub use connection_registry::*; pub use socket_opts::*; pub use udp_session::*; pub use udp_listener::*; diff --git a/rust/crates/rustproxy-passthrough/src/quic_handler.rs b/rust/crates/rustproxy-passthrough/src/quic_handler.rs index bc33476..6df497f 100644 --- a/rust/crates/rustproxy-passthrough/src/quic_handler.rs +++ b/rust/crates/rustproxy-passthrough/src/quic_handler.rs @@ -30,6 +30,7 @@ use rustproxy_routing::{MatchContext, RouteManager}; use rustproxy_http::h3_service::H3ProxyService; use crate::connection_tracker::ConnectionTracker; +use crate::connection_registry::{ConnectionEntry, ConnectionRegistry}; /// Create a QUIC server endpoint on the given port with the provided TLS config. /// @@ -350,6 +351,8 @@ pub async fn quic_accept_loop( cancel: CancellationToken, h3_service: Option>, real_client_map: Option>>, + route_cancels: Arc>, + connection_registry: Arc, ) { loop { let incoming = tokio::select! { @@ -406,17 +409,48 @@ pub async fn quic_accept_loop( } }; + // Check route-level IP security (previously missing for QUIC) + if let Some(ref security) = route.security { + if !rustproxy_http::request_filter::RequestFilter::check_ip_security( + security, &ip, + ) { + debug!("QUIC connection from {} blocked by route security", real_addr); + continue; + } + } + conn_tracker.connection_opened(&ip); let route_id = route.name.clone().or(route.id.clone()); metrics.connection_opened(route_id.as_deref(), Some(&ip_str)); + // Resolve per-route cancel token (child of global cancel) + let route_cancel = match route_id.as_deref() { + Some(id) => route_cancels.entry(id.to_string()) + .or_insert_with(|| cancel.child_token()) + .clone(), + None => cancel.child_token(), + }; + // Per-connection child token for selective recycling + let conn_cancel = route_cancel.child_token(); + + // Register in connection registry + let registry = Arc::clone(&connection_registry); + let reg_entry = ConnectionEntry { + cancel: conn_cancel.clone(), + source_ip: ip, + domain: None, // QUIC Initial is encrypted, domain comes later via H3 :authority + route_id: route_id.clone(), + }; + let metrics = Arc::clone(&metrics); let conn_tracker = Arc::clone(&conn_tracker); - let cancel = cancel.child_token(); let h3_svc = h3_service.clone(); let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None }; tokio::spawn(async move { + // Register in connection registry (RAII guard removes on drop) + let (_conn_id, _registry_guard) = registry.register(reg_entry); + // RAII guard: ensures metrics/tracker cleanup even on panic struct QuicConnGuard { tracker: Arc, @@ -439,7 +473,7 @@ pub async fn quic_accept_loop( route_id, }; - match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await { + match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &conn_cancel, h3_svc, real_client_addr).await { Ok(()) => debug!("QUIC connection from {} completed", real_addr), Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e), } diff --git a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs index 1eee236..26f82d1 100644 --- a/rust/crates/rustproxy-passthrough/src/tcp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/tcp_listener.rs @@ -16,6 +16,7 @@ use crate::sni_parser; use crate::forwarder; use crate::tls_handler; use crate::connection_tracker::ConnectionTracker; +use crate::connection_registry::{ConnectionEntry, ConnectionRegistry}; use crate::socket_opts; /// RAII guard that decrements the active connection metric on drop. @@ -166,6 +167,8 @@ pub struct TcpListenerManager { /// Per-route cancellation tokens (child of cancel_token). /// When a route is removed, its token is cancelled, terminating all connections on that route. route_cancels: Arc>, + /// Shared connection registry for selective recycling on config changes. + connection_registry: Arc, } impl TcpListenerManager { @@ -205,6 +208,7 @@ impl TcpListenerManager { socket_handler_relay: Arc::new(std::sync::RwLock::new(None)), conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)), route_cancels: Arc::new(DashMap::new()), + connection_registry: Arc::new(ConnectionRegistry::new()), } } @@ -244,6 +248,7 @@ impl TcpListenerManager { socket_handler_relay: Arc::new(std::sync::RwLock::new(None)), conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)), route_cancels: Arc::new(DashMap::new()), + connection_registry: Arc::new(ConnectionRegistry::new()), } } @@ -328,12 +333,13 @@ impl TcpListenerManager { let relay = Arc::clone(&self.socket_handler_relay); let semaphore = Arc::clone(&self.conn_semaphore); let route_cancels = Arc::clone(&self.route_cancels); + let connection_registry = Arc::clone(&self.connection_registry); let handle = tokio::spawn(async move { Self::accept_loop( listener, port, route_manager_swap, metrics, tls_configs, shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay, - semaphore, route_cancels, + semaphore, route_cancels, connection_registry, ).await; }); @@ -446,6 +452,16 @@ impl TcpListenerManager { &self.metrics } + /// Get a reference to the shared connection registry. + pub fn connection_registry(&self) -> &Arc { + &self.connection_registry + } + + /// Get a reference to the per-route cancellation tokens. + pub fn route_cancels(&self) -> &Arc> { + &self.route_cancels + } + /// Accept loop for a single port. async fn accept_loop( listener: TcpListener, @@ -461,6 +477,7 @@ impl TcpListenerManager { socket_handler_relay: Arc>>, conn_semaphore: Arc, route_cancels: Arc>, + connection_registry: Arc, ) { loop { tokio::select! { @@ -514,6 +531,7 @@ impl TcpListenerManager { let cn = cancel.clone(); let sr = Arc::clone(&socket_handler_relay); let rc = Arc::clone(&route_cancels); + let cr = Arc::clone(&connection_registry); debug!("Accepted connection from {} on port {}", peer_addr, port); tokio::spawn(async move { @@ -522,7 +540,7 @@ impl TcpListenerManager { // RAII guard ensures connection_closed is called on all paths let _ct_guard = ConnectionTrackerGuard::new(ct, ip); let result = Self::handle_connection( - stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc, + stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc, cr, ).await; if let Err(e) = result { warn!("Connection error from {}: {}", peer_addr, e); @@ -553,6 +571,7 @@ impl TcpListenerManager { cancel: CancellationToken, socket_handler_relay: Arc>>, route_cancels: Arc>, + connection_registry: Arc, ) -> Result<(), Box> { use tokio::io::AsyncReadExt; @@ -672,12 +691,24 @@ impl TcpListenerManager { let route_id = quick_match.route.id.as_deref(); // Resolve per-route cancel token (child of global cancel) - let conn_cancel = match route_id { + let route_cancel = match route_id { Some(id) => route_cancels.entry(id.to_string()) .or_insert_with(|| cancel.child_token()) .clone(), None => cancel.clone(), }; + // Per-connection child token for selective recycling + let conn_cancel = route_cancel.child_token(); + + // Register in connection registry for selective recycling + let (_conn_id, _registry_guard) = connection_registry.register( + ConnectionEntry { + cancel: conn_cancel.clone(), + source_ip: peer_addr.ip(), + domain: None, // fast path has no domain + route_id: route_id.map(|s| s.to_string()), + }, + ); // Check route-level IP security if let Some(ref security) = quick_match.route.security { @@ -852,12 +883,24 @@ impl TcpListenerManager { // Resolve per-route cancel token (child of global cancel). // When this route is removed via updateRoutes, the token is cancelled, // terminating all connections on this route. - let cancel = match route_id { + let route_cancel = match route_id { Some(id) => route_cancels.entry(id.to_string()) .or_insert_with(|| cancel.child_token()) .clone(), None => cancel, }; + // Per-connection child token for selective recycling + let cancel = route_cancel.child_token(); + + // Register in connection registry for selective recycling + let (_conn_id, _registry_guard) = connection_registry.register( + ConnectionEntry { + cancel: cancel.clone(), + source_ip: peer_addr.ip(), + domain: domain.clone(), + route_id: route_id.map(|s| s.to_string()), + }, + ); // Check route-level IP security for passthrough connections if let Some(ref security) = route_match.route.security { diff --git a/rust/crates/rustproxy-passthrough/src/udp_listener.rs b/rust/crates/rustproxy-passthrough/src/udp_listener.rs index f7e9d08..31b290c 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_listener.rs @@ -28,6 +28,8 @@ use rustproxy_routing::{MatchContext, RouteManager}; use rustproxy_http::h3_service::H3ProxyService; +use crate::connection_registry::ConnectionRegistry; + use crate::connection_tracker::ConnectionTracker; use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable}; @@ -56,6 +58,10 @@ pub struct UdpListenerManager { /// Trusted proxy IPs that may send PROXY protocol v2 headers. /// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths. proxy_ips: Arc>, + /// Per-route cancellation tokens (shared with TcpListenerManager). + route_cancels: Arc>, + /// Shared connection registry for selective recycling. + connection_registry: Arc, } impl Drop for UdpListenerManager { @@ -76,6 +82,8 @@ impl UdpListenerManager { metrics: Arc, conn_tracker: Arc, cancel_token: CancellationToken, + route_cancels: Arc>, + connection_registry: Arc, ) -> Self { Self { listeners: HashMap::new(), @@ -89,6 +97,8 @@ impl UdpListenerManager { relay_reader_cancel: None, h3_service: None, proxy_ips: Arc::new(Vec::new()), + route_cancels, + connection_registry, } } @@ -152,6 +162,8 @@ impl UdpListenerManager { self.cancel_token.child_token(), self.h3_service.clone(), None, + Arc::clone(&self.route_cancels), + Arc::clone(&self.connection_registry), )); self.listeners.insert(port, (handle, Some(endpoint_for_updates))); info!("QUIC endpoint started on port {}", port); @@ -173,6 +185,8 @@ impl UdpListenerManager { self.cancel_token.child_token(), self.h3_service.clone(), Some(relay.real_client_map), + Arc::clone(&self.route_cancels), + Arc::clone(&self.connection_registry), )); self.listeners.insert(port, (handle, Some(endpoint_for_updates))); info!("QUIC endpoint with PROXY relay started on port {}", port); @@ -356,6 +370,8 @@ impl UdpListenerManager { self.cancel_token.child_token(), self.h3_service.clone(), None, + Arc::clone(&self.route_cancels), + Arc::clone(&self.connection_registry), )); self.listeners.insert(port, (handle, Some(endpoint_for_updates))); Ok(()) @@ -379,6 +395,8 @@ impl UdpListenerManager { self.cancel_token.child_token(), self.h3_service.clone(), Some(relay.real_client_map), + Arc::clone(&self.route_cancels), + Arc::clone(&self.connection_registry), )); self.listeners.insert(port, (handle, Some(endpoint_for_updates))); Ok(()) diff --git a/rust/crates/rustproxy-routing/src/route_manager.rs b/rust/crates/rustproxy-routing/src/route_manager.rs index c70faa1..4d0ef8c 100644 --- a/rust/crates/rustproxy-routing/src/route_manager.rs +++ b/rust/crates/rustproxy-routing/src/route_manager.rs @@ -281,6 +281,11 @@ impl RouteManager { .unwrap_or_default() } + /// Get all enabled routes. + pub fn routes(&self) -> &[RouteConfig] { + &self.routes + } + /// Get the total number of enabled routes. pub fn route_count(&self) -> usize { self.routes.len() diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index c2e1111..adf0eb9 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -356,12 +356,17 @@ impl RustProxy { // Bind UDP ports (if any) if !udp_ports.is_empty() { - let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone(); + let tcp_mgr = self.listener_manager.as_ref().unwrap(); + let conn_tracker = tcp_mgr.conn_tracker().clone(); + let route_cancels = tcp_mgr.route_cancels().clone(); + let connection_registry = tcp_mgr.connection_registry().clone(); let mut udp_mgr = UdpListenerManager::new( Arc::clone(&*self.route_table.load()), Arc::clone(&self.metrics), conn_tracker, self.cancel_token.clone(), + route_cancels, + connection_registry, ); udp_mgr.set_proxy_ips(udp_proxy_ips.clone()); @@ -707,6 +712,9 @@ impl RustProxy { .collect(); self.metrics.retain_backends(&active_backends); + // Capture old route manager for diff-based connection recycling + let old_manager = self.route_table.load_full(); + // Atomically swap the route table let new_manager = Arc::new(new_manager); self.route_table.store(Arc::clone(&new_manager)); @@ -742,9 +750,47 @@ impl RustProxy { listener.update_route_manager(Arc::clone(&new_manager)); // Cancel connections on routes that were removed or disabled listener.invalidate_removed_routes(&active_route_ids); + // Clean up registry entries for removed routes + listener.connection_registry().cleanup_removed_routes(&active_route_ids); // Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters) listener.prune_http_proxy_caches(&active_route_ids); + // Diff-based connection recycling for changed routes + { + let registry = listener.connection_registry(); + for new_route in &routes { + let new_id = match &new_route.id { + Some(id) => id.as_str(), + None => continue, + }; + // Find corresponding old route + let old_route = old_manager.routes().iter().find(|r| { + r.id.as_deref() == Some(new_id) + }); + let old_route = match old_route { + Some(r) => r, + None => continue, // new route, no existing connections to recycle + }; + + // Security diff: re-evaluate existing connections' IPs + let old_sec = serde_json::to_string(&old_route.security).ok(); + let new_sec = serde_json::to_string(&new_route.security).ok(); + if old_sec != new_sec { + if let Some(ref security) = new_route.security { + registry.recycle_for_security_change(new_id, security); + } + // If security removed entirely (became more permissive), no recycling needed + } + + // Action diff (targets, TLS mode, etc.): recycle all connections on route + let old_action = serde_json::to_string(&old_route.action).ok(); + let new_action = serde_json::to_string(&new_route.action).ok(); + if old_action != new_action { + registry.recycle_for_route_change(new_id); + } + } + } + // Add new ports for port in &new_ports { if !old_ports.contains(port) { @@ -787,12 +833,16 @@ impl RustProxy { if self.udp_listener_manager.is_none() { if let Some(ref listener) = self.listener_manager { let conn_tracker = listener.conn_tracker().clone(); + let route_cancels = listener.route_cancels().clone(); + let connection_registry = listener.connection_registry().clone(); let conn_config = Self::build_connection_config(&self.options); let mut udp_mgr = UdpListenerManager::new( Arc::clone(&new_manager), Arc::clone(&self.metrics), conn_tracker, self.cancel_token.clone(), + route_cancels, + connection_registry, ); udp_mgr.set_proxy_ips(conn_config.proxy_ips); self.udp_listener_manager = Some(udp_mgr); @@ -1096,6 +1146,10 @@ impl RustProxy { } /// Load a certificate for a domain and hot-swap the TLS configuration. + /// + /// If the cert PEM differs from the currently loaded cert for this domain, + /// existing connections for the domain are gracefully recycled (GOAWAY for + /// HTTP/2, Connection: close for HTTP/1.1, graceful FIN for TCP). pub async fn load_certificate( &mut self, domain: &str, @@ -1105,6 +1159,12 @@ impl RustProxy { ) -> Result<()> { info!("Loading certificate for domain: {}", domain); + // Check if the cert actually changed (for selective connection recycling) + let cert_changed = self.loaded_certs + .get(domain) + .map(|existing| existing.cert_pem != cert_pem) + .unwrap_or(false); // new domain = no existing connections to recycle + // Store in cert manager if available if let Some(ref cm_arc) = self.cert_manager { let now = std::time::SystemTime::now() @@ -1153,6 +1213,13 @@ impl RustProxy { } } + // Recycle existing connections if cert actually changed + if cert_changed { + if let Some(ref listener) = self.listener_manager { + listener.connection_registry().recycle_for_cert_change(domain); + } + } + info!("Certificate loaded and TLS config updated for {}", domain); Ok(()) } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 84b4408..93d97af 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '27.0.0', + version: '27.1.0', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }