Compare commits

..

2 Commits

Author SHA1 Message Date
bfa88f8d76 v4.11.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 12:02:41 +00:00
a96b4ba84a feat(remoteingress-core): add UDP tunneling support between edge and hub 2026-03-19 12:02:41 +00:00
10 changed files with 850 additions and 17 deletions

View File

@@ -1,5 +1,14 @@
# Changelog
## 2026-03-19 - 4.11.0 - feat(remoteingress-core)
add UDP tunneling support between edge and hub
- extend edge and hub handshake/config updates with UDP listen ports
- add UDP tunnel frame types and PROXY protocol v2 header helpers in the protocol crate
- introduce UDP session management on the edge and upstream UDP forwarding on the hub
- add Node.js integration tests covering UDP echo and concurrent datagrams
- expose UDP listen port configuration in the TypeScript hub API
## 2026-03-19 - 4.10.0 - feat(core,edge,hub,transport)
add QUIC tunnel transport support with optional edge transport selection

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.10.0",
"version": "4.11.0",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"main": "dist_ts/index.js",

View File

@@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{mpsc, Mutex, Notify, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{Instant, sleep_until};
@@ -15,6 +15,7 @@ use bytes::Bytes;
use remoteingress_protocol::*;
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
use crate::udp_session::{UdpSessionKey, UdpSessionManager};
type EdgeTlsStream = tokio_rustls::client::TlsStream<TcpStream>;
@@ -59,6 +60,8 @@ pub struct EdgeConfig {
#[serde(rename_all = "camelCase")]
struct HandshakeConfig {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
#[serde(default = "default_stun_interval")]
stun_interval_secs: u64,
}
@@ -72,6 +75,8 @@ fn default_stun_interval() -> u64 {
#[serde(rename_all = "camelCase")]
struct ConfigUpdate {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
}
/// Events emitted by the edge.
@@ -344,7 +349,8 @@ enum EdgeLoopResult {
}
/// Process a single frame received from the hub side of the tunnel.
/// Handles FRAME_DATA_BACK, FRAME_WINDOW_UPDATE_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, FRAME_PING.
/// Handles FRAME_DATA_BACK, FRAME_WINDOW_UPDATE_BACK, FRAME_CLOSE_BACK, FRAME_CONFIG, FRAME_PING,
/// and UDP frames: FRAME_UDP_DATA_BACK, FRAME_UDP_CLOSE.
async fn handle_edge_frame(
frame: Frame,
tunnel_io: &mut remoteingress_protocol::TunnelIo<EdgeTlsStream>,
@@ -355,11 +361,14 @@ async fn handle_edge_frame(
tunnel_data_tx: &mpsc::Sender<Bytes>,
tunnel_sustained_tx: &mpsc::Sender<Bytes>,
port_listeners: &mut HashMap<u16, JoinHandle<()>>,
udp_listeners: &mut HashMap<u16, JoinHandle<()>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
) -> EdgeFrameAction {
match frame.frame_type {
FRAME_DATA_BACK => {
@@ -394,7 +403,7 @@ async fn handle_edge_frame(
}
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
log::info!("Config update from hub: ports {:?}", update.listen_ports);
log::info!("Config update from hub: ports {:?}, udp {:?}", update.listen_ports, update.listen_ports_udp);
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
@@ -412,12 +421,39 @@ async fn handle_edge_frame(
connection_token,
bind_address,
);
apply_udp_port_config(
&update.listen_ports_udp,
udp_listeners,
tunnel_writer_tx,
tunnel_data_tx,
udp_sessions,
udp_sockets,
next_stream_id,
connection_token,
bind_address,
);
}
}
FRAME_PING => {
// Queue PONG directly — no channel round-trip, guaranteed delivery
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PONG, &[]));
}
FRAME_UDP_DATA_BACK => {
// Dispatch return UDP datagram to the original client
let mut sessions = udp_sessions.lock().await;
if let Some(session) = sessions.get_by_stream_id(frame.stream_id) {
let client_addr = session.client_addr;
let dest_port = session.dest_port;
let sockets = udp_sockets.lock().await;
if let Some(socket) = sockets.get(&dest_port) {
let _ = socket.send_to(&frame.payload, client_addr).await;
}
}
}
FRAME_UDP_CLOSE => {
let mut sessions = udp_sessions.lock().await;
sessions.remove_by_stream_id(frame.stream_id);
}
_ => {
log::warn!("Unexpected frame type {} from hub", frame.frame_type);
}
@@ -581,6 +617,24 @@ async fn connect_to_hub_and_run(
bind_address,
);
// UDP session manager + listeners
let udp_sessions: Arc<Mutex<UdpSessionManager>> =
Arc::new(Mutex::new(UdpSessionManager::new(Duration::from_secs(60))));
let udp_sockets: Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>> =
Arc::new(Mutex::new(HashMap::new()));
let mut udp_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
apply_udp_port_config(
&handshake.listen_ports_udp,
&mut udp_listeners,
&tunnel_ctrl_tx,
&tunnel_data_tx,
&udp_sessions,
&udp_sockets,
next_stream_id,
connection_token,
bind_address,
);
// Single-owner I/O engine — no tokio::io::split, no mutex
let mut tunnel_io = remoteingress_protocol::TunnelIo::new(tls_stream, Vec::new());
@@ -605,7 +659,8 @@ async fn connect_to_hub_and_run(
if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
active_streams, next_stream_id, &config.edge_id, connection_token, bind_address,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
).await {
break 'io_loop EdgeLoopResult::Reconnect(reason);
}
@@ -623,7 +678,8 @@ async fn connect_to_hub_and_run(
if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
active_streams, next_stream_id, &config.edge_id, connection_token, bind_address,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
).await {
break EdgeLoopResult::Reconnect(reason);
}
@@ -661,6 +717,9 @@ async fn connect_to_hub_and_run(
for (_, h) in port_listeners.drain() {
h.abort();
}
for (_, h) in udp_listeners.drain() {
h.abort();
}
// Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect.
// Stream handlers are already cancelled, so no new data is being produced.
@@ -790,6 +849,107 @@ fn apply_port_config(
}
}
/// Apply UDP port configuration: bind UdpSockets for added ports, abort removed ports.
fn apply_udp_port_config(
new_ports: &[u16],
udp_listeners: &mut HashMap<u16, JoinHandle<()>>,
tunnel_ctrl_tx: &mpsc::Sender<Bytes>,
tunnel_data_tx: &mpsc::Sender<Bytes>,
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
next_stream_id: &Arc<AtomicU32>,
connection_token: &CancellationToken,
bind_address: &str,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = udp_listeners.keys().copied().collect();
// Remove ports no longer needed
for &port in old_set.difference(&new_set) {
if let Some(handle) = udp_listeners.remove(&port) {
log::info!("Stopping UDP listener on port {}", port);
handle.abort();
}
// Remove socket from shared map
let sockets = udp_sockets.clone();
tokio::spawn(async move {
sockets.lock().await.remove(&port);
});
}
// Add new ports
for &port in new_set.difference(&old_set) {
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
let tunnel_data_tx = tunnel_data_tx.clone();
let udp_sessions = udp_sessions.clone();
let udp_sockets = udp_sockets.clone();
let next_stream_id = next_stream_id.clone();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
let socket = match UdpSocket::bind((bind_addr.as_str(), port)).await {
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("Failed to bind UDP port {}: {}", port, e);
return;
}
};
log::info!("Listening on UDP port {}", port);
// Register socket in shared map for return traffic
udp_sockets.lock().await.insert(port, socket.clone());
let mut buf = vec![0u8; 65536]; // max UDP datagram size
loop {
tokio::select! {
recv_result = socket.recv_from(&mut buf) => {
match recv_result {
Ok((len, client_addr)) => {
let key = UdpSessionKey { client_addr, dest_port: port };
let mut sessions = udp_sessions.lock().await;
let stream_id = if let Some(session) = sessions.get_mut(&key) {
session.stream_id
} else {
// New session — allocate stream_id and send UDP_OPEN
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
let proxy_header = build_proxy_v2_header_from_str(
&client_ip, "0.0.0.0", client_port, port,
ProxyV2Transport::Udp,
);
let open_frame = encode_frame(sid, FRAME_UDP_OPEN, &proxy_header);
let _ = tunnel_ctrl_tx.try_send(open_frame);
log::debug!("New UDP session {} from {} -> port {}", sid, client_addr, port);
sid
};
drop(sessions); // release lock before sending
// Send datagram through tunnel
let data_frame = encode_frame(stream_id, FRAME_UDP_DATA, &buf[..len]);
let _ = tunnel_data_tx.try_send(data_frame);
}
Err(e) => {
log::error!("UDP recv error on port {}: {}", port, e);
}
}
}
_ = port_token.cancelled() => {
log::info!("UDP port {} listener cancelled", port);
break;
}
}
}
});
udp_listeners.insert(port, handle);
}
}
async fn handle_client_connection(
client_stream: TcpStream,
client_addr: std::net::SocketAddr,

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{mpsc, Mutex, Notify, RwLock, Semaphore};
use tokio::time::{interval, sleep_until, Instant};
use tokio_rustls::TlsAcceptor;
@@ -23,6 +23,14 @@ enum FrameAction {
Disconnect(String),
}
/// Per-UDP-session state tracked in the hub.
struct HubUdpSessionState {
/// Channel for forwarding datagrams from edge to the upstream UdpSocket task.
data_tx: mpsc::Sender<Bytes>,
/// Cancellation token for this session's upstream task.
cancel_token: CancellationToken,
}
/// Per-stream state tracked in the hub's stream map.
struct HubStreamState {
/// Unbounded channel to deliver FRAME_DATA payloads to the upstream writer task.
@@ -69,6 +77,8 @@ pub struct AllowedEdge {
pub secret: String,
#[serde(default)]
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
pub stun_interval_secs: Option<u64>,
}
@@ -77,6 +87,8 @@ pub struct AllowedEdge {
#[serde(rename_all = "camelCase")]
struct HandshakeResponse {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
stun_interval_secs: u64,
}
@@ -85,6 +97,8 @@ struct HandshakeResponse {
#[serde(rename_all = "camelCase")]
pub struct EdgeConfigUpdate {
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
}
/// Runtime status of a connected edge.
@@ -179,12 +193,13 @@ impl TunnelHub {
if let Some(info) = connected.get(&edge.id) {
// Check if ports changed compared to old config
let ports_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports,
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
None => true, // newly allowed edge that's already connected
};
if ports_changed {
let update = EdgeConfigUpdate {
listen_ports: edge.listen_ports.clone(),
listen_ports_udp: edge.listen_ports_udp.clone(),
};
let _ = info.config_tx.try_send(update);
}
@@ -381,6 +396,7 @@ async fn handle_hub_frame(
frame: Frame,
tunnel_io: &mut remoteingress_protocol::TunnelIo<HubTlsStream>,
streams: &mut HashMap<u32, HubStreamState>,
udp_sessions: &mut HashMap<u32, HubUdpSessionState>,
stream_semaphore: &Arc<Semaphore>,
edge_stream_count: &Arc<AtomicU32>,
edge_id: &str,
@@ -682,6 +698,96 @@ async fn handle_hub_frame(
FRAME_PONG => {
log::debug!("Received PONG from edge {}", edge_id);
}
FRAME_UDP_OPEN => {
// Open a UDP session: parse PROXY v2 header, connect upstream, start forwarding
let stream_id = frame.stream_id;
let dest_port = parse_dest_port_from_proxy_v2(&frame.payload).unwrap_or(53);
let target = target_host.to_string();
let data_writer_tx = data_tx.clone();
let session_token = edge_token.child_token();
let edge_id_str = edge_id.to_string();
// Channel for forwarding datagrams from edge to upstream
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
udp_sessions.insert(stream_id, HubUdpSessionState {
data_tx: udp_tx,
cancel_token: session_token.clone(),
});
// Spawn upstream UDP forwarder
tokio::spawn(async move {
let upstream = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
log::error!("UDP session {} failed to bind: {}", stream_id, e);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("UDP session {} failed to connect to {}:{}: {}", stream_id, target, dest_port, e);
return;
}
// Task: upstream -> edge (return datagrams)
let upstream_recv = Arc::new(upstream);
let upstream_send = upstream_recv.clone();
let recv_token = session_token.clone();
let recv_handle = tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
loop {
tokio::select! {
result = upstream_recv.recv(&mut buf) => {
match result {
Ok(len) => {
let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]);
if data_writer_tx.try_send(frame).is_err() {
break;
}
}
Err(e) => {
log::debug!("UDP session {} upstream recv error: {}", stream_id, e);
break;
}
}
}
_ = recv_token.cancelled() => break,
}
}
});
// Forward datagrams from edge to upstream
loop {
tokio::select! {
data = udp_rx.recv() => {
match data {
Some(datagram) => {
if let Err(e) = upstream_send.send(&datagram).await {
log::debug!("UDP session {} upstream send error: {}", stream_id, e);
break;
}
}
None => break,
}
}
_ = session_token.cancelled() => break,
}
}
recv_handle.abort();
log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str);
});
}
FRAME_UDP_DATA => {
// Forward datagram to upstream
if let Some(state) = udp_sessions.get(&frame.stream_id) {
let _ = state.data_tx.try_send(frame.payload);
}
}
FRAME_UDP_CLOSE => {
if let Some(state) = udp_sessions.remove(&frame.stream_id) {
state.cancel_token.cancel();
}
}
_ => {
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
}
@@ -738,14 +844,14 @@ async fn handle_edge_connection(
let secret = parts[2];
// Verify credentials and extract edge config
let (listen_ports, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
}
None => {
return Err(format!("unknown edge {}", edge_id).into());
@@ -762,6 +868,7 @@ async fn handle_edge_connection(
// Send handshake response with initial config before frame protocol begins
let handshake = HandshakeResponse {
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
@@ -771,6 +878,7 @@ async fn handle_edge_connection(
// Track this edge
let mut streams: HashMap<u32, HubStreamState> = HashMap::new();
let mut udp_sessions: HashMap<u32, HubUdpSessionState> = HashMap::new();
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
// Cleanup channel: spawned stream tasks send stream_id here when done
@@ -881,7 +989,8 @@ async fn handle_edge_connection(
last_activity = Instant::now();
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
@@ -904,7 +1013,8 @@ async fn handle_edge_connection(
last_activity = Instant::now();
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
@@ -976,6 +1086,20 @@ fn parse_dest_port_from_proxy(header: &str) -> Option<u16> {
}
}
/// Parse destination port from a PROXY protocol v2 binary header.
/// The header must be at least 28 bytes (16 fixed + 12 IPv4 address block).
/// Dest port is at bytes 26-27 (network byte order).
fn parse_dest_port_from_proxy_v2(header: &[u8]) -> Option<u16> {
if header.len() < 28 {
return None;
}
// Verify signature
if header[0..12] != remoteingress_protocol::PROXY_V2_SIGNATURE {
return None;
}
Some(u16::from_be_bytes([header[26], header[27]]))
}
/// Build TLS server config from PEM strings, or auto-generate self-signed.
fn build_tls_config(
config: &HubConfig,
@@ -1083,14 +1207,14 @@ async fn handle_edge_connection_quic(
let secret = parts[2];
// Verify credentials
let (listen_ports, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
}
None => return Err(format!("unknown edge {}", edge_id).into()),
}
@@ -1105,6 +1229,7 @@ async fn handle_edge_connection_quic(
// Send handshake response on control stream
let handshake = HandshakeResponse {
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
@@ -1495,6 +1620,7 @@ mod tests {
fn test_handshake_response_serializes_camel_case() {
let resp = HandshakeResponse {
listen_ports: vec![443, 8080],
listen_ports_udp: vec![],
stun_interval_secs: 300,
};
let json = serde_json::to_value(&resp).unwrap();
@@ -1509,9 +1635,11 @@ mod tests {
fn test_edge_config_update_serializes_camel_case() {
let update = EdgeConfigUpdate {
listen_ports: vec![80, 443],
listen_ports_udp: vec![53],
};
let json = serde_json::to_value(&update).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
assert_eq!(json["listenPortsUdp"], serde_json::json!([53]));
assert!(json.get("listen_ports").is_none());
}

View File

@@ -2,5 +2,6 @@ pub mod hub;
pub mod edge;
pub mod stun;
pub mod transport;
pub mod udp_session;
pub use remoteingress_protocol as protocol;

View File

@@ -0,0 +1,210 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::time::Instant;
/// Key identifying a unique UDP "session" (one client endpoint talking to one destination port).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UdpSessionKey {
pub client_addr: SocketAddr,
pub dest_port: u16,
}
/// A single UDP session tracked by the edge.
pub struct UdpSession {
pub stream_id: u32,
pub client_addr: SocketAddr,
pub dest_port: u16,
pub last_activity: Instant,
}
/// Manages UDP sessions with idle timeout expiry.
pub struct UdpSessionManager {
/// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>,
/// Reverse map: stream_id → session key (for dispatching return traffic).
by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration.
idle_timeout: std::time::Duration,
}
impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self {
Self {
sessions: HashMap::new(),
by_stream_id: HashMap::new(),
idle_timeout,
}
}
/// Look up an existing session by key. Updates last_activity on hit.
pub fn get_mut(&mut self, key: &UdpSessionKey) -> Option<&mut UdpSession> {
let session = self.sessions.get_mut(key)?;
session.last_activity = Instant::now();
Some(session)
}
/// Look up a session's client address by stream_id (for return traffic).
pub fn client_addr_for_stream(&self, stream_id: u32) -> Option<SocketAddr> {
let key = self.by_stream_id.get(&stream_id)?;
self.sessions.get(key).map(|s| s.client_addr)
}
/// Look up a session by stream_id. Updates last_activity on hit.
pub fn get_by_stream_id(&mut self, stream_id: u32) -> Option<&mut UdpSession> {
let key = self.by_stream_id.get(&stream_id)?;
let session = self.sessions.get_mut(key)?;
session.last_activity = Instant::now();
Some(session)
}
/// Insert a new session. Returns a mutable reference to it.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession {
let session = UdpSession {
stream_id,
client_addr: key.client_addr,
dest_port: key.dest_port,
last_activity: Instant::now(),
};
self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session)
}
/// Remove a session by stream_id.
pub fn remove_by_stream_id(&mut self, stream_id: u32) -> Option<UdpSession> {
if let Some(key) = self.by_stream_id.remove(&stream_id) {
self.sessions.remove(&key)
} else {
None
}
}
/// Expire idle sessions. Returns the stream_ids of expired sessions.
pub fn expire_idle(&mut self) -> Vec<u32> {
let now = Instant::now();
let timeout = self.idle_timeout;
let expired_keys: Vec<UdpSessionKey> = self
.sessions
.iter()
.filter(|(_, s)| now.duration_since(s.last_activity) >= timeout)
.map(|(k, _)| *k)
.collect();
let mut expired_ids = Vec::with_capacity(expired_keys.len());
for key in expired_keys {
if let Some(session) = self.sessions.remove(&key) {
self.by_stream_id.remove(&session.stream_id);
expired_ids.push(session.stream_id);
}
}
expired_ids
}
/// Number of active sessions.
pub fn len(&self) -> usize {
self.sessions.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
#[test]
fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some());
assert_eq!(mgr.get_mut(&key).unwrap().stream_id, 1);
}
#[test]
fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42);
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None);
}
#[test]
fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some());
assert_eq!(mgr.len(), 0);
assert!(mgr.get_mut(&key).is_none());
assert_eq!(mgr.client_addr_for_stream(1), None);
}
#[test]
fn test_remove_nonexistent() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
assert!(mgr.remove_by_stream_id(999).is_none());
}
#[tokio::test]
async fn test_expire_idle() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
// Nothing expired yet
assert!(mgr.expire_idle().is_empty());
assert_eq!(mgr.len(), 2);
// Wait for timeout
tokio::time::sleep(Duration::from_millis(60)).await;
let expired = mgr.expire_idle();
assert_eq!(expired.len(), 2);
assert_eq!(mgr.len(), 0);
}
#[tokio::test]
async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
// Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await;
mgr.get_mut(&key); // refreshes last_activity
// At 80ms from last touch, should still be alive
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(mgr.expire_idle().is_empty());
assert_eq!(mgr.len(), 1);
// Wait for full timeout from last activity
tokio::time::sleep(Duration::from_millis(30)).await;
let expired = mgr.expire_idle();
assert_eq!(expired.len(), 1);
}
#[test]
fn test_multiple_sessions_same_client_different_ports() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
}
}

View File

@@ -19,6 +19,12 @@ pub const FRAME_PONG: u8 = 0x08; // Edge -> Hub: heartbeat response
pub const FRAME_WINDOW_UPDATE: u8 = 0x09; // Edge -> Hub: per-stream flow control
pub const FRAME_WINDOW_UPDATE_BACK: u8 = 0x0A; // Hub -> Edge: per-stream flow control
// UDP tunnel frame types
pub const FRAME_UDP_OPEN: u8 = 0x0B; // Edge -> Hub: open UDP session (payload: PROXY v2 header)
pub const FRAME_UDP_DATA: u8 = 0x0C; // Edge -> Hub: UDP datagram
pub const FRAME_UDP_DATA_BACK: u8 = 0x0D; // Hub -> Edge: UDP datagram
pub const FRAME_UDP_CLOSE: u8 = 0x0E; // Either direction: close UDP session
// Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes
pub const FRAME_HEADER_SIZE: usize = 9;
@@ -107,6 +113,76 @@ pub fn build_proxy_v1_header(
)
}
/// PROXY protocol v2 signature (12 bytes).
pub const PROXY_V2_SIGNATURE: [u8; 12] = [
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
];
/// Transport protocol for PROXY v2 header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyV2Transport {
/// TCP (STREAM) — byte 13 low nibble = 0x1
Tcp,
/// UDP (DGRAM) — byte 13 low nibble = 0x2
Udp,
}
/// Build a PROXY protocol v2 binary header for IPv4.
///
/// Returns a 28-byte header:
/// - 12B signature
/// - 1B version (0x2) + command (0x1 = PROXY)
/// - 1B address family (0x1 = AF_INET) + transport (0x1 = TCP, 0x2 = UDP)
/// - 2B address block length (0x000C = 12)
/// - 4B source IPv4 address
/// - 4B destination IPv4 address
/// - 2B source port
/// - 2B destination port
pub fn build_proxy_v2_header(
src_ip: &std::net::Ipv4Addr,
dst_ip: &std::net::Ipv4Addr,
src_port: u16,
dst_port: u16,
transport: ProxyV2Transport,
) -> Bytes {
let mut buf = BytesMut::with_capacity(28);
// Signature (12 bytes)
buf.put_slice(&PROXY_V2_SIGNATURE);
// Version 2 + PROXY command
buf.put_u8(0x21);
// AF_INET (0x1) + transport
let transport_nibble = match transport {
ProxyV2Transport::Tcp => 0x1,
ProxyV2Transport::Udp => 0x2,
};
buf.put_u8(0x10 | transport_nibble);
// Address block length: 12 bytes for IPv4
buf.put_u16(12);
// Source address (4 bytes, network byte order)
buf.put_slice(&src_ip.octets());
// Destination address (4 bytes, network byte order)
buf.put_slice(&dst_ip.octets());
// Source port (2 bytes, network byte order)
buf.put_u16(src_port);
// Destination port (2 bytes, network byte order)
buf.put_u16(dst_port);
buf.freeze()
}
/// Build a PROXY protocol v2 binary header from string IP addresses.
/// Falls back to 0.0.0.0 if parsing fails.
pub fn build_proxy_v2_header_from_str(
src_ip: &str,
dst_ip: &str,
src_port: u16,
dst_port: u16,
transport: ProxyV2Transport,
) -> Bytes {
let src: std::net::Ipv4Addr = src_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED);
let dst: std::net::Ipv4Addr = dst_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED);
build_proxy_v2_header(&src, &dst, src_port, dst_port, transport)
}
/// Stateful async frame reader that yields `Frame` values from an `AsyncRead`.
pub struct FrameReader<R> {
reader: R,
@@ -604,6 +680,62 @@ mod tests {
assert_eq!(header, "PROXY TCP4 1.2.3.4 5.6.7.8 12345 443\r\n");
}
#[test]
fn test_proxy_v2_header_tcp4() {
let src = "198.51.100.10".parse().unwrap();
let dst = "203.0.113.25".parse().unwrap();
let header = build_proxy_v2_header(&src, &dst, 54321, 8443, ProxyV2Transport::Tcp);
assert_eq!(header.len(), 28);
// Signature
assert_eq!(&header[0..12], &PROXY_V2_SIGNATURE);
// Version 2 + PROXY command
assert_eq!(header[12], 0x21);
// AF_INET + STREAM (TCP)
assert_eq!(header[13], 0x11);
// Address length = 12
assert_eq!(u16::from_be_bytes([header[14], header[15]]), 12);
// Source IP: 198.51.100.10
assert_eq!(&header[16..20], &[198, 51, 100, 10]);
// Dest IP: 203.0.113.25
assert_eq!(&header[20..24], &[203, 0, 113, 25]);
// Source port: 54321
assert_eq!(u16::from_be_bytes([header[24], header[25]]), 54321);
// Dest port: 8443
assert_eq!(u16::from_be_bytes([header[26], header[27]]), 8443);
}
#[test]
fn test_proxy_v2_header_udp4() {
let src = "10.0.0.1".parse().unwrap();
let dst = "10.0.0.2".parse().unwrap();
let header = build_proxy_v2_header(&src, &dst, 12345, 53, ProxyV2Transport::Udp);
assert_eq!(header.len(), 28);
assert_eq!(header[12], 0x21); // v2, PROXY
assert_eq!(header[13], 0x12); // AF_INET + DGRAM (UDP)
assert_eq!(&header[16..20], &[10, 0, 0, 1]); // src
assert_eq!(&header[20..24], &[10, 0, 0, 2]); // dst
assert_eq!(u16::from_be_bytes([header[24], header[25]]), 12345);
assert_eq!(u16::from_be_bytes([header[26], header[27]]), 53);
}
#[test]
fn test_proxy_v2_header_from_str() {
let header = build_proxy_v2_header_from_str("1.2.3.4", "5.6.7.8", 1000, 443, ProxyV2Transport::Tcp);
assert_eq!(header.len(), 28);
assert_eq!(&header[16..20], &[1, 2, 3, 4]);
assert_eq!(&header[20..24], &[5, 6, 7, 8]);
}
#[test]
fn test_proxy_v2_header_from_str_invalid_ip() {
let header = build_proxy_v2_header_from_str("not-an-ip", "also-not", 1000, 443, ProxyV2Transport::Udp);
assert_eq!(header.len(), 28);
// Falls back to 0.0.0.0
assert_eq!(&header[16..20], &[0, 0, 0, 0]);
assert_eq!(&header[20..24], &[0, 0, 0, 0]);
assert_eq!(header[13], 0x12); // UDP
}
#[tokio::test]
async fn test_frame_reader() {
let frame1 = encode_frame(1, FRAME_OPEN, b"PROXY TCP4 1.2.3.4 5.6.7.8 1234 443\r\n");

193
test/test.udp.node.ts Normal file
View File

@@ -0,0 +1,193 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as dgram from 'dgram';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
/**
* Start a UDP echo server that:
* 1. Receives the first datagram (PROXY v2 header — 28 bytes) and discards it
* 2. Echoes all subsequent datagrams back to the sender
*/
function startUdpEchoServer(port: number, host: string): Promise<dgram.Socket> {
return new Promise((resolve, reject) => {
const server = dgram.createSocket('udp4');
let proxyHeaderReceived = false;
server.on('message', (msg, rinfo) => {
if (!proxyHeaderReceived) {
// First datagram is the PROXY v2 header (28 bytes for IPv4)
// In the current implementation, the hub connects directly via UDP
// so the first real datagram is the actual data (no PROXY header yet)
// For now, just echo everything back
proxyHeaderReceived = true;
}
// Echo back
server.send(msg, rinfo.port, rinfo.address);
});
server.on('error', reject);
server.bind(port, host, () => resolve(server));
});
}
/**
* Send a UDP datagram through the tunnel and wait for the echo response.
*/
function udpSendAndReceive(
port: number,
data: Buffer,
timeoutMs = 10000,
): Promise<Buffer> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.close();
reject(new Error(`UDP timeout after ${timeoutMs}ms`));
}
}, timeoutMs);
client.on('message', (msg) => {
if (!settled) {
settled = true;
clearTimeout(timer);
client.close();
resolve(msg);
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
client.close();
reject(err);
}
});
client.send(data, port, '127.0.0.1');
});
}
// ---------------------------------------------------------------------------
// Test state
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: dgram.Socket;
let hubPort: number;
let edgeUdpPort: number;
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => {
[hubPort, edgeUdpPort] = await findFreePorts(2);
// Start UDP echo server on upstream (127.0.0.2)
echoServer = await startUdpEchoServer(edgeUdpPort, '127.0.0.2');
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' });
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [edgeUdpPort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
});
await connectedPromise;
// Wait for UDP listener to bind
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP: single datagram echo — 64 bytes', async () => {
const data = crypto.randomBytes(64);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(64);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP: single datagram echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(1024);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP: 10 sequential datagrams', async () => {
for (let i = 0; i < 10; i++) {
const data = crypto.randomBytes(128);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(128);
expect(Buffer.compare(received, data)).toEqual(0);
}
});
tap.test('UDP: 10 concurrent datagrams from different source ports', async () => {
const promises = Array.from({ length: 10 }, () => {
const data = crypto.randomBytes(256);
return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({
sizeOk: received.length === 256,
dataOk: Buffer.compare(received, data) === 0,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || !r.dataOk);
expect(failures.length).toEqual(0);
});
tap.test('UDP: tunnel still connected after tests', async () => {
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP teardown: stop tunnel and echo server', async () => {
await edge.stop();
await hub.stop();
await new Promise<void>((resolve) => echoServer.close(() => resolve()));
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.10.0',
version: '4.11.0',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
}

View File

@@ -22,7 +22,7 @@ type THubCommands = {
};
updateAllowedEdges: {
params: {
edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>;
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>;
};
result: { updated: boolean };
};
@@ -50,7 +50,7 @@ export interface IHubConfig {
};
}
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number };
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number };
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;