feat(remoteingress-core): add UDP tunneling over QUIC datagrams and expand transport-specific test coverage
This commit is contained in:
@@ -1331,6 +1331,122 @@ async fn handle_edge_connection_quic(
|
||||
}
|
||||
});
|
||||
|
||||
// UDP sessions for QUIC datagram transport
|
||||
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Spawn QUIC datagram receiver task
|
||||
let dgram_conn = quic_conn.clone();
|
||||
let dgram_sessions = quic_udp_sessions.clone();
|
||||
let dgram_target = target_host.clone();
|
||||
let dgram_edge_id = edge_id.clone();
|
||||
let dgram_token = edge_token.clone();
|
||||
let dgram_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
datagram = dgram_conn.read_datagram() => {
|
||||
match datagram {
|
||||
Ok(data) => {
|
||||
if data.len() < 4 { continue; }
|
||||
let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
|
||||
let payload = &data[4..];
|
||||
|
||||
// Check for OPEN magic byte (0xFF)
|
||||
if !payload.is_empty() && payload[0] == 0xFF {
|
||||
// This is a session OPEN: [0xFF][proxy_v2_header:28]
|
||||
let proxy_data = &payload[1..];
|
||||
let dest_port = if proxy_data.len() >= 28 {
|
||||
u16::from_be_bytes([proxy_data[26], proxy_data[27]])
|
||||
} else {
|
||||
53 // fallback
|
||||
};
|
||||
|
||||
// Create upstream UDP socket
|
||||
let target = dgram_target.clone();
|
||||
let conn = dgram_conn.clone();
|
||||
let sessions = dgram_sessions.clone();
|
||||
let session_token = dgram_token.child_token();
|
||||
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
|
||||
|
||||
{
|
||||
let mut s = sessions.lock().await;
|
||||
s.insert(session_id, tx);
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let upstream = match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(s) => Arc::new(s),
|
||||
Err(e) => {
|
||||
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
|
||||
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Upstream recv → QUIC datagram back to edge
|
||||
let upstream_recv = upstream.clone();
|
||||
let recv_conn = conn.clone();
|
||||
let recv_token = session_token.clone();
|
||||
let recv_handle = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = upstream_recv.recv(&mut buf) => {
|
||||
match result {
|
||||
Ok(len) => {
|
||||
let mut dgram = Vec::with_capacity(4 + len);
|
||||
dgram.extend_from_slice(&session_id.to_be_bytes());
|
||||
dgram.extend_from_slice(&buf[..len]);
|
||||
let _ = recv_conn.send_datagram(dgram.into());
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
_ = recv_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Edge datagrams → upstream
|
||||
loop {
|
||||
tokio::select! {
|
||||
data = rx.recv() => {
|
||||
match data {
|
||||
Some(datagram) => {
|
||||
let _ = upstream.send(&datagram).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = session_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
recv_handle.abort();
|
||||
});
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Regular data datagram — forward to upstream
|
||||
let sessions = dgram_sessions.lock().await;
|
||||
if let Some(tx) = sessions.get(&session_id) {
|
||||
let _ = tx.try_send(Bytes::copy_from_slice(payload));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("QUIC datagram recv error from edge {}: {}", dgram_edge_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = dgram_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Control stream loop: forward config updates and handle PONG
|
||||
let disconnect_reason;
|
||||
loop {
|
||||
@@ -1399,6 +1515,7 @@ async fn handle_edge_connection_quic(
|
||||
// Cleanup
|
||||
edge_token.cancel();
|
||||
data_handle.abort();
|
||||
dgram_handle.abort();
|
||||
quic_conn.close(quinn::VarInt::from_u32(0), b"hub_shutdown");
|
||||
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user