feat(remoteingress-core): add UDP tunneling support between edge and hub

This commit is contained in:
2026-03-19 12:02:41 +00:00
parent 61fa69f108
commit a96b4ba84a
9 changed files with 849 additions and 16 deletions

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{mpsc, Mutex, Notify, RwLock, Semaphore};
use tokio::time::{interval, sleep_until, Instant};
use tokio_rustls::TlsAcceptor;
@@ -23,6 +23,14 @@ enum FrameAction {
Disconnect(String),
}
/// Per-UDP-session state tracked in the hub.
struct HubUdpSessionState {
/// Channel for forwarding datagrams from edge to the upstream UdpSocket task.
data_tx: mpsc::Sender<Bytes>,
/// Cancellation token for this session's upstream task.
cancel_token: CancellationToken,
}
/// Per-stream state tracked in the hub's stream map.
struct HubStreamState {
/// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task.
@@ -69,6 +77,8 @@ pub struct AllowedEdge {
pub secret: String,
#[serde(default)]
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
pub stun_interval_secs: Option<u64>,
}
@@ -77,6 +87,8 @@ pub struct AllowedEdge {
#[serde(rename_all = "camelCase")]
struct HandshakeResponse {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
stun_interval_secs: u64,
}
@@ -85,6 +97,8 @@ struct HandshakeResponse {
#[serde(rename_all = "camelCase")]
pub struct EdgeConfigUpdate {
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
}
/// Runtime status of a connected edge.
@@ -179,12 +193,13 @@ impl TunnelHub {
if let Some(info) = connected.get(&edge.id) {
// Check if ports changed compared to old config
let ports_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports,
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
None => true, // newly allowed edge that's already connected
};
if ports_changed {
let update = EdgeConfigUpdate {
listen_ports: edge.listen_ports.clone(),
listen_ports_udp: edge.listen_ports_udp.clone(),
};
let _ = info.config_tx.try_send(update);
}
@@ -381,6 +396,7 @@ async fn handle_hub_frame(
frame: Frame,
tunnel_io: &mut remoteingress_protocol::TunnelIo<HubTlsStream>,
streams: &mut HashMap<u32, HubStreamState>,
udp_sessions: &mut HashMap<u32, HubUdpSessionState>,
stream_semaphore: &Arc<Semaphore>,
edge_stream_count: &Arc<AtomicU32>,
edge_id: &str,
@@ -682,6 +698,96 @@ async fn handle_hub_frame(
FRAME_PONG => {
log::debug!("Received PONG from edge {}", edge_id);
}
FRAME_UDP_OPEN => {
// Open a UDP session: parse PROXY v2 header, connect upstream, start forwarding
let stream_id = frame.stream_id;
let dest_port = parse_dest_port_from_proxy_v2(&frame.payload).unwrap_or(53);
let target = target_host.to_string();
let data_writer_tx = data_tx.clone();
let session_token = edge_token.child_token();
let edge_id_str = edge_id.to_string();
// Channel for forwarding datagrams from edge to upstream
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
udp_sessions.insert(stream_id, HubUdpSessionState {
data_tx: udp_tx,
cancel_token: session_token.clone(),
});
// Spawn upstream UDP forwarder
tokio::spawn(async move {
let upstream = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
log::error!("UDP session {} failed to bind: {}", stream_id, e);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("UDP session {} failed to connect to {}:{}: {}", stream_id, target, dest_port, e);
return;
}
// Task: upstream -> edge (return datagrams)
let upstream_recv = Arc::new(upstream);
let upstream_send = upstream_recv.clone();
let recv_token = session_token.clone();
let recv_handle = tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
loop {
tokio::select! {
result = upstream_recv.recv(&mut buf) => {
match result {
Ok(len) => {
let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]);
if data_writer_tx.try_send(frame).is_err() {
break;
}
}
Err(e) => {
log::debug!("UDP session {} upstream recv error: {}", stream_id, e);
break;
}
}
}
_ = recv_token.cancelled() => break,
}
}
});
// Forward datagrams from edge to upstream
loop {
tokio::select! {
data = udp_rx.recv() => {
match data {
Some(datagram) => {
if let Err(e) = upstream_send.send(&datagram).await {
log::debug!("UDP session {} upstream send error: {}", stream_id, e);
break;
}
}
None => break,
}
}
_ = session_token.cancelled() => break,
}
}
recv_handle.abort();
log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str);
});
}
FRAME_UDP_DATA => {
// Forward datagram to upstream
if let Some(state) = udp_sessions.get(&frame.stream_id) {
let _ = state.data_tx.try_send(frame.payload);
}
}
FRAME_UDP_CLOSE => {
if let Some(state) = udp_sessions.remove(&frame.stream_id) {
state.cancel_token.cancel();
}
}
_ => {
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
}
@@ -738,14 +844,14 @@ async fn handle_edge_connection(
let secret = parts[2];
// Verify credentials and extract edge config
let (listen_ports, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
}
None => {
return Err(format!("unknown edge {}", edge_id).into());
@@ -762,6 +868,7 @@ async fn handle_edge_connection(
// Send handshake response with initial config before frame protocol begins
let handshake = HandshakeResponse {
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
@@ -771,6 +878,7 @@ async fn handle_edge_connection(
// Track this edge
let mut streams: HashMap<u32, HubStreamState> = HashMap::new();
let mut udp_sessions: HashMap<u32, HubUdpSessionState> = HashMap::new();
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
// Cleanup channel: spawned stream tasks send stream_id here when done
@@ -881,7 +989,8 @@ async fn handle_edge_connection(
last_activity = Instant::now();
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
@@ -904,7 +1013,8 @@ async fn handle_edge_connection(
last_activity = Instant::now();
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
@@ -976,6 +1086,20 @@ fn parse_dest_port_from_proxy(header: &str) -> Option<u16> {
}
}
/// Parse destination port from a PROXY protocol v2 binary header.
/// The header must be at least 28 bytes (16 fixed + 12 IPv4 address block).
/// Dest port is at bytes 26-27 (network byte order).
fn parse_dest_port_from_proxy_v2(header: &[u8]) -> Option<u16> {
if header.len() < 28 {
return None;
}
// Verify signature
if header[0..12] != remoteingress_protocol::PROXY_V2_SIGNATURE {
return None;
}
Some(u16::from_be_bytes([header[26], header[27]]))
}
/// Build TLS server config from PEM strings, or auto-generate self-signed.
fn build_tls_config(
config: &HubConfig,
@@ -1083,14 +1207,14 @@ async fn handle_edge_connection_quic(
let secret = parts[2];
// Verify credentials
let (listen_ports, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
}
None => return Err(format!("unknown edge {}", edge_id).into()),
}
@@ -1105,6 +1229,7 @@ async fn handle_edge_connection_quic(
// Send handshake response on control stream
let handshake = HandshakeResponse {
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
@@ -1495,6 +1620,7 @@ mod tests {
fn test_handshake_response_serializes_camel_case() {
let resp = HandshakeResponse {
listen_ports: vec![443, 8080],
listen_ports_udp: vec![],
stun_interval_secs: 300,
};
let json = serde_json::to_value(&resp).unwrap();
@@ -1509,9 +1635,11 @@ mod tests {
fn test_edge_config_update_serializes_camel_case() {
let update = EdgeConfigUpdate {
listen_ports: vec![80, 443],
listen_ports_udp: vec![53],
};
let json = serde_json::to_value(&update).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
assert_eq!(json["listenPortsUdp"], serde_json::json!([53]));
assert!(json.get("listen_ports").is_none());
}