fix(rustproxy): upgrade fallback UDP listeners to QUIC when TLS certificates become available
This commit is contained in:
@@ -222,6 +222,117 @@ impl UdpListenerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Upgrade raw UDP fallback listeners to QUIC endpoints.
|
||||
///
|
||||
/// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP.
|
||||
/// When certs become available later (via loadCertificate IPC or ACME), this method
|
||||
/// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint.
|
||||
///
|
||||
/// This is idempotent — ports that already have QUIC endpoints are skipped.
|
||||
pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc<rustls::ServerConfig>) {
|
||||
// Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes
|
||||
let rm = self.route_manager.load();
|
||||
let upgrade_ports: Vec<u16> = self.listeners.iter()
|
||||
.filter(|(_, (_, endpoint))| endpoint.is_none())
|
||||
.filter(|(port, _)| {
|
||||
rm.routes_for_port(**port).iter().any(|r| {
|
||||
r.action.udp.as_ref()
|
||||
.and_then(|u| u.quic.as_ref())
|
||||
.is_some()
|
||||
})
|
||||
})
|
||||
.map(|(port, _)| *port)
|
||||
.collect();
|
||||
|
||||
for port in upgrade_ports {
|
||||
info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port);
|
||||
|
||||
// Stop the raw UDP listener task and drain sessions to release the socket
|
||||
if let Some((handle, _)) = self.listeners.remove(&port) {
|
||||
handle.abort();
|
||||
}
|
||||
let drained = self.session_table.drain_port(
|
||||
port, &self.metrics, &self.conn_tracker,
|
||||
);
|
||||
if drained > 0 {
|
||||
debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port);
|
||||
}
|
||||
|
||||
// Brief yield to let aborted tasks drop their socket references
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
// Create QUIC endpoint on the now-free port
|
||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
||||
Ok(endpoint) => {
|
||||
let endpoint_for_updates = endpoint.clone();
|
||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||
endpoint,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
|
||||
}
|
||||
Err(e) => {
|
||||
// Port may still be held — retry once after a brief delay
|
||||
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
||||
Ok(endpoint) => {
|
||||
let endpoint_for_updates = endpoint.clone();
|
||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||
endpoint,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
|
||||
}
|
||||
Err(e2) => {
|
||||
error!("Failed to upgrade port {} to QUIC after retry: {}. \
|
||||
Rebinding as raw UDP.", port, e2);
|
||||
// Fallback: rebind as raw UDP so the port isn't dead
|
||||
if let Ok(()) = self.rebind_raw_udp(port).await {
|
||||
warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
|
||||
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
|
||||
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
|
||||
let socket = UdpSocket::bind(addr).await?;
|
||||
let socket = Arc::new(socket);
|
||||
|
||||
let handle = tokio::spawn(Self::recv_loop(
|
||||
socket,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
Arc::clone(&self.session_table),
|
||||
Arc::clone(&self.datagram_handler_relay),
|
||||
Arc::clone(&self.relay_writer),
|
||||
self.cancel_token.child_token(),
|
||||
));
|
||||
|
||||
self.listeners.insert(port, (handle, None));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the datagram handler relay socket path and establish connection.
|
||||
pub async fn set_datagram_handler_relay(&mut self, path: String) {
|
||||
// Cancel previous relay reader task if any
|
||||
|
||||
@@ -201,6 +201,36 @@ impl UdpSessionTable {
|
||||
removed
|
||||
}
|
||||
|
||||
/// Drain all sessions on a given listening port, releasing socket references.
|
||||
/// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's
|
||||
/// Arc refcount must drop to zero so the port can be rebound.
|
||||
pub fn drain_port(
|
||||
&self,
|
||||
port: u16,
|
||||
metrics: &MetricsCollector,
|
||||
conn_tracker: &ConnectionTracker,
|
||||
) -> usize {
|
||||
let keys: Vec<SessionKey> = self.sessions.iter()
|
||||
.filter(|entry| entry.key().1 == port)
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
|
||||
let mut removed = 0;
|
||||
for key in keys {
|
||||
if let Some(session) = self.remove(&key) {
|
||||
session.cancel.cancel();
|
||||
conn_tracker.connection_closed(&session.source_ip);
|
||||
metrics.connection_closed(
|
||||
session.route_id.as_deref(),
|
||||
Some(&session.source_ip.to_string()),
|
||||
);
|
||||
metrics.udp_session_closed();
|
||||
removed += 1;
|
||||
}
|
||||
}
|
||||
removed
|
||||
}
|
||||
|
||||
/// Total number of active sessions.
|
||||
pub fn session_count(&self) -> usize {
|
||||
self.sessions.len()
|
||||
|
||||
Reference in New Issue
Block a user