fix(core): harden UDP session handling, QUIC control message validation, and bridge process cleanup

This commit is contained in:
2026-03-27 11:34:31 +00:00
parent 0b2a83ddb6
commit ac993dd5a3
8 changed files with 124 additions and 30 deletions

View File

@@ -954,7 +954,10 @@ fn apply_udp_port_config(
} else {
// New session — allocate stream_id and send UDP_OPEN
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
if sessions.insert(key, sid).is_none() {
log::warn!("UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -1681,7 +1684,10 @@ fn apply_udp_port_config_quic(
} else {
// New session — send PROXY v2 header via control-style datagram
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
if sessions.insert(key, sid).is_none() {
log::warn!("QUIC UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();

View File

@@ -1374,8 +1374,15 @@ async fn handle_edge_connection_quic(
let dgram_edge_id = edge_id.clone();
let dgram_token = edge_token.clone();
let dgram_handle = tokio::spawn(async move {
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
cleanup_interval.tick().await; // consume initial tick
loop {
tokio::select! {
// Periodic sweep: prune sessions whose task has exited (receiver dropped)
_ = cleanup_interval.tick() => {
let mut s = dgram_sessions.lock().await;
s.retain(|_id, tx| !tx.is_closed());
}
datagram = dgram_conn.read_datagram() => {
match datagram {
Ok(data) => {

View File

@@ -76,6 +76,11 @@ pub async fn write_ctrl_message(
Ok(())
}
/// Maximum size for a QUIC control message payload (64 KB).
/// Control messages (CONFIG, PING, PONG) are small; this guards against
/// a malicious peer sending a crafted length field to trigger OOM.
const MAX_CTRL_MESSAGE_SIZE: usize = 65536;
/// Read a control message from a QUIC recv stream.
/// Returns (msg_type, payload). Returns None on EOF.
pub async fn read_ctrl_message(
@@ -93,6 +98,12 @@ pub async fn read_ctrl_message(
}
let msg_type = header[0];
let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
if len > MAX_CTRL_MESSAGE_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("control message too large: {} bytes (max {})", len, MAX_CTRL_MESSAGE_SIZE),
));
}
let mut payload = vec![0u8; len];
if len > 0 {
recv.read_exact(&mut payload).await.map_err(|e| {

View File

@@ -17,7 +17,7 @@ pub struct UdpSession {
pub last_activity: Instant,
}
/// Manages UDP sessions with idle timeout expiry.
/// Manages UDP sessions with idle timeout expiry and a maximum session count.
pub struct UdpSessionManager {
/// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>,
@@ -25,14 +25,21 @@ pub struct UdpSessionManager {
by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration.
idle_timeout: std::time::Duration,
/// Maximum number of concurrent sessions (prevents unbounded growth from floods).
max_sessions: usize,
}
impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self {
Self::with_max_sessions(idle_timeout, 65536)
}
pub fn with_max_sessions(idle_timeout: std::time::Duration, max_sessions: usize) -> Self {
Self {
sessions: HashMap::new(),
by_stream_id: HashMap::new(),
idle_timeout,
max_sessions,
}
}
@@ -57,8 +64,12 @@ impl UdpSessionManager {
Some(session)
}
/// Insert a new session. Returns a mutable reference to it.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession {
/// Insert a new session. Returns `None` if the session limit has been reached.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> Option<&mut UdpSession> {
// Allow re-insertion of existing keys (update), but reject truly new sessions at capacity
if !self.sessions.contains_key(&key) && self.sessions.len() >= self.max_sessions {
return None;
}
let session = UdpSession {
stream_id,
client_addr: key.client_addr,
@@ -66,7 +77,7 @@ impl UdpSessionManager {
last_activity: Instant::now(),
};
self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session)
Some(self.sessions.entry(key).or_insert(session))
}
/// Remove a session by stream_id.
@@ -118,7 +129,7 @@ mod tests {
fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some());
@@ -129,7 +140,7 @@ mod tests {
fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42);
assert!(mgr.insert(key, 42).is_some());
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None);
@@ -139,7 +150,7 @@ mod tests {
fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some());
@@ -159,8 +170,8 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Nothing expired yet
assert!(mgr.expire_idle().is_empty());
@@ -178,7 +189,7 @@ mod tests {
async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
// Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -200,11 +211,35 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
}
#[test]
fn test_max_sessions_limit() {
let mut mgr = UdpSessionManager::with_max_sessions(Duration::from_secs(60), 2);
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
let key3 = UdpSessionKey { client_addr: addr(5002), dest_port: 53 };
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Third insert should be rejected (at capacity)
assert!(mgr.insert(key3, 3).is_none());
assert_eq!(mgr.len(), 2);
// Re-inserting an existing key should succeed (update, not new)
assert!(mgr.insert(key1, 1).is_some());
assert_eq!(mgr.len(), 2);
// After removing one, a new insert should succeed
mgr.remove_by_stream_id(1);
assert_eq!(mgr.len(), 1);
assert!(mgr.insert(key3, 3).is_some());
assert_eq!(mgr.len(), 2);
}
}