Compare commits

...

14 Commits

11 changed files with 832 additions and 33 deletions

View File

@@ -1,5 +1,49 @@
# Changelog
## 2026-03-17 - 4.7.1 - fix(remoteingress-core)
improve tunnel failure detection and reconnect handling
- Enable TCP keepalive on edge and hub connections to detect silent network failures sooner
- Trigger immediate reconnect or disconnect when tunnel writer tasks fail instead of waiting for liveness timeouts
- Prevent active stream counter underflow during concurrent connection cleanup
## 2026-03-16 - 4.7.0 - feat(edge,protocol,test)
add configurable edge bind address and expand flow-control test coverage
- adds an optional bindAddress configuration for edge TCP listeners, defaulting to 0.0.0.0 when not provided
- passes bindAddress through the TypeScript edge client and Rust edge runtime so local test setups can bind to localhost
- adds protocol unit tests for adaptive stream window sizing and window update frame encoding/decoding
- introduces end-to-end flow-control tests and updates the test script to build before running tests
## 2026-03-16 - 4.6.1 - fix(remoteingress-core)
avoid spurious tunnel disconnect events and increase control channel capacity
- Emit TunnelDisconnected only after an established connection is actually lost, preventing false disconnect events during failed reconnect attempts.
- Increase edge and hub control-channel buffer sizes from 64 to 256 to better prioritize control frames under load.
## 2026-03-16 - 4.6.0 - feat(remoteingress-core)
add adaptive per-stream flow control based on active stream counts
- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically.
- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams.
- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers.
## 2026-03-16 - 4.5.12 - fix(remoteingress-core)
improve tunnel liveness handling and enable TCP keepalive for accepted client sockets
- Avoid disconnecting edges when PING or PONG frames cannot be queued because the control channel is temporarily full.
- Enable TCP_NODELAY and TCP keepalive on accepted client connections to help detect stale or dropped clients.
## 2026-03-16 - 4.5.11 - fix(repo)
no changes to commit
## 2026-03-16 - 4.5.10 - fix(remoteingress-core)
guard zero-window reads to avoid false EOF handling on stalled streams
- Prevent upload and download loops from calling read on an empty buffer when flow-control window remains at 0 after stall timeout
- Log a warning and close the affected stream instead of misinterpreting Ok(0) as end-of-file
## 2026-03-16 - 4.5.9 - fix(remoteingress-core)
delay stream close until downstream response draining finishes to prevent truncated transfers

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.5.9",
"version": "4.7.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"main": "dist_ts/index.js",
@@ -9,7 +9,7 @@
"author": "Task Venture Capital GmbH",
"license": "MIT",
"scripts": {
"test": "(tstest test/ --verbose --logfile --timeout 60)",
"test": "(pnpm run build && tstest test/ --verbose --logfile --timeout 60)",
"build": "(tsbuild tsfolders --allowimplicitany && tsrust)",
"buildDocs": "(tsdoc)"
},

13
rust/Cargo.lock generated
View File

@@ -558,6 +558,7 @@ dependencies = [
"rustls-pemfile",
"serde",
"serde_json",
"socket2 0.5.10",
"tokio",
"tokio-rustls",
"tokio-util",
@@ -701,6 +702,16 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.2"
@@ -765,7 +776,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.2",
"tokio-macros",
"windows-sys 0.61.2",
]

View File

@@ -14,3 +14,4 @@ serde_json = "1"
log = "0.4"
rustls-pemfile = "2"
tokio-util = "0.7"
socket2 = "0.5"

View File

@@ -32,6 +32,10 @@ pub struct EdgeConfig {
pub hub_port: u16,
pub edge_id: String,
pub secret: String,
/// Optional bind address for TCP listeners (defaults to "0.0.0.0").
/// Useful for testing on localhost where edge and upstream share the same machine.
#[serde(default)]
pub bind_address: Option<String>,
}
/// Handshake config received from hub after authentication.
@@ -232,7 +236,11 @@ async fn edge_main_loop(
}
*connected.write().await = false;
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
// Only emit disconnect event on actual disconnection, not on failed reconnects.
// Failed reconnects never reach line 335 (handshake success), so was_connected is false.
if was_connected {
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
}
active_streams.store(0, Ordering::Relaxed);
// Reset stream ID counter for next connection cycle
next_stream_id.store(1, Ordering::Relaxed);
@@ -276,6 +284,13 @@ async fn connect_to_hub_and_run(
Ok(s) => {
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
let _ = s.set_nodelay(true);
// TCP keepalive detects silent network failures (NAT timeout, path change)
// faster than the 45s application-level liveness timeout.
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&s).set_tcp_keepalive(&ka);
s
}
Err(e) => {
@@ -375,23 +390,27 @@ async fn connect_to_hub_and_run(
// QoS dual-channel tunnel writer: control frames (PONG/WINDOW_UPDATE/CLOSE/OPEN)
// have priority over data frames (DATA). Prevents PING starvation under load.
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Vec<u8>>(4096);
// Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
let tw_token = connection_token.clone();
// Oneshot to signal the reader loop when the writer dies from a write error.
// This avoids the 45s liveness timeout delay when the tunnel is already dead.
let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>();
let tunnel_writer_handle = tokio::spawn(async move {
// BufWriter coalesces small writes (frame headers, control frames) into fewer
// TLS records and syscalls. Flushed after each frame to avoid holding data.
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
let mut write_error = false;
loop {
tokio::select! {
biased; // control frames always take priority over data
ctrl = tunnel_ctrl_rx.recv() => {
match ctrl {
Some(frame_data) => {
if writer.write_all(&frame_data).await.is_err() { break; }
if writer.flush().await.is_err() { break; }
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
if writer.flush().await.is_err() { write_error = true; break; }
}
None => break,
}
@@ -399,8 +418,8 @@ async fn connect_to_hub_and_run(
data = tunnel_data_rx.recv() => {
match data {
Some(frame_data) => {
if writer.write_all(&frame_data).await.is_err() { break; }
if writer.flush().await.is_err() { break; }
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
if writer.flush().await.is_err() { write_error = true; break; }
}
None => break,
}
@@ -408,10 +427,15 @@ async fn connect_to_hub_and_run(
_ = tw_token.cancelled() => break,
}
}
if write_error {
log::error!("Tunnel writer failed, signalling reader for fast reconnect");
let _ = writer_dead_tx.send(());
}
});
// Start TCP listeners for initial ports (hot-reloadable)
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
let bind_address = config.bind_address.as_deref().unwrap_or("0.0.0.0");
apply_port_config(
&handshake.listen_ports,
&mut port_listeners,
@@ -422,6 +446,7 @@ async fn connect_to_hub_and_run(
next_stream_id,
&config.edge_id,
connection_token,
bind_address,
);
// Heartbeat: liveness timeout detects silent hub failures
@@ -488,14 +513,17 @@ async fn connect_to_hub_and_run(
next_stream_id,
&config.edge_id,
connection_token,
bind_address,
);
}
}
FRAME_PING => {
let pong_frame = encode_frame(0, FRAME_PONG, &[]);
if tunnel_writer_tx.try_send(pong_frame).is_err() {
log::warn!("Failed to send PONG, writer channel full/closed");
break EdgeLoopResult::Reconnect;
// Control channel full (WINDOW_UPDATE burst from many streams).
// DON'T disconnect — the 45s liveness timeout gives margin
// for the channel to drain and the next PONG to succeed.
log::warn!("PONG send failed, control channel full — skipping this cycle");
}
log::trace!("Received PING from hub, sent PONG");
}
@@ -519,6 +547,10 @@ async fn connect_to_hub_and_run(
liveness_timeout_dur.as_secs());
break EdgeLoopResult::Reconnect;
}
_ = &mut writer_dead_rx => {
log::error!("Tunnel writer died, reconnecting immediately");
break EdgeLoopResult::Reconnect;
}
_ = connection_token.cancelled() => {
log::info!("Connection cancelled");
break EdgeLoopResult::Shutdown;
@@ -551,6 +583,7 @@ fn apply_port_config(
next_stream_id: &Arc<AtomicU32>,
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
@@ -573,8 +606,9 @@ fn apply_port_config(
let edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
let listener = match TcpListener::bind(("0.0.0.0", port)).await {
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
Ok(l) => l,
Err(e) => {
log::error!("Failed to bind port {}: {}", port, e);
@@ -588,6 +622,15 @@ fn apply_port_config(
accept_result = listener.accept() => {
match accept_result {
Ok((client_stream, client_addr)) => {
// TCP keepalive detects dead clients that disappear without FIN.
// Without this, zombie streams accumulate and never get cleaned up.
let _ = client_stream.set_nodelay(true);
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(60));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(60));
let _ = socket2::SockRef::from(&client_stream).set_tcp_keepalive(&ka);
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
let tunnel_data_tx = tunnel_data_tx.clone();
@@ -609,9 +652,21 @@ fn apply_port_config(
tunnel_data_tx,
client_writers,
client_token,
Arc::clone(&active_streams),
)
.await;
active_streams.fetch_sub(1, Ordering::Relaxed);
// Saturating decrement: prevent underflow when
// edge_main_loop's store(0) races with task cleanup.
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
});
}
Err(e) => {
@@ -640,6 +695,7 @@ async fn handle_client_connection(
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
client_token: CancellationToken,
active_streams: Arc<AtomicU32>,
) {
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -673,6 +729,7 @@ async fn handle_client_connection(
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
let hub_to_client_token = client_token.clone();
let wu_tx = tunnel_ctrl_tx.clone();
let active_streams_h2c = Arc::clone(&active_streams);
let mut hub_to_client = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
@@ -684,12 +741,20 @@ async fn handle_client_connection(
if client_write.write_all(&data).await.is_err() {
break;
}
// Track consumption for flow control
// Track consumption for adaptive flow control.
// The increment is capped to the adaptive window so the sender's
// effective window shrinks to match current demand (fewer streams
// = larger window, more streams = smaller window per stream).
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
active_streams_h2c.load(Ordering::Relaxed),
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
let increment = consumed_since_update.min(adaptive_window);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
if wu_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
consumed_since_update -= increment;
}
// If try_send fails, keep accumulating — retry on next threshold
}
@@ -726,9 +791,20 @@ async fn handle_client_connection(
}
if client_token.is_cancelled() { break; }
// Limit read size to available window
// Limit read size to available window.
// IMPORTANT: if window is 0 (stall timeout fired), we must NOT
// read into an empty buffer — read(&mut buf[..0]) returns Ok(0)
// which would be falsely interpreted as EOF.
let w = send_window.load(Ordering::Acquire) as usize;
let max_read = w.min(buf.len());
if w == 0 {
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
break;
}
// Adaptive: cap read to current per-stream target window
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
active_streams.load(Ordering::Relaxed),
) as usize;
let max_read = w.min(buf.len()).min(adaptive_cap);
tokio::select! {
read_result = client_read.read(&mut buf[..max_read]) => {
@@ -803,6 +879,7 @@ mod tests {
hub_port: 9999,
edge_id: "e1".to_string(),
secret: "sec".to_string(),
bind_address: None,
};
let json = serde_json::to_string(&config).unwrap();
let back: EdgeConfig = serde_json::from_str(&json).unwrap();
@@ -918,6 +995,7 @@ mod tests {
hub_port: 8443,
edge_id: "test-edge".to_string(),
secret: "test-secret".to_string(),
bind_address: None,
});
let status = edge.get_status().await;
assert!(!status.running);
@@ -934,6 +1012,7 @@ mod tests {
hub_port: 8443,
edge_id: "e".to_string(),
secret: "s".to_string(),
bind_address: None,
});
let rx1 = edge.take_event_rx().await;
assert!(rx1.is_some());
@@ -948,6 +1027,7 @@ mod tests {
hub_port: 8443,
edge_id: "e".to_string(),
secret: "s".to_string(),
bind_address: None,
});
edge.stop().await; // should not panic
let status = edge.get_status().await;

View File

@@ -300,6 +300,13 @@ async fn handle_edge_connection(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
stream.set_nodelay(true)?;
// TCP keepalive detects silent network failures (NAT timeout, path change)
// faster than the 45s application-level liveness timeout.
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
let tls_stream = acceptor.accept(stream).await?;
let (read_half, mut write_half) = tokio::io::split(tls_stream);
let mut buf_reader = BufReader::new(read_half);
@@ -373,25 +380,30 @@ async fn handle_edge_connection(
);
}
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
// Legacy alias for code that sends both control and data (will be migrated)
let frame_writer_tx = ctrl_tx.clone();
let writer_token = edge_token.clone();
let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>();
let writer_handle = tokio::spawn(async move {
// BufWriter coalesces small writes (frame headers, control frames) into fewer
// TLS records and syscalls. Flushed after each frame to avoid holding data.
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
let mut write_error = false;
loop {
tokio::select! {
biased; // control frames always take priority over data
ctrl = ctrl_rx.recv() => {
match ctrl {
Some(frame_data) => {
if writer.write_all(&frame_data).await.is_err() { break; }
if writer.flush().await.is_err() { break; }
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
if writer.flush().await.is_err() { write_error = true; break; }
}
None => break,
}
@@ -399,8 +411,8 @@ async fn handle_edge_connection(
data = data_rx.recv() => {
match data {
Some(frame_data) => {
if writer.write_all(&frame_data).await.is_err() { break; }
if writer.flush().await.is_err() { break; }
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
if writer.flush().await.is_err() { write_error = true; break; }
}
None => break,
}
@@ -408,6 +420,10 @@ async fn handle_edge_connection(
_ = writer_token.cancelled() => break,
}
}
if write_error {
log::error!("Tunnel writer to edge failed, signalling reader for fast cleanup");
let _ = writer_dead_tx.send(());
}
});
// Spawn task to forward config updates as FRAME_CONFIG frames
@@ -509,8 +525,10 @@ async fn handle_edge_connection(
}
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
let stream_counter = Arc::clone(&edge_stream_count);
tokio::spawn(async move {
let _permit = permit; // hold semaphore permit until stream completes
stream_counter.fetch_add(1, Ordering::Relaxed);
let result = async {
// A2: Connect to SmartProxy with timeout
@@ -533,6 +551,7 @@ async fn handle_edge_connection(
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
@@ -558,12 +577,18 @@ async fn handle_edge_connection(
break;
}
}
// Track consumption for flow control
// Track consumption for adaptive flow control.
// Increment capped to adaptive window to limit per-stream in-flight data.
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
stream_counter_w.load(Ordering::Relaxed),
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
let increment = consumed_since_update.min(adaptive_window);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
if wub_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
consumed_since_update -= increment;
}
// If try_send fails, keep accumulating — retry on next threshold
}
@@ -601,9 +626,20 @@ async fn handle_edge_connection(
}
if stream_token.is_cancelled() { break; }
// Limit read size to available window
// Limit read size to available window.
// IMPORTANT: if window is 0 (stall timeout fired), we must NOT
// read into an empty buffer — read(&mut buf[..0]) returns Ok(0)
// which would be falsely interpreted as EOF.
let w = send_window.load(Ordering::Acquire) as usize;
let max_read = w.min(buf.len());
if w == 0 {
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
break;
}
// Adaptive: cap read to current per-stream target window
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
stream_counter.load(Ordering::Relaxed),
) as usize;
let max_read = w.min(buf.len()).min(adaptive_cap);
tokio::select! {
read_result = up_read.read(&mut buf[..max_read]) => {
@@ -658,6 +694,7 @@ async fn handle_edge_connection(
stream_id,
});
}
stream_counter.fetch_sub(1, Ordering::Relaxed);
});
}
FRAME_DATA => {
@@ -719,8 +756,9 @@ async fn handle_edge_connection(
_ = ping_ticker.tick() => {
let ping_frame = encode_frame(0, FRAME_PING, &[]);
if frame_writer_tx.try_send(ping_frame).is_err() {
log::warn!("Failed to send PING to edge {}, writer channel full/closed", edge_id);
break;
// Control channel full — skip this PING cycle.
// The 45s liveness timeout gives margin for the channel to drain.
log::warn!("PING send to edge {} failed, control channel full — skipping", edge_id);
}
log::trace!("Sent PING to edge {}", edge_id);
}
@@ -729,6 +767,10 @@ async fn handle_edge_connection(
edge_id, liveness_timeout_dur.as_secs());
break;
}
_ = &mut writer_dead_rx => {
log::error!("Tunnel writer to edge {} died, disconnecting immediately", edge_id);
break;
}
_ = edge_token.cancelled() => {
log::info!("Edge {} cancelled by hub", edge_id);
break;

View File

@@ -5,3 +5,6 @@ edition = "2021"
[dependencies]
tokio = { version = "1", features = ["io-util"] }
[dev-dependencies]
tokio = { version = "1", features = ["io-util", "macros", "rt"] }

View File

@@ -32,6 +32,15 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> V
encode_frame(stream_id, frame_type, &increment.to_be_bytes())
}
/// Compute the target per-stream window size based on the number of active streams.
/// Total memory budget is ~32MB shared across all streams. As more streams are active,
/// each gets a smaller window. This adapts to current demand — few streams get high
/// throughput, many streams save memory and reduce control frame pressure.
pub fn compute_window_for_stream_count(active: u32) -> u32 {
let per_stream = (32 * 1024 * 1024u64) / (active.max(1) as u64);
per_stream.clamp(64 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
}
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
if payload.len() != 4 {
@@ -336,4 +345,134 @@ mod tests {
assert_eq!(&pong[0..4], &0u32.to_be_bytes());
assert_eq!(pong.len(), FRAME_HEADER_SIZE);
}
// --- compute_window_for_stream_count tests ---
#[test]
fn test_adaptive_window_zero_streams() {
// 0 streams treated as 1: 32MB/1 = 32MB → clamped to 4MB max
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_one_stream() {
// 32MB/1 = 32MB → clamped to 4MB max
assert_eq!(compute_window_for_stream_count(1), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_at_max_boundary() {
// 32MB/8 = 4MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(8), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_just_below_max() {
// 32MB/9 = 3,728,270 — first value below INITIAL_STREAM_WINDOW
let w = compute_window_for_stream_count(9);
assert!(w < INITIAL_STREAM_WINDOW);
assert_eq!(w, (32 * 1024 * 1024u64 / 9) as u32);
}
#[test]
fn test_adaptive_window_16_streams() {
// 32MB/16 = 2MB
assert_eq!(compute_window_for_stream_count(16), 2 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_100_streams() {
// 32MB/100 = 335,544 bytes (~327KB)
let w = compute_window_for_stream_count(100);
assert_eq!(w, (32 * 1024 * 1024u64 / 100) as u32);
assert!(w > 64 * 1024); // above floor
assert!(w < INITIAL_STREAM_WINDOW as u32); // below ceiling
}
#[test]
fn test_adaptive_window_200_streams() {
// 32MB/200 = 167,772 bytes (~163KB), above 64KB floor
let w = compute_window_for_stream_count(200);
assert_eq!(w, (32 * 1024 * 1024u64 / 200) as u32);
assert!(w > 64 * 1024);
}
#[test]
fn test_adaptive_window_500_streams() {
// 32MB/500 = 67,108 bytes (~65.5KB), just above 64KB floor
let w = compute_window_for_stream_count(500);
assert_eq!(w, (32 * 1024 * 1024u64 / 500) as u32);
assert!(w > 64 * 1024);
}
#[test]
fn test_adaptive_window_at_min_boundary() {
// 32MB/512 = 65,536 = exactly 64KB floor
assert_eq!(compute_window_for_stream_count(512), 64 * 1024);
}
#[test]
fn test_adaptive_window_below_min_clamped() {
// 32MB/513 = 65,408 → clamped up to 64KB
assert_eq!(compute_window_for_stream_count(513), 64 * 1024);
}
#[test]
fn test_adaptive_window_1000_streams() {
// 32MB/1000 = 33,554 → clamped to 64KB
assert_eq!(compute_window_for_stream_count(1000), 64 * 1024);
}
#[test]
fn test_adaptive_window_max_u32() {
// Extreme: u32::MAX streams → tiny value → clamped to 64KB
assert_eq!(compute_window_for_stream_count(u32::MAX), 64 * 1024);
}
#[test]
fn test_adaptive_window_monotonically_decreasing() {
// Window should decrease (or stay same) as stream count increases
let mut prev = compute_window_for_stream_count(1);
for n in [2, 5, 10, 50, 100, 200, 500, 512, 1000] {
let w = compute_window_for_stream_count(n);
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
prev = w;
}
}
#[test]
fn test_adaptive_window_total_budget_bounded() {
// active × per_stream_window should never exceed 32MB (+ clamp overhead for high N)
for n in [1, 10, 50, 100, 200, 500] {
let w = compute_window_for_stream_count(n);
let total = w as u64 * n as u64;
assert!(total <= 32 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
}
}
// --- encode/decode window_update roundtrip ---
#[test]
fn test_window_update_roundtrip() {
for &increment in &[0u32, 1, 64 * 1024, INITIAL_STREAM_WINDOW, MAX_WINDOW_SIZE, u32::MAX] {
let frame = encode_window_update(42, FRAME_WINDOW_UPDATE, increment);
assert_eq!(frame[4], FRAME_WINDOW_UPDATE);
let decoded = decode_window_update(&frame[FRAME_HEADER_SIZE..]);
assert_eq!(decoded, Some(increment));
}
}
#[test]
fn test_window_update_back_roundtrip() {
let frame = encode_window_update(7, FRAME_WINDOW_UPDATE_BACK, 1234567);
assert_eq!(frame[4], FRAME_WINDOW_UPDATE_BACK);
assert_eq!(decode_window_update(&frame[FRAME_HEADER_SIZE..]), Some(1234567));
}
#[test]
fn test_decode_window_update_malformed() {
assert_eq!(decode_window_update(&[]), None);
assert_eq!(decode_window_update(&[0, 0, 0]), None);
assert_eq!(decode_window_update(&[0, 0, 0, 0, 0]), None);
}
}

View File

@@ -0,0 +1,475 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Find N free ports by binding to port 0 and collecting OS-assigned ports. */
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
/** Start a TCP echo server that tracks connections for force-close. */
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
// Skip PROXY protocol v1 header line before echoing
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) {
socket.write(remainder);
}
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
/**
* Start a server that sends a large response immediately on first data received.
* Does NOT wait for end (the tunnel protocol has no half-close).
* On receiving first data chunk after PROXY header, sends responseSize bytes then closes.
*/
function startLargeResponseServer(port: number, host: string, responseSize: number): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
let responseSent = false;
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0 && !responseSent) {
responseSent = true;
sendLargeResponse(socket, responseSize);
}
}
return;
}
if (!responseSent) {
responseSent = true;
sendLargeResponse(socket, responseSize);
}
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
function sendLargeResponse(socket: net.Socket, totalBytes: number) {
const chunkSize = 32 * 1024;
let sent = 0;
const writeChunk = () => {
while (sent < totalBytes) {
const toWrite = Math.min(chunkSize, totalBytes - sent);
// Use a deterministic pattern for verification
const chunk = Buffer.alloc(toWrite, (sent % 256) & 0xff);
const canContinue = socket.write(chunk);
sent += toWrite;
if (!canContinue) {
socket.once('drain', writeChunk);
return;
}
}
socket.end();
};
writeChunk();
}
/** Force-close a server: destroy all connections, then close. */
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
interface TestTunnel {
hub: RemoteIngressHub;
edge: RemoteIngressEdge;
edgePort: number;
cleanup: () => Promise<void>;
}
/**
* Start a full hub + edge tunnel.
* Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2.
* Hub targetHost = 127.0.0.2 so hub -> upstream doesn't loop back to edge.
*/
async function startTunnel(edgePort: number, hubPort: number): Promise<TestTunnel> {
const hub = new RemoteIngressHub();
const edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
return {
hub,
edge,
edgePort,
cleanup: async () => {
await edge.stop();
await hub.stop();
},
};
}
/**
* Send data through the tunnel and collect the echoed response.
*/
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
/**
* Connect to the tunnel, send a small request, and collect a large response.
* Does NOT call end() — the tunnel has no half-close.
* Instead, collects until expectedResponseSize bytes arrive.
*/
function sendAndReceiveLarge(
port: number,
data: Buffer,
expectedResponseSize: number,
timeoutMs = 60000,
): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
// Do NOT call client.end() — the server will respond immediately
// and the tunnel CLOSE will happen when the download finishes
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedResponseSize} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedResponseSize && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let tunnel: TestTunnel;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
tap.test('setup: start echo server and tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
tunnel = await startTunnel(edgePort, hubPort);
expect(tunnel.hub.running).toBeTrue();
});
tap.test('single stream: 32MB transfer exceeding initial 4MB window', async () => {
const size = 32 * 1024 * 1024;
const data = crypto.randomBytes(size);
const expectedHash = sha256(data);
const received = await sendAndReceive(edgePort, data, 60000);
expect(received.length).toEqual(size);
expect(sha256(received)).toEqual(expectedHash);
});
tap.test('200 concurrent streams with 64KB each', async () => {
const streamCount = 200;
const payloadSize = 64 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 30000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => {
const streamCount = 512;
const payloadSize = 16 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 60000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
// Swap to large-response server
await forceCloseServer(echoServer);
const responseSize = 4 * 1024 * 1024; // 4 MB
const largeServer = await startLargeResponseServer(edgePort, '127.0.0.2', responseSize);
try {
const requestData = crypto.randomBytes(4 * 1024); // 4 KB
const received = await sendAndReceiveLarge(edgePort, requestData, responseSize, 60000);
expect(received.length).toEqual(responseSize);
} finally {
// Always restore echo server even on failure
await forceCloseServer(largeServer);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
}
});
tap.test('100 streams x 1MB each (100MB total exceeding 32MB budget)', async () => {
const streamCount = 100;
const payloadSize = 1 * 1024 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 120000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('active stream counter tracks concurrent connections', async () => {
const N = 50;
// Open N connections and keep them alive (send data but don't close)
const sockets: net.Socket[] = [];
const connectPromises = Array.from({ length: N }, () => {
return new Promise<net.Socket>((resolve, reject) => {
const sock = net.createConnection({ host: '127.0.0.1', port: edgePort }, () => {
resolve(sock);
});
sock.on('error', () => {});
setTimeout(() => reject(new Error('connect timeout')), 5000);
});
});
const connected = await Promise.all(connectPromises);
sockets.push(...connected);
// Brief delay for stream registration to propagate
await new Promise((resolve) => setTimeout(resolve, 500));
// Verify the edge reports >= N active streams.
// This counter is the input to compute_window_for_stream_count(),
// so its accuracy determines whether adaptive window sizing is correct.
const status = await tunnel.edge.getStatus();
expect(status.activeStreams).toBeGreaterThanOrEqual(N);
// Clean up: destroy all sockets (the tunnel's 300s stream timeout will handle cleanup)
for (const sock of sockets) {
sock.destroy();
}
});
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
// At 50 concurrent streams: adaptive window = 32MB/50 = 655KB per stream
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
const streamCount = 50;
const payloadSize = 2 * 1024 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 120000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('teardown: stop tunnel and echo server', async () => {
await tunnel.cleanup();
await forceCloseServer(echoServer);
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.5.9',
version: '4.7.1',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
}

View File

@@ -14,6 +14,7 @@ type TEdgeCommands = {
hubPort: number;
edgeId: string;
secret: string;
bindAddress?: string;
};
result: { started: boolean };
};
@@ -38,6 +39,7 @@ export interface IEdgeConfig {
hubPort?: number;
edgeId: string;
secret: string;
bindAddress?: string;
}
const MAX_RESTART_ATTEMPTS = 10;
@@ -132,6 +134,7 @@ export class RemoteIngressEdge extends EventEmitter {
hubPort: edgeConfig.hubPort ?? 8443,
edgeId: edgeConfig.edgeId,
secret: edgeConfig.secret,
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
});
this.started = true;
@@ -227,6 +230,7 @@ export class RemoteIngressEdge extends EventEmitter {
hubPort: this.savedConfig.hubPort ?? 8443,
edgeId: this.savedConfig.edgeId,
secret: this.savedConfig.secret,
...(this.savedConfig.bindAddress ? { bindAddress: this.savedConfig.bindAddress } : {}),
});
this.started = true;