diff --git a/changelog.md b/changelog.md index e74df6a..6720e8d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-03-19 - 25.16.3 - fix(rustproxy) +upgrade fallback UDP listeners to QUIC when TLS certificates become available + +- Rebuild and apply QUIC TLS configuration during route and certificate updates instead of only when adding new UDP ports. +- Add logic to drain UDP sessions, stop raw fallback listeners, and start QUIC endpoints on existing ports once TLS is available. +- Retry QUIC endpoint creation during upgrade and fall back to rebinding raw UDP if the upgrade cannot complete. + ## 2026-03-19 - 25.16.2 - fix(rustproxy-http) cache backend Alt-Svc only from original upstream responses during protocol auto-detection diff --git a/rust/crates/rustproxy-passthrough/src/udp_listener.rs b/rust/crates/rustproxy-passthrough/src/udp_listener.rs index 3d9acc1..b6225d7 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_listener.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_listener.rs @@ -222,6 +222,117 @@ impl UdpListenerManager { } } + /// Upgrade raw UDP fallback listeners to QUIC endpoints. + /// + /// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP. + /// When certs become available later (via loadCertificate IPC or ACME), this method + /// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint. + /// + /// This is idempotent — ports that already have QUIC endpoints are skipped. + pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc) { + // Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes + let rm = self.route_manager.load(); + let upgrade_ports: Vec = self.listeners.iter() + .filter(|(_, (_, endpoint))| endpoint.is_none()) + .filter(|(port, _)| { + rm.routes_for_port(**port).iter().any(|r| { + r.action.udp.as_ref() + .and_then(|u| u.quic.as_ref()) + .is_some() + }) + }) + .map(|(port, _)| *port) + .collect(); + + for port in upgrade_ports { + info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port); + + // Stop the raw UDP listener task and drain sessions to release the socket + if let Some((handle, _)) = self.listeners.remove(&port) { + handle.abort(); + } + let drained = self.session_table.drain_port( + port, &self.metrics, &self.conn_tracker, + ); + if drained > 0 { + debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port); + } + + // Brief yield to let aborted tasks drop their socket references + tokio::task::yield_now().await; + + // Create QUIC endpoint on the now-free port + match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) { + Ok(endpoint) => { + let endpoint_for_updates = endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port); + } + Err(e) => { + // Port may still be held — retry once after a brief delay + warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) { + Ok(endpoint) => { + let endpoint_for_updates = endpoint.clone(); + let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( + endpoint, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + self.cancel_token.child_token(), + self.h3_service.clone(), + )); + self.listeners.insert(port, (handle, Some(endpoint_for_updates))); + info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port); + } + Err(e2) => { + error!("Failed to upgrade port {} to QUIC after retry: {}. \ + Rebinding as raw UDP.", port, e2); + // Fallback: rebind as raw UDP so the port isn't dead + if let Ok(()) = self.rebind_raw_udp(port).await { + warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port); + } + } + } + } + } + } + } + + /// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails). + async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> { + let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into(); + let socket = UdpSocket::bind(addr).await?; + let socket = Arc::new(socket); + + let handle = tokio::spawn(Self::recv_loop( + socket, + port, + Arc::clone(&self.route_manager), + Arc::clone(&self.metrics), + Arc::clone(&self.conn_tracker), + Arc::clone(&self.session_table), + Arc::clone(&self.datagram_handler_relay), + Arc::clone(&self.relay_writer), + self.cancel_token.child_token(), + )); + + self.listeners.insert(port, (handle, None)); + Ok(()) + } + /// Set the datagram handler relay socket path and establish connection. pub async fn set_datagram_handler_relay(&mut self, path: String) { // Cancel previous relay reader task if any diff --git a/rust/crates/rustproxy-passthrough/src/udp_session.rs b/rust/crates/rustproxy-passthrough/src/udp_session.rs index 322403c..f12e9bb 100644 --- a/rust/crates/rustproxy-passthrough/src/udp_session.rs +++ b/rust/crates/rustproxy-passthrough/src/udp_session.rs @@ -201,6 +201,36 @@ impl UdpSessionTable { removed } + /// Drain all sessions on a given listening port, releasing socket references. + /// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's + /// Arc refcount must drop to zero so the port can be rebound. + pub fn drain_port( + &self, + port: u16, + metrics: &MetricsCollector, + conn_tracker: &ConnectionTracker, + ) -> usize { + let keys: Vec = self.sessions.iter() + .filter(|entry| entry.key().1 == port) + .map(|entry| *entry.key()) + .collect(); + + let mut removed = 0; + for key in keys { + if let Some(session) = self.remove(&key) { + session.cancel.cancel(); + conn_tracker.connection_closed(&session.source_ip); + metrics.connection_closed( + session.route_id.as_deref(), + Some(&session.source_ip.to_string()), + ); + metrics.udp_session_closed(); + removed += 1; + } + } + removed + } + /// Total number of active sessions. pub fn session_count(&self) -> usize { self.sessions.len() diff --git a/rust/crates/rustproxy/src/lib.rs b/rust/crates/rustproxy/src/lib.rs index c322734..a45c58f 100644 --- a/rust/crates/rustproxy/src/lib.rs +++ b/rust/crates/rustproxy/src/lib.rs @@ -783,12 +783,10 @@ impl RustProxy { } } - // Build TLS config for QUIC before taking mutable borrow on udp_mgr - let quic_tls = if new_udp_ports.iter().any(|p| !old_udp_ports.contains(p)) { + // Build TLS config for QUIC (needed for new ports and upgrading existing raw UDP) + let quic_tls = { let tls_configs = self.current_tls_configs().await; Self::build_quic_tls_config(&tls_configs) - } else { - None }; if let Some(ref mut udp_mgr) = self.udp_listener_manager { @@ -806,6 +804,12 @@ impl RustProxy { udp_mgr.remove_port(*port); } } + + // Upgrade existing raw UDP fallback listeners to QUIC if TLS is now available + if let Some(ref quic_config) = quic_tls { + udp_mgr.update_quic_tls(Arc::clone(quic_config)); + udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await; + } } } else if self.udp_listener_manager.is_some() { // All UDP routes removed — shut down UDP manager @@ -862,12 +866,12 @@ impl RustProxy { .map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?; // Hot-swap into TLS configs - if let Some(ref mut listener) = self.listener_manager { - let mut tls_configs = Self::extract_tls_configs(&self.options.routes); - tls_configs.insert(domain.clone(), TlsCertConfig { - cert_pem: bundle.cert_pem.clone(), - key_pem: bundle.key_pem.clone(), - }); + let mut tls_configs = Self::extract_tls_configs(&self.options.routes); + tls_configs.insert(domain.clone(), TlsCertConfig { + cert_pem: bundle.cert_pem.clone(), + key_pem: bundle.key_pem.clone(), + }); + { let cm = cm_arc.lock().await; for (d, b) in cm.store().iter() { if !tls_configs.contains_key(d) { @@ -877,9 +881,22 @@ impl RustProxy { }); } } + } + + let quic_tls = Self::build_quic_tls_config(&tls_configs); + + if let Some(ref listener) = self.listener_manager { listener.set_tls_configs(tls_configs); } + // Update existing QUIC endpoints and upgrade raw UDP fallback listeners + if let Some(ref mut udp_mgr) = self.udp_listener_manager { + if let Some(ref quic_config) = quic_tls { + udp_mgr.update_quic_tls(Arc::clone(quic_config)); + udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await; + } + } + info!("Certificate provisioned and loaded for route '{}'", route_name); Ok(()) } @@ -1104,17 +1121,18 @@ impl RustProxy { // Hot-swap TLS config on TCP and QUIC listeners let tls_configs = self.current_tls_configs().await; + // Build QUIC TLS config before TCP consumes the map + let quic_tls = Self::build_quic_tls_config(&tls_configs); + if let Some(ref listener) = self.listener_manager { - // Build QUIC TLS config before TCP consumes the map - let quic_tls = Self::build_quic_tls_config(&tls_configs); - listener.set_tls_configs(tls_configs); + } - // Also update QUIC endpoints with the new certs - if let Some(ref udp_mgr) = self.udp_listener_manager { - if let Some(quic_config) = quic_tls { - udp_mgr.update_quic_tls(quic_config); - } + // Update existing QUIC endpoints and upgrade raw UDP fallback listeners + if let Some(ref mut udp_mgr) = self.udp_listener_manager { + if let Some(ref quic_config) = quic_tls { + udp_mgr.update_quic_tls(Arc::clone(quic_config)); + udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await; } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index d3e005d..4cf54a3 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '25.16.2', + version: '25.16.3', description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' }