Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 948032fc9e | |||
| a400945371 | |||
| bc89e49f39 | |||
| 2087567f15 |
14
changelog.md
14
changelog.md
@@ -1,5 +1,19 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-19 - 4.12.1 - fix(remoteingress-core)
|
||||
send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions
|
||||
|
||||
- Adds periodic idle UDP session expiry in edge tunnel and QUIC loops, including UDP close signaling for expired tunnel sessions.
|
||||
- Sends the PROXY v2 header as the first datagram for UDP upstream connections in both standard and QUIC hub paths.
|
||||
- Updates the UDP node test server to ignore the initial PROXY v2 datagram per source before echoing payload traffic.
|
||||
|
||||
## 2026-03-19 - 4.12.0 - feat(remoteingress-core)
|
||||
add UDP tunneling over QUIC datagrams and expand transport-specific test coverage
|
||||
|
||||
- Implement QUIC datagram-based UDP forwarding on both edge and hub, including session setup, payload routing, and listener cleanup
|
||||
- Enable QUIC datagram receive buffers in client and server transport configuration
|
||||
- Add UDP-over-QUIC tests and clarify existing test names to distinguish TCP/TLS, UDP/TLS, and QUIC scenarios
|
||||
|
||||
## 2026-03-19 - 4.11.0 - feat(remoteingress-core)
|
||||
add UDP tunneling support between edge and hub
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@serve.zone/remoteingress",
|
||||
"version": "4.11.0",
|
||||
"version": "4.12.1",
|
||||
"private": false,
|
||||
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
@@ -642,8 +642,23 @@ async fn connect_to_hub_and_run(
|
||||
let liveness_timeout_dur = Duration::from_secs(45);
|
||||
let mut last_activity = Instant::now();
|
||||
let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur));
|
||||
let mut next_udp_expiry = Instant::now() + Duration::from_secs(30);
|
||||
|
||||
let result = 'io_loop: loop {
|
||||
// Expire idle UDP sessions periodically
|
||||
if Instant::now() >= next_udp_expiry {
|
||||
let mut sessions = udp_sessions.lock().await;
|
||||
let expired = sessions.expire_idle();
|
||||
for sid in &expired {
|
||||
let close_frame = encode_frame(*sid, FRAME_UDP_CLOSE, &[]);
|
||||
let _ = tunnel_data_tx.try_send(close_frame);
|
||||
}
|
||||
if !expired.is_empty() {
|
||||
log::debug!("Expired {} idle UDP sessions", expired.len());
|
||||
}
|
||||
next_udp_expiry = Instant::now() + Duration::from_secs(30);
|
||||
}
|
||||
|
||||
// Drain any buffered frames
|
||||
loop {
|
||||
let frame = match tunnel_io.try_parse_frame() {
|
||||
@@ -1328,9 +1343,36 @@ async fn connect_to_hub_and_run_quic_with_connection(
|
||||
bind_address,
|
||||
);
|
||||
|
||||
// Monitor control stream for config updates, and connection health.
|
||||
// Also handle shutdown signals.
|
||||
// UDP listeners for QUIC transport — uses QUIC datagrams for low-latency forwarding.
|
||||
let udp_sessions_quic: Arc<Mutex<UdpSessionManager>> =
|
||||
Arc::new(Mutex::new(UdpSessionManager::new(Duration::from_secs(60))));
|
||||
let udp_sockets_quic: Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
let mut udp_listeners_quic: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
||||
apply_udp_port_config_quic(
|
||||
&handshake.listen_ports_udp,
|
||||
&mut udp_listeners_quic,
|
||||
&quic_conn,
|
||||
&udp_sessions_quic,
|
||||
&udp_sockets_quic,
|
||||
next_stream_id,
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
|
||||
// Monitor control stream for config updates, connection health, and QUIC datagrams.
|
||||
let mut next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
|
||||
let result = 'quic_loop: loop {
|
||||
// Expire idle UDP sessions periodically
|
||||
if Instant::now() >= next_udp_expiry_quic {
|
||||
let mut sessions = udp_sessions_quic.lock().await;
|
||||
let expired = sessions.expire_idle();
|
||||
if !expired.is_empty() {
|
||||
log::debug!("Expired {} idle QUIC UDP sessions", expired.len());
|
||||
}
|
||||
next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
// Read control messages from hub
|
||||
ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => {
|
||||
@@ -1354,6 +1396,16 @@ async fn connect_to_hub_and_run_quic_with_connection(
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
apply_udp_port_config_quic(
|
||||
&update.listen_ports_udp,
|
||||
&mut udp_listeners_quic,
|
||||
&quic_conn,
|
||||
&udp_sessions_quic,
|
||||
&udp_sockets_quic,
|
||||
next_stream_id,
|
||||
connection_token,
|
||||
bind_address,
|
||||
);
|
||||
}
|
||||
}
|
||||
quic_transport::CTRL_PING => {
|
||||
@@ -1384,6 +1436,30 @@ async fn connect_to_hub_and_run_quic_with_connection(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Receive QUIC datagrams (UDP return traffic from hub)
|
||||
datagram = quic_conn.read_datagram() => {
|
||||
match datagram {
|
||||
Ok(data) => {
|
||||
// Format: [session_id:4][payload:N]
|
||||
if data.len() >= 4 {
|
||||
let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
|
||||
let payload = &data[4..];
|
||||
let mut sessions = udp_sessions_quic.lock().await;
|
||||
if let Some(session) = sessions.get_by_stream_id(session_id) {
|
||||
let client_addr = session.client_addr;
|
||||
let dest_port = session.dest_port;
|
||||
let sockets = udp_sockets_quic.lock().await;
|
||||
if let Some(socket) = sockets.get(&dest_port) {
|
||||
let _ = socket.send_to(payload, client_addr).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("QUIC datagram recv error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// QUIC connection closed
|
||||
reason = quic_conn.closed() => {
|
||||
log::info!("QUIC connection closed: {}", reason);
|
||||
@@ -1405,6 +1481,9 @@ async fn connect_to_hub_and_run_quic_with_connection(
|
||||
for (_, h) in port_listeners.drain() {
|
||||
h.abort();
|
||||
}
|
||||
for (_, h) in udp_listeners_quic.drain() {
|
||||
h.abort();
|
||||
}
|
||||
|
||||
// Graceful QUIC close
|
||||
quic_conn.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||
@@ -1513,6 +1592,104 @@ fn apply_port_config_quic(
|
||||
/// Handle a single client connection via QUIC transport.
|
||||
/// Opens a new QUIC bidirectional stream, sends the PROXY header,
|
||||
/// then bidirectionally copies data between the client TCP socket and the QUIC stream.
|
||||
/// Apply UDP port config for QUIC transport: bind UdpSockets that send via QUIC datagrams.
|
||||
fn apply_udp_port_config_quic(
|
||||
new_ports: &[u16],
|
||||
udp_listeners: &mut HashMap<u16, JoinHandle<()>>,
|
||||
quic_conn: &quinn::Connection,
|
||||
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
|
||||
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
|
||||
next_stream_id: &Arc<AtomicU32>,
|
||||
connection_token: &CancellationToken,
|
||||
bind_address: &str,
|
||||
) {
|
||||
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
|
||||
let old_set: std::collections::HashSet<u16> = udp_listeners.keys().copied().collect();
|
||||
|
||||
for &port in old_set.difference(&new_set) {
|
||||
if let Some(handle) = udp_listeners.remove(&port) {
|
||||
log::info!("Stopping QUIC UDP listener on port {}", port);
|
||||
handle.abort();
|
||||
}
|
||||
let sockets = udp_sockets.clone();
|
||||
tokio::spawn(async move { sockets.lock().await.remove(&port); });
|
||||
}
|
||||
|
||||
for &port in new_set.difference(&old_set) {
|
||||
let quic_conn = quic_conn.clone();
|
||||
let udp_sessions = udp_sessions.clone();
|
||||
let udp_sockets = udp_sockets.clone();
|
||||
let next_stream_id = next_stream_id.clone();
|
||||
let port_token = connection_token.child_token();
|
||||
let bind_addr = bind_address.to_string();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let socket = match UdpSocket::bind((bind_addr.as_str(), port)).await {
|
||||
Ok(s) => Arc::new(s),
|
||||
Err(e) => {
|
||||
log::error!("Failed to bind QUIC UDP port {}: {}", port, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
log::info!("Listening on UDP port {} (QUIC datagram transport)", port);
|
||||
udp_sockets.lock().await.insert(port, socket.clone());
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
tokio::select! {
|
||||
recv_result = socket.recv_from(&mut buf) => {
|
||||
match recv_result {
|
||||
Ok((len, client_addr)) => {
|
||||
let key = UdpSessionKey { client_addr, dest_port: port };
|
||||
let mut sessions = udp_sessions.lock().await;
|
||||
|
||||
let stream_id = if let Some(session) = sessions.get_mut(&key) {
|
||||
session.stream_id
|
||||
} else {
|
||||
// New session — send PROXY v2 header via control-style datagram
|
||||
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
|
||||
sessions.insert(key, sid);
|
||||
|
||||
let client_ip = client_addr.ip().to_string();
|
||||
let client_port = client_addr.port();
|
||||
let proxy_header = build_proxy_v2_header_from_str(
|
||||
&client_ip, "0.0.0.0", client_port, port,
|
||||
ProxyV2Transport::Udp,
|
||||
);
|
||||
// Send OPEN as a QUIC datagram: [session_id:4][0xFF magic:1][proxy_header:28]
|
||||
let mut open_buf = Vec::with_capacity(4 + 1 + proxy_header.len());
|
||||
open_buf.extend_from_slice(&sid.to_be_bytes());
|
||||
open_buf.push(0xFF); // magic byte to distinguish OPEN from DATA
|
||||
open_buf.extend_from_slice(&proxy_header);
|
||||
let _ = quic_conn.send_datagram(open_buf.into());
|
||||
|
||||
log::debug!("New QUIC UDP session {} from {} -> port {}", sid, client_addr, port);
|
||||
sid
|
||||
};
|
||||
drop(sessions);
|
||||
|
||||
// Send datagram: [session_id:4][payload:N]
|
||||
let mut dgram = Vec::with_capacity(4 + len);
|
||||
dgram.extend_from_slice(&stream_id.to_be_bytes());
|
||||
dgram.extend_from_slice(&buf[..len]);
|
||||
let _ = quic_conn.send_datagram(dgram.into());
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("QUIC UDP recv error on port {}: {}", port, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = port_token.cancelled() => {
|
||||
log::info!("QUIC UDP port {} listener cancelled", port);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
udp_listeners.insert(port, handle);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_client_connection_quic(
|
||||
client_stream: TcpStream,
|
||||
client_addr: std::net::SocketAddr,
|
||||
|
||||
@@ -706,6 +706,7 @@ async fn handle_hub_frame(
|
||||
let data_writer_tx = data_tx.clone();
|
||||
let session_token = edge_token.child_token();
|
||||
let edge_id_str = edge_id.to_string();
|
||||
let proxy_v2_header = frame.payload.clone();
|
||||
|
||||
// Channel for forwarding datagrams from edge to upstream
|
||||
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
|
||||
@@ -728,6 +729,12 @@ async fn handle_hub_frame(
|
||||
return;
|
||||
}
|
||||
|
||||
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
|
||||
if let Err(e) = upstream.send(&proxy_v2_header).await {
|
||||
log::error!("UDP session {} failed to send PROXY v2 header: {}", stream_id, e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Task: upstream -> edge (return datagrams)
|
||||
let upstream_recv = Arc::new(upstream);
|
||||
let upstream_send = upstream_recv.clone();
|
||||
@@ -1331,6 +1338,129 @@ async fn handle_edge_connection_quic(
|
||||
}
|
||||
});
|
||||
|
||||
// UDP sessions for QUIC datagram transport
|
||||
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Spawn QUIC datagram receiver task
|
||||
let dgram_conn = quic_conn.clone();
|
||||
let dgram_sessions = quic_udp_sessions.clone();
|
||||
let dgram_target = target_host.clone();
|
||||
let dgram_edge_id = edge_id.clone();
|
||||
let dgram_token = edge_token.clone();
|
||||
let dgram_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
datagram = dgram_conn.read_datagram() => {
|
||||
match datagram {
|
||||
Ok(data) => {
|
||||
if data.len() < 4 { continue; }
|
||||
let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
|
||||
let payload = &data[4..];
|
||||
|
||||
// Check for OPEN magic byte (0xFF)
|
||||
if !payload.is_empty() && payload[0] == 0xFF {
|
||||
// This is a session OPEN: [0xFF][proxy_v2_header:28]
|
||||
let proxy_data = &payload[1..];
|
||||
let dest_port = if proxy_data.len() >= 28 {
|
||||
u16::from_be_bytes([proxy_data[26], proxy_data[27]])
|
||||
} else {
|
||||
53 // fallback
|
||||
};
|
||||
|
||||
// Create upstream UDP socket
|
||||
let target = dgram_target.clone();
|
||||
let conn = dgram_conn.clone();
|
||||
let sessions = dgram_sessions.clone();
|
||||
let session_token = dgram_token.child_token();
|
||||
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
|
||||
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
|
||||
|
||||
{
|
||||
let mut s = sessions.lock().await;
|
||||
s.insert(session_id, tx);
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
let upstream = match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(s) => Arc::new(s),
|
||||
Err(e) => {
|
||||
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
|
||||
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
|
||||
if let Err(e) = upstream.send(&proxy_v2_data).await {
|
||||
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Upstream recv → QUIC datagram back to edge
|
||||
let upstream_recv = upstream.clone();
|
||||
let recv_conn = conn.clone();
|
||||
let recv_token = session_token.clone();
|
||||
let recv_handle = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = upstream_recv.recv(&mut buf) => {
|
||||
match result {
|
||||
Ok(len) => {
|
||||
let mut dgram = Vec::with_capacity(4 + len);
|
||||
dgram.extend_from_slice(&session_id.to_be_bytes());
|
||||
dgram.extend_from_slice(&buf[..len]);
|
||||
let _ = recv_conn.send_datagram(dgram.into());
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
_ = recv_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Edge datagrams → upstream
|
||||
loop {
|
||||
tokio::select! {
|
||||
data = rx.recv() => {
|
||||
match data {
|
||||
Some(datagram) => {
|
||||
let _ = upstream.send(&datagram).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = session_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
recv_handle.abort();
|
||||
});
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Regular data datagram — forward to upstream
|
||||
let sessions = dgram_sessions.lock().await;
|
||||
if let Some(tx) = sessions.get(&session_id) {
|
||||
let _ = tx.try_send(Bytes::copy_from_slice(payload));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("QUIC datagram recv error from edge {}: {}", dgram_edge_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = dgram_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Control stream loop: forward config updates and handle PONG
|
||||
let disconnect_reason;
|
||||
loop {
|
||||
@@ -1399,6 +1529,7 @@ async fn handle_edge_connection_quic(
|
||||
// Cleanup
|
||||
edge_token.cancel();
|
||||
data_handle.abort();
|
||||
dgram_handle.abort();
|
||||
quic_conn.close(quinn::VarInt::from_u32(0), b"hub_shutdown");
|
||||
|
||||
{
|
||||
|
||||
@@ -31,6 +31,8 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
|
||||
// Match MAX_STREAMS_PER_EDGE (1024) from hub.rs.
|
||||
// Default is 100 which is too low for high-concurrency tunneling.
|
||||
transport.max_concurrent_bidi_streams(1024u32.into());
|
||||
// Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling.
|
||||
transport.datagram_receive_buffer_size(Some(65536));
|
||||
|
||||
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));
|
||||
client_config.transport_config(Arc::new(transport));
|
||||
@@ -49,6 +51,7 @@ pub fn build_quic_server_config(
|
||||
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
|
||||
));
|
||||
transport.max_concurrent_bidi_streams(1024u32.into());
|
||||
transport.datagram_receive_buffer_size(Some(65536));
|
||||
|
||||
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));
|
||||
server_config.transport_config(Arc::new(transport));
|
||||
|
||||
@@ -315,7 +315,7 @@ let echoServer: TrackingServer;
|
||||
let hubPort: number;
|
||||
let edgePort: number;
|
||||
|
||||
tap.test('setup: start echo server and tunnel', async () => {
|
||||
tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () => {
|
||||
[hubPort, edgePort] = await findFreePorts(2);
|
||||
|
||||
echoServer = await startEchoServer(edgePort, '127.0.0.2');
|
||||
@@ -324,7 +324,7 @@ tap.test('setup: start echo server and tunnel', async () => {
|
||||
expect(tunnel.hub.running).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple refills)', async () => {
|
||||
tap.test('TCP/TLS: single TCP stream — 32MB transfer exceeding initial 4MB window', async () => {
|
||||
const size = 32 * 1024 * 1024;
|
||||
const data = crypto.randomBytes(size);
|
||||
const expectedHash = sha256(data);
|
||||
@@ -335,7 +335,7 @@ tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple re
|
||||
expect(sha256(received)).toEqual(expectedHash);
|
||||
});
|
||||
|
||||
tap.test('200 concurrent streams with 64KB each', async () => {
|
||||
tap.test('TCP/TLS: 200 concurrent TCP streams x 64KB each', async () => {
|
||||
const streamCount = 200;
|
||||
const payloadSize = 64 * 1024;
|
||||
|
||||
@@ -355,7 +355,7 @@ tap.test('200 concurrent streams with 64KB each', async () => {
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => {
|
||||
tap.test('TCP/TLS: 512 concurrent TCP streams at minimum window boundary (16KB each)', async () => {
|
||||
const streamCount = 512;
|
||||
const payloadSize = 16 * 1024;
|
||||
|
||||
@@ -375,7 +375,7 @@ tap.test('512 concurrent streams at minimum window boundary (16KB each)', async
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
|
||||
tap.test('TCP/TLS: asymmetric TCP transfer — 4KB request -> 4MB response', async () => {
|
||||
// Swap to large-response server
|
||||
await forceCloseServer(echoServer);
|
||||
const responseSize = 4 * 1024 * 1024; // 4 MB
|
||||
@@ -392,7 +392,7 @@ tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
|
||||
tap.test('TCP/TLS: 100 TCP streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
|
||||
const streamCount = 100;
|
||||
const payloadSize = 1 * 1024 * 1024;
|
||||
|
||||
@@ -412,7 +412,7 @@ tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async ()
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('active stream counter tracks concurrent connections', async () => {
|
||||
tap.test('TCP/TLS: active TCP stream counter tracks concurrent connections', async () => {
|
||||
const N = 50;
|
||||
|
||||
// Open N connections and keep them alive (send data but don't close)
|
||||
@@ -445,7 +445,7 @@ tap.test('active stream counter tracks concurrent connections', async () => {
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
|
||||
tap.test('TCP/TLS: 50 TCP streams x 2MB each (forces multiple window refills)', async () => {
|
||||
// At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream
|
||||
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
|
||||
const streamCount = 50;
|
||||
@@ -467,7 +467,7 @@ tap.test('50 streams x 2MB each (forces multiple window refills per stream)', as
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('teardown: stop tunnel and echo server', async () => {
|
||||
tap.test('TCP/TLS teardown: stop tunnel and TCP echo server', async () => {
|
||||
await tunnel.cleanup();
|
||||
await forceCloseServer(echoServer);
|
||||
});
|
||||
|
||||
@@ -231,7 +231,7 @@ let edgePort: number;
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => {
|
||||
tap.test('TCP/TLS setup: start throttled TCP+TLS tunnel (100 Mbit/s)', async () => {
|
||||
[hubPort, proxyPort, edgePort] = await findFreePorts(3);
|
||||
|
||||
echoServer = await startEchoServer(edgePort, '127.0.0.2');
|
||||
@@ -271,7 +271,7 @@ tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => {
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () => {
|
||||
tap.test('TCP/TLS throttled: 5 TCP streams x 20MB each through 100Mbit tunnel', async () => {
|
||||
const streamCount = 5;
|
||||
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB total round-trip
|
||||
|
||||
@@ -293,7 +293,7 @@ tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () =>
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('throttled: slow consumer with 20MB does not kill other streams', async () => {
|
||||
tap.test('TCP/TLS throttled: slow TCP consumer with 20MB does not kill other streams', async () => {
|
||||
// Open a connection that creates download-direction backpressure:
|
||||
// send 20MB but DON'T read the response — client TCP receive buffer fills
|
||||
const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort });
|
||||
@@ -326,7 +326,7 @@ tap.test('throttled: slow consumer with 20MB does not kill other streams', async
|
||||
slowSock.destroy();
|
||||
});
|
||||
|
||||
tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', async () => {
|
||||
tap.test('TCP/TLS throttled: rapid churn — 3 x 20MB long + 50 x 1MB short TCP streams', async () => {
|
||||
// 3 long streams (20MB each) running alongside 50 short streams (1MB each)
|
||||
const longPayload = crypto.randomBytes(20 * 1024 * 1024);
|
||||
const longHash = sha256(longPayload);
|
||||
@@ -360,7 +360,7 @@ tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', as
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => {
|
||||
tap.test('TCP/TLS throttled: 3 burst waves of 5 TCP streams x 20MB each', async () => {
|
||||
for (let wave = 0; wave < 3; wave++) {
|
||||
const streamCount = 5;
|
||||
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB per wave
|
||||
@@ -382,7 +382,7 @@ tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => {
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('throttled: tunnel still works after all load tests', async () => {
|
||||
tap.test('TCP/TLS throttled: TCP tunnel still works after all load tests', async () => {
|
||||
const data = crypto.randomBytes(1024);
|
||||
const hash = sha256(data);
|
||||
const received = await sendAndReceive(edgePort, data, 30000);
|
||||
@@ -392,7 +392,7 @@ tap.test('throttled: tunnel still works after all load tests', async () => {
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('teardown: stop tunnel', async () => {
|
||||
tap.test('TCP/TLS teardown: stop throttled tunnel', async () => {
|
||||
await edge.stop();
|
||||
await hub.stop();
|
||||
if (throttle) await throttle.close();
|
||||
|
||||
@@ -176,7 +176,7 @@ let echoServer: TrackingServer;
|
||||
let hubPort: number;
|
||||
let edgePort: number;
|
||||
|
||||
tap.test('QUIC setup: start echo server and QUIC tunnel', async () => {
|
||||
tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => {
|
||||
[hubPort, edgePort] = await findFreePorts(2);
|
||||
|
||||
echoServer = await startEchoServer(edgePort, '127.0.0.2');
|
||||
@@ -187,7 +187,7 @@ tap.test('QUIC setup: start echo server and QUIC tunnel', async () => {
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('QUIC: single stream echo — 1KB', async () => {
|
||||
tap.test('QUIC: single TCP stream echo — 1KB', async () => {
|
||||
const data = crypto.randomBytes(1024);
|
||||
const hash = sha256(data);
|
||||
const received = await sendAndReceive(edgePort, data, 10000);
|
||||
@@ -195,7 +195,7 @@ tap.test('QUIC: single stream echo — 1KB', async () => {
|
||||
expect(sha256(received)).toEqual(hash);
|
||||
});
|
||||
|
||||
tap.test('QUIC: single stream echo — 1MB', async () => {
|
||||
tap.test('QUIC: single TCP stream echo — 1MB', async () => {
|
||||
const size = 1024 * 1024;
|
||||
const data = crypto.randomBytes(size);
|
||||
const hash = sha256(data);
|
||||
@@ -204,7 +204,7 @@ tap.test('QUIC: single stream echo — 1MB', async () => {
|
||||
expect(sha256(received)).toEqual(hash);
|
||||
});
|
||||
|
||||
tap.test('QUIC: single stream echo — 16MB', async () => {
|
||||
tap.test('QUIC: single TCP stream echo — 16MB', async () => {
|
||||
const size = 16 * 1024 * 1024;
|
||||
const data = crypto.randomBytes(size);
|
||||
const hash = sha256(data);
|
||||
@@ -213,7 +213,7 @@ tap.test('QUIC: single stream echo — 16MB', async () => {
|
||||
expect(sha256(received)).toEqual(hash);
|
||||
});
|
||||
|
||||
tap.test('QUIC: 10 concurrent streams x 1MB each', async () => {
|
||||
tap.test('QUIC: 10 concurrent TCP streams x 1MB each', async () => {
|
||||
const streamCount = 10;
|
||||
const payloadSize = 1024 * 1024;
|
||||
|
||||
@@ -232,7 +232,7 @@ tap.test('QUIC: 10 concurrent streams x 1MB each', async () => {
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('QUIC: 50 concurrent streams x 64KB each', async () => {
|
||||
tap.test('QUIC: 50 concurrent TCP streams x 64KB each', async () => {
|
||||
const streamCount = 50;
|
||||
const payloadSize = 64 * 1024;
|
||||
|
||||
@@ -251,7 +251,7 @@ tap.test('QUIC: 50 concurrent streams x 64KB each', async () => {
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('QUIC: 200 concurrent streams x 16KB each', async () => {
|
||||
tap.test('QUIC: 200 concurrent TCP streams x 16KB each', async () => {
|
||||
const streamCount = 200;
|
||||
const payloadSize = 16 * 1024;
|
||||
|
||||
@@ -270,12 +270,12 @@ tap.test('QUIC: 200 concurrent streams x 16KB each', async () => {
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('QUIC: tunnel still connected after all tests', async () => {
|
||||
tap.test('QUIC: TCP tunnel still connected after all tests', async () => {
|
||||
const status = await tunnel.edge.getStatus();
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('QUIC teardown: stop tunnel and echo server', async () => {
|
||||
tap.test('QUIC teardown: stop TCP tunnel and echo server', async () => {
|
||||
await tunnel.cleanup();
|
||||
await forceCloseServer(echoServer);
|
||||
});
|
||||
|
||||
@@ -29,15 +29,16 @@ async function findFreePorts(count: number): Promise<number[]> {
|
||||
function startUdpEchoServer(port: number, host: string): Promise<dgram.Socket> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const server = dgram.createSocket('udp4');
|
||||
let proxyHeaderReceived = false;
|
||||
// Track which source endpoints have sent their PROXY v2 header.
|
||||
// The hub sends a 28-byte PROXY v2 header as the first datagram per session.
|
||||
const seenSources = new Set<string>();
|
||||
|
||||
server.on('message', (msg, rinfo) => {
|
||||
if (!proxyHeaderReceived) {
|
||||
// First datagram is the PROXY v2 header (28 bytes for IPv4)
|
||||
// In the current implementation, the hub connects directly via UDP
|
||||
// so the first real datagram is the actual data (no PROXY header yet)
|
||||
// For now, just echo everything back
|
||||
proxyHeaderReceived = true;
|
||||
const sourceKey = `${rinfo.address}:${rinfo.port}`;
|
||||
if (!seenSources.has(sourceKey)) {
|
||||
seenSources.add(sourceKey);
|
||||
// First datagram from this source is the PROXY v2 header — skip it
|
||||
return;
|
||||
}
|
||||
// Echo back
|
||||
server.send(msg, rinfo.port, rinfo.address);
|
||||
@@ -104,7 +105,7 @@ let edgeUdpPort: number;
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => {
|
||||
tap.test('UDP/TLS setup: start UDP echo server and TCP+TLS tunnel with UDP ports', async () => {
|
||||
[hubPort, edgeUdpPort] = await findFreePorts(2);
|
||||
|
||||
// Start UDP echo server on upstream (127.0.0.2)
|
||||
@@ -142,21 +143,21 @@ tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => {
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('UDP: single datagram echo — 64 bytes', async () => {
|
||||
tap.test('UDP/TLS: single UDP datagram echo — 64 bytes', async () => {
|
||||
const data = crypto.randomBytes(64);
|
||||
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
|
||||
expect(received.length).toEqual(64);
|
||||
expect(Buffer.compare(received, data)).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP: single datagram echo — 1KB', async () => {
|
||||
tap.test('UDP/TLS: single UDP datagram echo — 1KB', async () => {
|
||||
const data = crypto.randomBytes(1024);
|
||||
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
|
||||
expect(received.length).toEqual(1024);
|
||||
expect(Buffer.compare(received, data)).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP: 10 sequential datagrams', async () => {
|
||||
tap.test('UDP/TLS: 10 sequential UDP datagrams', async () => {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const data = crypto.randomBytes(128);
|
||||
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
|
||||
@@ -165,7 +166,7 @@ tap.test('UDP: 10 sequential datagrams', async () => {
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('UDP: 10 concurrent datagrams from different source ports', async () => {
|
||||
tap.test('UDP/TLS: 10 concurrent UDP datagrams from different source ports', async () => {
|
||||
const promises = Array.from({ length: 10 }, () => {
|
||||
const data = crypto.randomBytes(256);
|
||||
return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({
|
||||
@@ -179,15 +180,105 @@ tap.test('UDP: 10 concurrent datagrams from different source ports', async () =>
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP: tunnel still connected after tests', async () => {
|
||||
tap.test('UDP/TLS: tunnel still connected after UDP tests', async () => {
|
||||
const status = await edge.getStatus();
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('UDP teardown: stop tunnel and echo server', async () => {
|
||||
tap.test('UDP/TLS teardown: stop tunnel and UDP echo server', async () => {
|
||||
await edge.stop();
|
||||
await hub.stop();
|
||||
await new Promise<void>((resolve) => echoServer.close(() => resolve()));
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// QUIC transport UDP tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let quicHub: RemoteIngressHub;
|
||||
let quicEdge: RemoteIngressEdge;
|
||||
let quicEchoServer: dgram.Socket;
|
||||
let quicHubPort: number;
|
||||
let quicEdgeUdpPort: number;
|
||||
|
||||
tap.test('UDP/QUIC setup: start UDP echo server and QUIC tunnel with UDP ports', async () => {
|
||||
[quicHubPort, quicEdgeUdpPort] = await findFreePorts(2);
|
||||
|
||||
quicEchoServer = await startUdpEchoServer(quicEdgeUdpPort, '127.0.0.2');
|
||||
|
||||
quicHub = new RemoteIngressHub();
|
||||
quicEdge = new RemoteIngressEdge();
|
||||
|
||||
await quicHub.start({ tunnelPort: quicHubPort, targetHost: '127.0.0.2' });
|
||||
await quicHub.updateAllowedEdges([
|
||||
{ id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [quicEdgeUdpPort] },
|
||||
]);
|
||||
|
||||
const connectedPromise = new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
|
||||
quicEdge.once('tunnelConnected', () => {
|
||||
clearTimeout(timeout);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
await quicEdge.start({
|
||||
hubHost: '127.0.0.1',
|
||||
hubPort: quicHubPort,
|
||||
edgeId: 'test-edge',
|
||||
secret: 'test-secret',
|
||||
bindAddress: '127.0.0.1',
|
||||
transportMode: 'quic',
|
||||
});
|
||||
|
||||
await connectedPromise;
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
|
||||
const status = await quicEdge.getStatus();
|
||||
expect(status.connected).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('UDP/QUIC: single UDP datagram echo — 64 bytes', async () => {
|
||||
const data = crypto.randomBytes(64);
|
||||
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
|
||||
expect(received.length).toEqual(64);
|
||||
expect(Buffer.compare(received, data)).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP/QUIC: single UDP datagram echo — 1KB', async () => {
|
||||
const data = crypto.randomBytes(1024);
|
||||
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
|
||||
expect(received.length).toEqual(1024);
|
||||
expect(Buffer.compare(received, data)).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP/QUIC: 10 sequential UDP datagrams', async () => {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const data = crypto.randomBytes(128);
|
||||
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
|
||||
expect(received.length).toEqual(128);
|
||||
expect(Buffer.compare(received, data)).toEqual(0);
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('UDP/QUIC: 10 concurrent UDP datagrams', async () => {
|
||||
const promises = Array.from({ length: 10 }, () => {
|
||||
const data = crypto.randomBytes(256);
|
||||
return udpSendAndReceive(quicEdgeUdpPort, data, 5000).then((received) => ({
|
||||
sizeOk: received.length === 256,
|
||||
dataOk: Buffer.compare(received, data) === 0,
|
||||
}));
|
||||
});
|
||||
|
||||
const results = await Promise.all(promises);
|
||||
const failures = results.filter((r) => !r.sizeOk || !r.dataOk);
|
||||
expect(failures.length).toEqual(0);
|
||||
});
|
||||
|
||||
tap.test('UDP/QUIC teardown: stop QUIC tunnel and UDP echo server', async () => {
|
||||
await quicEdge.stop();
|
||||
await quicHub.stop();
|
||||
await new Promise<void>((resolve) => quicEchoServer.close(() => resolve()));
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@serve.zone/remoteingress',
|
||||
version: '4.11.0',
|
||||
version: '4.12.1',
|
||||
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user