Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a87e9578eb | |||
| b851bc7994 | |||
| 1284bb5b73 | |||
| 1afd0e5347 | |||
| 96e7ab00cf | |||
| 17d1a795cd | |||
| 982f648928 | |||
| 3a2a060a85 | |||
| e0c469147e | |||
| 0fdcdf566e | |||
| a808d4c9de | |||
| f8a0171ef3 | |||
| 1d59a48648 | |||
| af2ec11a2d |
44
changelog.md
44
changelog.md
@@ -1,5 +1,49 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-17 - 4.7.2 - fix(remoteingress-core)
|
||||||
|
add tunnel write timeouts and scale initial stream windows by active stream count
|
||||||
|
|
||||||
|
- Wrap tunnel frame writes and flushes in a 30-second timeout on both edge and hub to detect stalled writers and trigger faster reconnect or cleanup.
|
||||||
|
- Compute each stream's initial send window from the current active stream count instead of using a fixed window to keep total in-flight data within the 32MB budget.
|
||||||
|
|
||||||
|
## 2026-03-17 - 4.7.1 - fix(remoteingress-core)
|
||||||
|
improve tunnel failure detection and reconnect handling
|
||||||
|
|
||||||
|
- Enable TCP keepalive on edge and hub connections to detect silent network failures sooner
|
||||||
|
- Trigger immediate reconnect or disconnect when tunnel writer tasks fail instead of waiting for liveness timeouts
|
||||||
|
- Prevent active stream counter underflow during concurrent connection cleanup
|
||||||
|
|
||||||
|
## 2026-03-16 - 4.7.0 - feat(edge,protocol,test)
|
||||||
|
add configurable edge bind address and expand flow-control test coverage
|
||||||
|
|
||||||
|
- adds an optional bindAddress configuration for edge TCP listeners, defaulting to 0.0.0.0 when not provided
|
||||||
|
- passes bindAddress through the TypeScript edge client and Rust edge runtime so local test setups can bind to localhost
|
||||||
|
- adds protocol unit tests for adaptive stream window sizing and window update frame encoding/decoding
|
||||||
|
- introduces end-to-end flow-control tests and updates the test script to build before running tests
|
||||||
|
|
||||||
|
## 2026-03-16 - 4.6.1 - fix(remoteingress-core)
|
||||||
|
avoid spurious tunnel disconnect events and increase control channel capacity
|
||||||
|
|
||||||
|
- Emit TunnelDisconnected only after an established connection is actually lost, preventing false disconnect events during failed reconnect attempts.
|
||||||
|
- Increase edge and hub control-channel buffer sizes from 64 to 256 to better prioritize control frames under load.
|
||||||
|
|
||||||
|
## 2026-03-16 - 4.6.0 - feat(remoteingress-core)
|
||||||
|
add adaptive per-stream flow control based on active stream counts
|
||||||
|
|
||||||
|
- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically.
|
||||||
|
- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams.
|
||||||
|
- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers.
|
||||||
|
|
||||||
|
## 2026-03-16 - 4.5.12 - fix(remoteingress-core)
|
||||||
|
improve tunnel liveness handling and enable TCP keepalive for accepted client sockets
|
||||||
|
|
||||||
|
- Avoid disconnecting edges when PING or PONG frames cannot be queued because the control channel is temporarily full.
|
||||||
|
- Enable TCP_NODELAY and TCP keepalive on accepted client connections to help detect stale or dropped clients.
|
||||||
|
|
||||||
|
## 2026-03-16 - 4.5.11 - fix(repo)
|
||||||
|
no changes to commit
|
||||||
|
|
||||||
|
|
||||||
## 2026-03-16 - 4.5.10 - fix(remoteingress-core)
|
## 2026-03-16 - 4.5.10 - fix(remoteingress-core)
|
||||||
guard zero-window reads to avoid false EOF handling on stalled streams
|
guard zero-window reads to avoid false EOF handling on stalled streams
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@serve.zone/remoteingress",
|
"name": "@serve.zone/remoteingress",
|
||||||
"version": "4.5.10",
|
"version": "4.7.2",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
@@ -9,7 +9,7 @@
|
|||||||
"author": "Task Venture Capital GmbH",
|
"author": "Task Venture Capital GmbH",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "(tstest test/ --verbose --logfile --timeout 60)",
|
"test": "(pnpm run build && tstest test/ --verbose --logfile --timeout 60)",
|
||||||
"build": "(tsbuild tsfolders --allowimplicitany && tsrust)",
|
"build": "(tsbuild tsfolders --allowimplicitany && tsrust)",
|
||||||
"buildDocs": "(tsdoc)"
|
"buildDocs": "(tsdoc)"
|
||||||
},
|
},
|
||||||
|
|||||||
13
rust/Cargo.lock
generated
13
rust/Cargo.lock
generated
@@ -558,6 +558,7 @@ dependencies = [
|
|||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"socket2 0.5.10",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
@@ -701,6 +702,16 @@ version = "1.15.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
|
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "socket2"
|
||||||
|
version = "0.5.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"windows-sys 0.52.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.6.2"
|
version = "0.6.2"
|
||||||
@@ -765,7 +776,7 @@ dependencies = [
|
|||||||
"parking_lot",
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2",
|
"socket2 0.6.2",
|
||||||
"tokio-macros",
|
"tokio-macros",
|
||||||
"windows-sys 0.61.2",
|
"windows-sys 0.61.2",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -14,3 +14,4 @@ serde_json = "1"
|
|||||||
log = "0.4"
|
log = "0.4"
|
||||||
rustls-pemfile = "2"
|
rustls-pemfile = "2"
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
|
socket2 = "0.5"
|
||||||
|
|||||||
@@ -32,6 +32,10 @@ pub struct EdgeConfig {
|
|||||||
pub hub_port: u16,
|
pub hub_port: u16,
|
||||||
pub edge_id: String,
|
pub edge_id: String,
|
||||||
pub secret: String,
|
pub secret: String,
|
||||||
|
/// Optional bind address for TCP listeners (defaults to "0.0.0.0").
|
||||||
|
/// Useful for testing on localhost where edge and upstream share the same machine.
|
||||||
|
#[serde(default)]
|
||||||
|
pub bind_address: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handshake config received from hub after authentication.
|
/// Handshake config received from hub after authentication.
|
||||||
@@ -232,7 +236,11 @@ async fn edge_main_loop(
|
|||||||
}
|
}
|
||||||
|
|
||||||
*connected.write().await = false;
|
*connected.write().await = false;
|
||||||
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
|
// Only emit disconnect event on actual disconnection, not on failed reconnects.
|
||||||
|
// Failed reconnects never reach line 335 (handshake success), so was_connected is false.
|
||||||
|
if was_connected {
|
||||||
|
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
|
||||||
|
}
|
||||||
active_streams.store(0, Ordering::Relaxed);
|
active_streams.store(0, Ordering::Relaxed);
|
||||||
// Reset stream ID counter for next connection cycle
|
// Reset stream ID counter for next connection cycle
|
||||||
next_stream_id.store(1, Ordering::Relaxed);
|
next_stream_id.store(1, Ordering::Relaxed);
|
||||||
@@ -276,6 +284,13 @@ async fn connect_to_hub_and_run(
|
|||||||
Ok(s) => {
|
Ok(s) => {
|
||||||
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
|
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
|
||||||
let _ = s.set_nodelay(true);
|
let _ = s.set_nodelay(true);
|
||||||
|
// TCP keepalive detects silent network failures (NAT timeout, path change)
|
||||||
|
// faster than the 45s application-level liveness timeout.
|
||||||
|
let ka = socket2::TcpKeepalive::new()
|
||||||
|
.with_time(Duration::from_secs(30));
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
let ka = ka.with_interval(Duration::from_secs(10));
|
||||||
|
let _ = socket2::SockRef::from(&s).set_tcp_keepalive(&ka);
|
||||||
s
|
s
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -375,23 +390,31 @@ async fn connect_to_hub_and_run(
|
|||||||
|
|
||||||
// QoS dual-channel tunnel writer: control frames (PONG/WINDOW_UPDATE/CLOSE/OPEN)
|
// QoS dual-channel tunnel writer: control frames (PONG/WINDOW_UPDATE/CLOSE/OPEN)
|
||||||
// have priority over data frames (DATA). Prevents PING starvation under load.
|
// have priority over data frames (DATA). Prevents PING starvation under load.
|
||||||
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
|
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||||
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||||
// Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN
|
// Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN
|
||||||
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
|
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
|
||||||
let tw_token = connection_token.clone();
|
let tw_token = connection_token.clone();
|
||||||
|
// Oneshot to signal the reader loop when the writer dies from a write error.
|
||||||
|
// This avoids the 45s liveness timeout delay when the tunnel is already dead.
|
||||||
|
let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>();
|
||||||
let tunnel_writer_handle = tokio::spawn(async move {
|
let tunnel_writer_handle = tokio::spawn(async move {
|
||||||
// BufWriter coalesces small writes (frame headers, control frames) into fewer
|
// BufWriter coalesces small writes (frame headers, control frames) into fewer
|
||||||
// TLS records and syscalls. Flushed after each frame to avoid holding data.
|
// TLS records and syscalls. Flushed after each frame to avoid holding data.
|
||||||
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
|
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
|
||||||
|
let mut write_error = false;
|
||||||
|
let write_timeout = Duration::from_secs(30);
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased; // control frames always take priority over data
|
biased; // control frames always take priority over data
|
||||||
ctrl = tunnel_ctrl_rx.recv() => {
|
ctrl = tunnel_ctrl_rx.recv() => {
|
||||||
match ctrl {
|
match ctrl {
|
||||||
Some(frame_data) => {
|
Some(frame_data) => {
|
||||||
if writer.write_all(&frame_data).await.is_err() { break; }
|
let ok = tokio::time::timeout(write_timeout, async {
|
||||||
if writer.flush().await.is_err() { break; }
|
writer.write_all(&frame_data).await?;
|
||||||
|
writer.flush().await
|
||||||
|
}).await;
|
||||||
|
if !matches!(ok, Ok(Ok(()))) { write_error = true; break; }
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
@@ -399,8 +422,11 @@ async fn connect_to_hub_and_run(
|
|||||||
data = tunnel_data_rx.recv() => {
|
data = tunnel_data_rx.recv() => {
|
||||||
match data {
|
match data {
|
||||||
Some(frame_data) => {
|
Some(frame_data) => {
|
||||||
if writer.write_all(&frame_data).await.is_err() { break; }
|
let ok = tokio::time::timeout(write_timeout, async {
|
||||||
if writer.flush().await.is_err() { break; }
|
writer.write_all(&frame_data).await?;
|
||||||
|
writer.flush().await
|
||||||
|
}).await;
|
||||||
|
if !matches!(ok, Ok(Ok(()))) { write_error = true; break; }
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
@@ -408,10 +434,15 @@ async fn connect_to_hub_and_run(
|
|||||||
_ = tw_token.cancelled() => break,
|
_ = tw_token.cancelled() => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if write_error {
|
||||||
|
log::error!("Tunnel writer failed or stalled, signalling reader for fast reconnect");
|
||||||
|
let _ = writer_dead_tx.send(());
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Start TCP listeners for initial ports (hot-reloadable)
|
// Start TCP listeners for initial ports (hot-reloadable)
|
||||||
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
|
||||||
|
let bind_address = config.bind_address.as_deref().unwrap_or("0.0.0.0");
|
||||||
apply_port_config(
|
apply_port_config(
|
||||||
&handshake.listen_ports,
|
&handshake.listen_ports,
|
||||||
&mut port_listeners,
|
&mut port_listeners,
|
||||||
@@ -422,6 +453,7 @@ async fn connect_to_hub_and_run(
|
|||||||
next_stream_id,
|
next_stream_id,
|
||||||
&config.edge_id,
|
&config.edge_id,
|
||||||
connection_token,
|
connection_token,
|
||||||
|
bind_address,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Heartbeat: liveness timeout detects silent hub failures
|
// Heartbeat: liveness timeout detects silent hub failures
|
||||||
@@ -488,14 +520,17 @@ async fn connect_to_hub_and_run(
|
|||||||
next_stream_id,
|
next_stream_id,
|
||||||
&config.edge_id,
|
&config.edge_id,
|
||||||
connection_token,
|
connection_token,
|
||||||
|
bind_address,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FRAME_PING => {
|
FRAME_PING => {
|
||||||
let pong_frame = encode_frame(0, FRAME_PONG, &[]);
|
let pong_frame = encode_frame(0, FRAME_PONG, &[]);
|
||||||
if tunnel_writer_tx.try_send(pong_frame).is_err() {
|
if tunnel_writer_tx.try_send(pong_frame).is_err() {
|
||||||
log::warn!("Failed to send PONG, writer channel full/closed");
|
// Control channel full (WINDOW_UPDATE burst from many streams).
|
||||||
break EdgeLoopResult::Reconnect;
|
// DON'T disconnect — the 45s liveness timeout gives margin
|
||||||
|
// for the channel to drain and the next PONG to succeed.
|
||||||
|
log::warn!("PONG send failed, control channel full — skipping this cycle");
|
||||||
}
|
}
|
||||||
log::trace!("Received PING from hub, sent PONG");
|
log::trace!("Received PING from hub, sent PONG");
|
||||||
}
|
}
|
||||||
@@ -519,6 +554,10 @@ async fn connect_to_hub_and_run(
|
|||||||
liveness_timeout_dur.as_secs());
|
liveness_timeout_dur.as_secs());
|
||||||
break EdgeLoopResult::Reconnect;
|
break EdgeLoopResult::Reconnect;
|
||||||
}
|
}
|
||||||
|
_ = &mut writer_dead_rx => {
|
||||||
|
log::error!("Tunnel writer died, reconnecting immediately");
|
||||||
|
break EdgeLoopResult::Reconnect;
|
||||||
|
}
|
||||||
_ = connection_token.cancelled() => {
|
_ = connection_token.cancelled() => {
|
||||||
log::info!("Connection cancelled");
|
log::info!("Connection cancelled");
|
||||||
break EdgeLoopResult::Shutdown;
|
break EdgeLoopResult::Shutdown;
|
||||||
@@ -551,6 +590,7 @@ fn apply_port_config(
|
|||||||
next_stream_id: &Arc<AtomicU32>,
|
next_stream_id: &Arc<AtomicU32>,
|
||||||
edge_id: &str,
|
edge_id: &str,
|
||||||
connection_token: &CancellationToken,
|
connection_token: &CancellationToken,
|
||||||
|
bind_address: &str,
|
||||||
) {
|
) {
|
||||||
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
|
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
|
||||||
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
|
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
|
||||||
@@ -573,8 +613,9 @@ fn apply_port_config(
|
|||||||
let edge_id = edge_id.to_string();
|
let edge_id = edge_id.to_string();
|
||||||
let port_token = connection_token.child_token();
|
let port_token = connection_token.child_token();
|
||||||
|
|
||||||
|
let bind_addr = bind_address.to_string();
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let listener = match TcpListener::bind(("0.0.0.0", port)).await {
|
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
|
||||||
Ok(l) => l,
|
Ok(l) => l,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Failed to bind port {}: {}", port, e);
|
log::error!("Failed to bind port {}: {}", port, e);
|
||||||
@@ -588,6 +629,15 @@ fn apply_port_config(
|
|||||||
accept_result = listener.accept() => {
|
accept_result = listener.accept() => {
|
||||||
match accept_result {
|
match accept_result {
|
||||||
Ok((client_stream, client_addr)) => {
|
Ok((client_stream, client_addr)) => {
|
||||||
|
// TCP keepalive detects dead clients that disappear without FIN.
|
||||||
|
// Without this, zombie streams accumulate and never get cleaned up.
|
||||||
|
let _ = client_stream.set_nodelay(true);
|
||||||
|
let ka = socket2::TcpKeepalive::new()
|
||||||
|
.with_time(Duration::from_secs(60));
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
let ka = ka.with_interval(Duration::from_secs(60));
|
||||||
|
let _ = socket2::SockRef::from(&client_stream).set_tcp_keepalive(&ka);
|
||||||
|
|
||||||
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
|
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
|
||||||
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
|
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
|
||||||
let tunnel_data_tx = tunnel_data_tx.clone();
|
let tunnel_data_tx = tunnel_data_tx.clone();
|
||||||
@@ -609,9 +659,21 @@ fn apply_port_config(
|
|||||||
tunnel_data_tx,
|
tunnel_data_tx,
|
||||||
client_writers,
|
client_writers,
|
||||||
client_token,
|
client_token,
|
||||||
|
Arc::clone(&active_streams),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
active_streams.fetch_sub(1, Ordering::Relaxed);
|
// Saturating decrement: prevent underflow when
|
||||||
|
// edge_main_loop's store(0) races with task cleanup.
|
||||||
|
loop {
|
||||||
|
let current = active_streams.load(Ordering::Relaxed);
|
||||||
|
if current == 0 { break; }
|
||||||
|
if active_streams.compare_exchange_weak(
|
||||||
|
current, current - 1,
|
||||||
|
Ordering::Relaxed, Ordering::Relaxed,
|
||||||
|
).is_ok() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -640,6 +702,7 @@ async fn handle_client_connection(
|
|||||||
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
|
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
|
||||||
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
|
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
|
||||||
client_token: CancellationToken,
|
client_token: CancellationToken,
|
||||||
|
active_streams: Arc<AtomicU32>,
|
||||||
) {
|
) {
|
||||||
let client_ip = client_addr.ip().to_string();
|
let client_ip = client_addr.ip().to_string();
|
||||||
let client_port = client_addr.port();
|
let client_port = client_addr.port();
|
||||||
@@ -656,7 +719,12 @@ async fn handle_client_connection(
|
|||||||
|
|
||||||
// Set up channel for data coming back from hub (capacity 16 is sufficient with flow control)
|
// Set up channel for data coming back from hub (capacity 16 is sufficient with flow control)
|
||||||
let (back_tx, mut back_rx) = mpsc::channel::<Vec<u8>>(256);
|
let (back_tx, mut back_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||||
let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW));
|
// Adaptive initial window: scale with current stream count to keep total in-flight
|
||||||
|
// data within the 32MB budget. Prevents burst flooding when many streams open.
|
||||||
|
let initial_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
|
active_streams.load(Ordering::Relaxed),
|
||||||
|
);
|
||||||
|
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||||
let window_notify = Arc::new(Notify::new());
|
let window_notify = Arc::new(Notify::new());
|
||||||
{
|
{
|
||||||
let mut writers = client_writers.lock().await;
|
let mut writers = client_writers.lock().await;
|
||||||
@@ -673,6 +741,7 @@ async fn handle_client_connection(
|
|||||||
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
|
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
|
||||||
let hub_to_client_token = client_token.clone();
|
let hub_to_client_token = client_token.clone();
|
||||||
let wu_tx = tunnel_ctrl_tx.clone();
|
let wu_tx = tunnel_ctrl_tx.clone();
|
||||||
|
let active_streams_h2c = Arc::clone(&active_streams);
|
||||||
let mut hub_to_client = tokio::spawn(async move {
|
let mut hub_to_client = tokio::spawn(async move {
|
||||||
let mut consumed_since_update: u32 = 0;
|
let mut consumed_since_update: u32 = 0;
|
||||||
loop {
|
loop {
|
||||||
@@ -684,12 +753,20 @@ async fn handle_client_connection(
|
|||||||
if client_write.write_all(&data).await.is_err() {
|
if client_write.write_all(&data).await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// Track consumption for flow control
|
// Track consumption for adaptive flow control.
|
||||||
|
// The increment is capped to the adaptive window so the sender's
|
||||||
|
// effective window shrinks to match current demand (fewer streams
|
||||||
|
// = larger window, more streams = smaller window per stream).
|
||||||
consumed_since_update += len;
|
consumed_since_update += len;
|
||||||
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
|
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
|
active_streams_h2c.load(Ordering::Relaxed),
|
||||||
|
);
|
||||||
|
let threshold = adaptive_window / 2;
|
||||||
|
if consumed_since_update >= threshold {
|
||||||
|
let increment = consumed_since_update.min(adaptive_window);
|
||||||
|
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
|
||||||
if wu_tx.try_send(frame).is_ok() {
|
if wu_tx.try_send(frame).is_ok() {
|
||||||
consumed_since_update = 0;
|
consumed_since_update -= increment;
|
||||||
}
|
}
|
||||||
// If try_send fails, keep accumulating — retry on next threshold
|
// If try_send fails, keep accumulating — retry on next threshold
|
||||||
}
|
}
|
||||||
@@ -735,7 +812,11 @@ async fn handle_client_connection(
|
|||||||
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
|
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let max_read = w.min(buf.len());
|
// Adaptive: cap read to current per-stream target window
|
||||||
|
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
|
active_streams.load(Ordering::Relaxed),
|
||||||
|
) as usize;
|
||||||
|
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
read_result = client_read.read(&mut buf[..max_read]) => {
|
read_result = client_read.read(&mut buf[..max_read]) => {
|
||||||
@@ -810,6 +891,7 @@ mod tests {
|
|||||||
hub_port: 9999,
|
hub_port: 9999,
|
||||||
edge_id: "e1".to_string(),
|
edge_id: "e1".to_string(),
|
||||||
secret: "sec".to_string(),
|
secret: "sec".to_string(),
|
||||||
|
bind_address: None,
|
||||||
};
|
};
|
||||||
let json = serde_json::to_string(&config).unwrap();
|
let json = serde_json::to_string(&config).unwrap();
|
||||||
let back: EdgeConfig = serde_json::from_str(&json).unwrap();
|
let back: EdgeConfig = serde_json::from_str(&json).unwrap();
|
||||||
@@ -925,6 +1007,7 @@ mod tests {
|
|||||||
hub_port: 8443,
|
hub_port: 8443,
|
||||||
edge_id: "test-edge".to_string(),
|
edge_id: "test-edge".to_string(),
|
||||||
secret: "test-secret".to_string(),
|
secret: "test-secret".to_string(),
|
||||||
|
bind_address: None,
|
||||||
});
|
});
|
||||||
let status = edge.get_status().await;
|
let status = edge.get_status().await;
|
||||||
assert!(!status.running);
|
assert!(!status.running);
|
||||||
@@ -941,6 +1024,7 @@ mod tests {
|
|||||||
hub_port: 8443,
|
hub_port: 8443,
|
||||||
edge_id: "e".to_string(),
|
edge_id: "e".to_string(),
|
||||||
secret: "s".to_string(),
|
secret: "s".to_string(),
|
||||||
|
bind_address: None,
|
||||||
});
|
});
|
||||||
let rx1 = edge.take_event_rx().await;
|
let rx1 = edge.take_event_rx().await;
|
||||||
assert!(rx1.is_some());
|
assert!(rx1.is_some());
|
||||||
@@ -955,6 +1039,7 @@ mod tests {
|
|||||||
hub_port: 8443,
|
hub_port: 8443,
|
||||||
edge_id: "e".to_string(),
|
edge_id: "e".to_string(),
|
||||||
secret: "s".to_string(),
|
secret: "s".to_string(),
|
||||||
|
bind_address: None,
|
||||||
});
|
});
|
||||||
edge.stop().await; // should not panic
|
edge.stop().await; // should not panic
|
||||||
let status = edge.get_status().await;
|
let status = edge.get_status().await;
|
||||||
|
|||||||
@@ -300,6 +300,13 @@ async fn handle_edge_connection(
|
|||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
|
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
|
||||||
stream.set_nodelay(true)?;
|
stream.set_nodelay(true)?;
|
||||||
|
// TCP keepalive detects silent network failures (NAT timeout, path change)
|
||||||
|
// faster than the 45s application-level liveness timeout.
|
||||||
|
let ka = socket2::TcpKeepalive::new()
|
||||||
|
.with_time(Duration::from_secs(30));
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
let ka = ka.with_interval(Duration::from_secs(10));
|
||||||
|
let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
|
||||||
let tls_stream = acceptor.accept(stream).await?;
|
let tls_stream = acceptor.accept(stream).await?;
|
||||||
let (read_half, mut write_half) = tokio::io::split(tls_stream);
|
let (read_half, mut write_half) = tokio::io::split(tls_stream);
|
||||||
let mut buf_reader = BufReader::new(read_half);
|
let mut buf_reader = BufReader::new(read_half);
|
||||||
@@ -373,25 +380,34 @@ async fn handle_edge_connection(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Per-edge active stream counter for adaptive flow control
|
||||||
|
let edge_stream_count = Arc::new(AtomicU32::new(0));
|
||||||
|
|
||||||
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
|
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
|
||||||
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
|
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
|
||||||
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
|
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||||
// Legacy alias for code that sends both control and data (will be migrated)
|
// Legacy alias for code that sends both control and data (will be migrated)
|
||||||
let frame_writer_tx = ctrl_tx.clone();
|
let frame_writer_tx = ctrl_tx.clone();
|
||||||
let writer_token = edge_token.clone();
|
let writer_token = edge_token.clone();
|
||||||
|
let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>();
|
||||||
let writer_handle = tokio::spawn(async move {
|
let writer_handle = tokio::spawn(async move {
|
||||||
// BufWriter coalesces small writes (frame headers, control frames) into fewer
|
// BufWriter coalesces small writes (frame headers, control frames) into fewer
|
||||||
// TLS records and syscalls. Flushed after each frame to avoid holding data.
|
// TLS records and syscalls. Flushed after each frame to avoid holding data.
|
||||||
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
|
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
|
||||||
|
let mut write_error = false;
|
||||||
|
let write_timeout = Duration::from_secs(30);
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased; // control frames always take priority over data
|
biased; // control frames always take priority over data
|
||||||
ctrl = ctrl_rx.recv() => {
|
ctrl = ctrl_rx.recv() => {
|
||||||
match ctrl {
|
match ctrl {
|
||||||
Some(frame_data) => {
|
Some(frame_data) => {
|
||||||
if writer.write_all(&frame_data).await.is_err() { break; }
|
let ok = tokio::time::timeout(write_timeout, async {
|
||||||
if writer.flush().await.is_err() { break; }
|
writer.write_all(&frame_data).await?;
|
||||||
|
writer.flush().await
|
||||||
|
}).await;
|
||||||
|
if !matches!(ok, Ok(Ok(()))) { write_error = true; break; }
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
@@ -399,8 +415,11 @@ async fn handle_edge_connection(
|
|||||||
data = data_rx.recv() => {
|
data = data_rx.recv() => {
|
||||||
match data {
|
match data {
|
||||||
Some(frame_data) => {
|
Some(frame_data) => {
|
||||||
if writer.write_all(&frame_data).await.is_err() { break; }
|
let ok = tokio::time::timeout(write_timeout, async {
|
||||||
if writer.flush().await.is_err() { break; }
|
writer.write_all(&frame_data).await?;
|
||||||
|
writer.flush().await
|
||||||
|
}).await;
|
||||||
|
if !matches!(ok, Ok(Ok(()))) { write_error = true; break; }
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
@@ -408,6 +427,10 @@ async fn handle_edge_connection(
|
|||||||
_ = writer_token.cancelled() => break,
|
_ = writer_token.cancelled() => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if write_error {
|
||||||
|
log::error!("Tunnel writer to edge failed or stalled, signalling reader for fast cleanup");
|
||||||
|
let _ = writer_dead_tx.send(());
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spawn task to forward config updates as FRAME_CONFIG frames
|
// Spawn task to forward config updates as FRAME_CONFIG frames
|
||||||
@@ -496,7 +519,12 @@ async fn handle_edge_connection(
|
|||||||
|
|
||||||
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
|
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
|
||||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
|
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||||
let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW));
|
// Adaptive initial window: scale with current stream count
|
||||||
|
// to keep total in-flight data within the 32MB budget.
|
||||||
|
let initial_window = compute_window_for_stream_count(
|
||||||
|
edge_stream_count.load(Ordering::Relaxed),
|
||||||
|
);
|
||||||
|
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||||
let window_notify = Arc::new(Notify::new());
|
let window_notify = Arc::new(Notify::new());
|
||||||
{
|
{
|
||||||
let mut s = streams.lock().await;
|
let mut s = streams.lock().await;
|
||||||
@@ -509,8 +537,10 @@ async fn handle_edge_connection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||||
|
let stream_counter = Arc::clone(&edge_stream_count);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = permit; // hold semaphore permit until stream completes
|
let _permit = permit; // hold semaphore permit until stream completes
|
||||||
|
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
let result = async {
|
let result = async {
|
||||||
// A2: Connect to SmartProxy with timeout
|
// A2: Connect to SmartProxy with timeout
|
||||||
@@ -533,6 +563,7 @@ async fn handle_edge_connection(
|
|||||||
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
||||||
let writer_token = stream_token.clone();
|
let writer_token = stream_token.clone();
|
||||||
let wub_tx = writer_tx.clone();
|
let wub_tx = writer_tx.clone();
|
||||||
|
let stream_counter_w = Arc::clone(&stream_counter);
|
||||||
let writer_for_edge_data = tokio::spawn(async move {
|
let writer_for_edge_data = tokio::spawn(async move {
|
||||||
let mut consumed_since_update: u32 = 0;
|
let mut consumed_since_update: u32 = 0;
|
||||||
loop {
|
loop {
|
||||||
@@ -558,12 +589,18 @@ async fn handle_edge_connection(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Track consumption for flow control
|
// Track consumption for adaptive flow control.
|
||||||
|
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||||
consumed_since_update += len;
|
consumed_since_update += len;
|
||||||
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
|
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
|
stream_counter_w.load(Ordering::Relaxed),
|
||||||
|
);
|
||||||
|
let threshold = adaptive_window / 2;
|
||||||
|
if consumed_since_update >= threshold {
|
||||||
|
let increment = consumed_since_update.min(adaptive_window);
|
||||||
|
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
|
||||||
if wub_tx.try_send(frame).is_ok() {
|
if wub_tx.try_send(frame).is_ok() {
|
||||||
consumed_since_update = 0;
|
consumed_since_update -= increment;
|
||||||
}
|
}
|
||||||
// If try_send fails, keep accumulating — retry on next threshold
|
// If try_send fails, keep accumulating — retry on next threshold
|
||||||
}
|
}
|
||||||
@@ -610,7 +647,11 @@ async fn handle_edge_connection(
|
|||||||
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let max_read = w.min(buf.len());
|
// Adaptive: cap read to current per-stream target window
|
||||||
|
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
|
stream_counter.load(Ordering::Relaxed),
|
||||||
|
) as usize;
|
||||||
|
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
read_result = up_read.read(&mut buf[..max_read]) => {
|
read_result = up_read.read(&mut buf[..max_read]) => {
|
||||||
@@ -665,6 +706,7 @@ async fn handle_edge_connection(
|
|||||||
stream_id,
|
stream_id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
FRAME_DATA => {
|
FRAME_DATA => {
|
||||||
@@ -726,8 +768,9 @@ async fn handle_edge_connection(
|
|||||||
_ = ping_ticker.tick() => {
|
_ = ping_ticker.tick() => {
|
||||||
let ping_frame = encode_frame(0, FRAME_PING, &[]);
|
let ping_frame = encode_frame(0, FRAME_PING, &[]);
|
||||||
if frame_writer_tx.try_send(ping_frame).is_err() {
|
if frame_writer_tx.try_send(ping_frame).is_err() {
|
||||||
log::warn!("Failed to send PING to edge {}, writer channel full/closed", edge_id);
|
// Control channel full — skip this PING cycle.
|
||||||
break;
|
// The 45s liveness timeout gives margin for the channel to drain.
|
||||||
|
log::warn!("PING send to edge {} failed, control channel full — skipping", edge_id);
|
||||||
}
|
}
|
||||||
log::trace!("Sent PING to edge {}", edge_id);
|
log::trace!("Sent PING to edge {}", edge_id);
|
||||||
}
|
}
|
||||||
@@ -736,6 +779,10 @@ async fn handle_edge_connection(
|
|||||||
edge_id, liveness_timeout_dur.as_secs());
|
edge_id, liveness_timeout_dur.as_secs());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
_ = &mut writer_dead_rx => {
|
||||||
|
log::error!("Tunnel writer to edge {} died, disconnecting immediately", edge_id);
|
||||||
|
break;
|
||||||
|
}
|
||||||
_ = edge_token.cancelled() => {
|
_ = edge_token.cancelled() => {
|
||||||
log::info!("Edge {} cancelled by hub", edge_id);
|
log::info!("Edge {} cancelled by hub", edge_id);
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -5,3 +5,6 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["io-util"] }
|
tokio = { version = "1", features = ["io-util"] }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { version = "1", features = ["io-util", "macros", "rt"] }
|
||||||
|
|||||||
@@ -32,6 +32,15 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> V
|
|||||||
encode_frame(stream_id, frame_type, &increment.to_be_bytes())
|
encode_frame(stream_id, frame_type, &increment.to_be_bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute the target per-stream window size based on the number of active streams.
|
||||||
|
/// Total memory budget is ~32MB shared across all streams. As more streams are active,
|
||||||
|
/// each gets a smaller window. This adapts to current demand — few streams get high
|
||||||
|
/// throughput, many streams save memory and reduce control frame pressure.
|
||||||
|
pub fn compute_window_for_stream_count(active: u32) -> u32 {
|
||||||
|
let per_stream = (32 * 1024 * 1024u64) / (active.max(1) as u64);
|
||||||
|
per_stream.clamp(64 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
|
||||||
|
}
|
||||||
|
|
||||||
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
|
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
|
||||||
pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
|
pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
|
||||||
if payload.len() != 4 {
|
if payload.len() != 4 {
|
||||||
@@ -336,4 +345,134 @@ mod tests {
|
|||||||
assert_eq!(&pong[0..4], &0u32.to_be_bytes());
|
assert_eq!(&pong[0..4], &0u32.to_be_bytes());
|
||||||
assert_eq!(pong.len(), FRAME_HEADER_SIZE);
|
assert_eq!(pong.len(), FRAME_HEADER_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- compute_window_for_stream_count tests ---
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_zero_streams() {
|
||||||
|
// 0 streams treated as 1: 32MB/1 = 32MB → clamped to 4MB max
|
||||||
|
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_one_stream() {
|
||||||
|
// 32MB/1 = 32MB → clamped to 4MB max
|
||||||
|
assert_eq!(compute_window_for_stream_count(1), INITIAL_STREAM_WINDOW);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_at_max_boundary() {
|
||||||
|
// 32MB/8 = 4MB = exactly INITIAL_STREAM_WINDOW
|
||||||
|
assert_eq!(compute_window_for_stream_count(8), INITIAL_STREAM_WINDOW);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_just_below_max() {
|
||||||
|
// 32MB/9 = 3,728,270 — first value below INITIAL_STREAM_WINDOW
|
||||||
|
let w = compute_window_for_stream_count(9);
|
||||||
|
assert!(w < INITIAL_STREAM_WINDOW);
|
||||||
|
assert_eq!(w, (32 * 1024 * 1024u64 / 9) as u32);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_16_streams() {
|
||||||
|
// 32MB/16 = 2MB
|
||||||
|
assert_eq!(compute_window_for_stream_count(16), 2 * 1024 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_100_streams() {
|
||||||
|
// 32MB/100 = 335,544 bytes (~327KB)
|
||||||
|
let w = compute_window_for_stream_count(100);
|
||||||
|
assert_eq!(w, (32 * 1024 * 1024u64 / 100) as u32);
|
||||||
|
assert!(w > 64 * 1024); // above floor
|
||||||
|
assert!(w < INITIAL_STREAM_WINDOW as u32); // below ceiling
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_200_streams() {
|
||||||
|
// 32MB/200 = 167,772 bytes (~163KB), above 64KB floor
|
||||||
|
let w = compute_window_for_stream_count(200);
|
||||||
|
assert_eq!(w, (32 * 1024 * 1024u64 / 200) as u32);
|
||||||
|
assert!(w > 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_500_streams() {
|
||||||
|
// 32MB/500 = 67,108 bytes (~65.5KB), just above 64KB floor
|
||||||
|
let w = compute_window_for_stream_count(500);
|
||||||
|
assert_eq!(w, (32 * 1024 * 1024u64 / 500) as u32);
|
||||||
|
assert!(w > 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_at_min_boundary() {
|
||||||
|
// 32MB/512 = 65,536 = exactly 64KB floor
|
||||||
|
assert_eq!(compute_window_for_stream_count(512), 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_below_min_clamped() {
|
||||||
|
// 32MB/513 = 65,408 → clamped up to 64KB
|
||||||
|
assert_eq!(compute_window_for_stream_count(513), 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_1000_streams() {
|
||||||
|
// 32MB/1000 = 33,554 → clamped to 64KB
|
||||||
|
assert_eq!(compute_window_for_stream_count(1000), 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_max_u32() {
|
||||||
|
// Extreme: u32::MAX streams → tiny value → clamped to 64KB
|
||||||
|
assert_eq!(compute_window_for_stream_count(u32::MAX), 64 * 1024);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_monotonically_decreasing() {
|
||||||
|
// Window should decrease (or stay same) as stream count increases
|
||||||
|
let mut prev = compute_window_for_stream_count(1);
|
||||||
|
for n in [2, 5, 10, 50, 100, 200, 500, 512, 1000] {
|
||||||
|
let w = compute_window_for_stream_count(n);
|
||||||
|
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
|
||||||
|
prev = w;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_adaptive_window_total_budget_bounded() {
|
||||||
|
// active × per_stream_window should never exceed 32MB (+ clamp overhead for high N)
|
||||||
|
for n in [1, 10, 50, 100, 200, 500] {
|
||||||
|
let w = compute_window_for_stream_count(n);
|
||||||
|
let total = w as u64 * n as u64;
|
||||||
|
assert!(total <= 32 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- encode/decode window_update roundtrip ---
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_window_update_roundtrip() {
|
||||||
|
for &increment in &[0u32, 1, 64 * 1024, INITIAL_STREAM_WINDOW, MAX_WINDOW_SIZE, u32::MAX] {
|
||||||
|
let frame = encode_window_update(42, FRAME_WINDOW_UPDATE, increment);
|
||||||
|
assert_eq!(frame[4], FRAME_WINDOW_UPDATE);
|
||||||
|
let decoded = decode_window_update(&frame[FRAME_HEADER_SIZE..]);
|
||||||
|
assert_eq!(decoded, Some(increment));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_window_update_back_roundtrip() {
|
||||||
|
let frame = encode_window_update(7, FRAME_WINDOW_UPDATE_BACK, 1234567);
|
||||||
|
assert_eq!(frame[4], FRAME_WINDOW_UPDATE_BACK);
|
||||||
|
assert_eq!(decode_window_update(&frame[FRAME_HEADER_SIZE..]), Some(1234567));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_decode_window_update_malformed() {
|
||||||
|
assert_eq!(decode_window_update(&[]), None);
|
||||||
|
assert_eq!(decode_window_update(&[0, 0, 0]), None);
|
||||||
|
assert_eq!(decode_window_update(&[0, 0, 0, 0, 0]), None);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
475
test/test.flowcontrol.node.ts
Normal file
475
test/test.flowcontrol.node.ts
Normal file
@@ -0,0 +1,475 @@
|
|||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
import * as net from 'net';
|
||||||
|
import * as crypto from 'crypto';
|
||||||
|
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/** Find N free ports by binding to port 0 and collecting OS-assigned ports. */
|
||||||
|
async function findFreePorts(count: number): Promise<number[]> {
|
||||||
|
const servers: net.Server[] = [];
|
||||||
|
const ports: number[] = [];
|
||||||
|
for (let i = 0; i < count; i++) {
|
||||||
|
const server = net.createServer();
|
||||||
|
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
|
||||||
|
ports.push((server.address() as net.AddressInfo).port);
|
||||||
|
servers.push(server);
|
||||||
|
}
|
||||||
|
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
|
||||||
|
return ports;
|
||||||
|
}
|
||||||
|
|
||||||
|
type TrackingServer = net.Server & { destroyAll: () => void };
|
||||||
|
|
||||||
|
/** Start a TCP echo server that tracks connections for force-close. */
|
||||||
|
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const connections = new Set<net.Socket>();
|
||||||
|
const server = net.createServer((socket) => {
|
||||||
|
connections.add(socket);
|
||||||
|
socket.on('close', () => connections.delete(socket));
|
||||||
|
|
||||||
|
// Skip PROXY protocol v1 header line before echoing
|
||||||
|
let proxyHeaderParsed = false;
|
||||||
|
let pendingBuf = Buffer.alloc(0);
|
||||||
|
socket.on('data', (data: Buffer) => {
|
||||||
|
if (!proxyHeaderParsed) {
|
||||||
|
pendingBuf = Buffer.concat([pendingBuf, data]);
|
||||||
|
const idx = pendingBuf.indexOf('\r\n');
|
||||||
|
if (idx !== -1) {
|
||||||
|
proxyHeaderParsed = true;
|
||||||
|
const remainder = pendingBuf.subarray(idx + 2);
|
||||||
|
if (remainder.length > 0) {
|
||||||
|
socket.write(remainder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
socket.write(data);
|
||||||
|
});
|
||||||
|
socket.on('error', () => {});
|
||||||
|
}) as TrackingServer;
|
||||||
|
|
||||||
|
server.destroyAll = () => {
|
||||||
|
for (const conn of connections) conn.destroy();
|
||||||
|
connections.clear();
|
||||||
|
};
|
||||||
|
|
||||||
|
server.on('error', reject);
|
||||||
|
server.listen(port, host, () => resolve(server));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start a server that sends a large response immediately on first data received.
|
||||||
|
* Does NOT wait for end (the tunnel protocol has no half-close).
|
||||||
|
* On receiving first data chunk after PROXY header, sends responseSize bytes then closes.
|
||||||
|
*/
|
||||||
|
function startLargeResponseServer(port: number, host: string, responseSize: number): Promise<TrackingServer> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const connections = new Set<net.Socket>();
|
||||||
|
const server = net.createServer((socket) => {
|
||||||
|
connections.add(socket);
|
||||||
|
socket.on('close', () => connections.delete(socket));
|
||||||
|
|
||||||
|
let proxyHeaderParsed = false;
|
||||||
|
let pendingBuf = Buffer.alloc(0);
|
||||||
|
let responseSent = false;
|
||||||
|
|
||||||
|
socket.on('data', (data: Buffer) => {
|
||||||
|
if (!proxyHeaderParsed) {
|
||||||
|
pendingBuf = Buffer.concat([pendingBuf, data]);
|
||||||
|
const idx = pendingBuf.indexOf('\r\n');
|
||||||
|
if (idx !== -1) {
|
||||||
|
proxyHeaderParsed = true;
|
||||||
|
const remainder = pendingBuf.subarray(idx + 2);
|
||||||
|
if (remainder.length > 0 && !responseSent) {
|
||||||
|
responseSent = true;
|
||||||
|
sendLargeResponse(socket, responseSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!responseSent) {
|
||||||
|
responseSent = true;
|
||||||
|
sendLargeResponse(socket, responseSize);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
socket.on('error', () => {});
|
||||||
|
}) as TrackingServer;
|
||||||
|
|
||||||
|
server.destroyAll = () => {
|
||||||
|
for (const conn of connections) conn.destroy();
|
||||||
|
connections.clear();
|
||||||
|
};
|
||||||
|
|
||||||
|
server.on('error', reject);
|
||||||
|
server.listen(port, host, () => resolve(server));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function sendLargeResponse(socket: net.Socket, totalBytes: number) {
|
||||||
|
const chunkSize = 32 * 1024;
|
||||||
|
let sent = 0;
|
||||||
|
const writeChunk = () => {
|
||||||
|
while (sent < totalBytes) {
|
||||||
|
const toWrite = Math.min(chunkSize, totalBytes - sent);
|
||||||
|
// Use a deterministic pattern for verification
|
||||||
|
const chunk = Buffer.alloc(toWrite, (sent % 256) & 0xff);
|
||||||
|
const canContinue = socket.write(chunk);
|
||||||
|
sent += toWrite;
|
||||||
|
if (!canContinue) {
|
||||||
|
socket.once('drain', writeChunk);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
socket.end();
|
||||||
|
};
|
||||||
|
writeChunk();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Force-close a server: destroy all connections, then close. */
|
||||||
|
async function forceCloseServer(server: TrackingServer): Promise<void> {
|
||||||
|
server.destroyAll();
|
||||||
|
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TestTunnel {
|
||||||
|
hub: RemoteIngressHub;
|
||||||
|
edge: RemoteIngressEdge;
|
||||||
|
edgePort: number;
|
||||||
|
cleanup: () => Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start a full hub + edge tunnel.
|
||||||
|
* Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2.
|
||||||
|
* Hub targetHost = 127.0.0.2 so hub -> upstream doesn't loop back to edge.
|
||||||
|
*/
|
||||||
|
async function startTunnel(edgePort: number, hubPort: number): Promise<TestTunnel> {
|
||||||
|
const hub = new RemoteIngressHub();
|
||||||
|
const edge = new RemoteIngressEdge();
|
||||||
|
|
||||||
|
await hub.start({
|
||||||
|
tunnelPort: hubPort,
|
||||||
|
targetHost: '127.0.0.2',
|
||||||
|
});
|
||||||
|
|
||||||
|
await hub.updateAllowedEdges([
|
||||||
|
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const connectedPromise = new Promise<void>((resolve, reject) => {
|
||||||
|
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
|
||||||
|
edge.once('tunnelConnected', () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
await edge.start({
|
||||||
|
hubHost: '127.0.0.1',
|
||||||
|
hubPort,
|
||||||
|
edgeId: 'test-edge',
|
||||||
|
secret: 'test-secret',
|
||||||
|
bindAddress: '127.0.0.1',
|
||||||
|
});
|
||||||
|
|
||||||
|
await connectedPromise;
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||||
|
|
||||||
|
return {
|
||||||
|
hub,
|
||||||
|
edge,
|
||||||
|
edgePort,
|
||||||
|
cleanup: async () => {
|
||||||
|
await edge.stop();
|
||||||
|
await hub.stop();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send data through the tunnel and collect the echoed response.
|
||||||
|
*/
|
||||||
|
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
let totalReceived = 0;
|
||||||
|
const expectedLength = data.length;
|
||||||
|
let settled = false;
|
||||||
|
|
||||||
|
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
|
||||||
|
client.write(data);
|
||||||
|
client.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
client.destroy();
|
||||||
|
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
|
||||||
|
}
|
||||||
|
}, timeoutMs);
|
||||||
|
|
||||||
|
client.on('data', (chunk: Buffer) => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
totalReceived += chunk.length;
|
||||||
|
if (totalReceived >= expectedLength && !settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
client.destroy();
|
||||||
|
resolve(Buffer.concat(chunks));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('end', () => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve(Buffer.concat(chunks));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', (err) => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to the tunnel, send a small request, and collect a large response.
|
||||||
|
* Does NOT call end() — the tunnel has no half-close.
|
||||||
|
* Instead, collects until expectedResponseSize bytes arrive.
|
||||||
|
*/
|
||||||
|
function sendAndReceiveLarge(
|
||||||
|
port: number,
|
||||||
|
data: Buffer,
|
||||||
|
expectedResponseSize: number,
|
||||||
|
timeoutMs = 60000,
|
||||||
|
): Promise<Buffer> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
let totalReceived = 0;
|
||||||
|
let settled = false;
|
||||||
|
|
||||||
|
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
|
||||||
|
client.write(data);
|
||||||
|
// Do NOT call client.end() — the server will respond immediately
|
||||||
|
// and the tunnel CLOSE will happen when the download finishes
|
||||||
|
});
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
client.destroy();
|
||||||
|
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedResponseSize} bytes`));
|
||||||
|
}
|
||||||
|
}, timeoutMs);
|
||||||
|
|
||||||
|
client.on('data', (chunk: Buffer) => {
|
||||||
|
chunks.push(chunk);
|
||||||
|
totalReceived += chunk.length;
|
||||||
|
if (totalReceived >= expectedResponseSize && !settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
client.destroy();
|
||||||
|
resolve(Buffer.concat(chunks));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('end', () => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve(Buffer.concat(chunks));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', (err) => {
|
||||||
|
if (!settled) {
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function sha256(buf: Buffer): string {
|
||||||
|
return crypto.createHash('sha256').update(buf).digest('hex');
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
let tunnel: TestTunnel;
|
||||||
|
let echoServer: TrackingServer;
|
||||||
|
let hubPort: number;
|
||||||
|
let edgePort: number;
|
||||||
|
|
||||||
|
tap.test('setup: start echo server and tunnel', async () => {
|
||||||
|
[hubPort, edgePort] = await findFreePorts(2);
|
||||||
|
|
||||||
|
echoServer = await startEchoServer(edgePort, '127.0.0.2');
|
||||||
|
tunnel = await startTunnel(edgePort, hubPort);
|
||||||
|
|
||||||
|
expect(tunnel.hub.running).toBeTrue();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('single stream: 32MB transfer exceeding initial 4MB window', async () => {
|
||||||
|
const size = 32 * 1024 * 1024;
|
||||||
|
const data = crypto.randomBytes(size);
|
||||||
|
const expectedHash = sha256(data);
|
||||||
|
|
||||||
|
const received = await sendAndReceive(edgePort, data, 60000);
|
||||||
|
|
||||||
|
expect(received.length).toEqual(size);
|
||||||
|
expect(sha256(received)).toEqual(expectedHash);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('200 concurrent streams with 64KB each', async () => {
|
||||||
|
const streamCount = 200;
|
||||||
|
const payloadSize = 64 * 1024;
|
||||||
|
|
||||||
|
const promises = Array.from({ length: streamCount }, () => {
|
||||||
|
const data = crypto.randomBytes(payloadSize);
|
||||||
|
const hash = sha256(data);
|
||||||
|
return sendAndReceive(edgePort, data, 30000).then((received) => ({
|
||||||
|
sent: hash,
|
||||||
|
received: sha256(received),
|
||||||
|
sizeOk: received.length === payloadSize,
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
const results = await Promise.all(promises);
|
||||||
|
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
|
||||||
|
|
||||||
|
expect(failures.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => {
|
||||||
|
const streamCount = 512;
|
||||||
|
const payloadSize = 16 * 1024;
|
||||||
|
|
||||||
|
const promises = Array.from({ length: streamCount }, () => {
|
||||||
|
const data = crypto.randomBytes(payloadSize);
|
||||||
|
const hash = sha256(data);
|
||||||
|
return sendAndReceive(edgePort, data, 60000).then((received) => ({
|
||||||
|
sent: hash,
|
||||||
|
received: sha256(received),
|
||||||
|
sizeOk: received.length === payloadSize,
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
const results = await Promise.all(promises);
|
||||||
|
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
|
||||||
|
|
||||||
|
expect(failures.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
|
||||||
|
// Swap to large-response server
|
||||||
|
await forceCloseServer(echoServer);
|
||||||
|
const responseSize = 4 * 1024 * 1024; // 4 MB
|
||||||
|
const largeServer = await startLargeResponseServer(edgePort, '127.0.0.2', responseSize);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const requestData = crypto.randomBytes(4 * 1024); // 4 KB
|
||||||
|
const received = await sendAndReceiveLarge(edgePort, requestData, responseSize, 60000);
|
||||||
|
expect(received.length).toEqual(responseSize);
|
||||||
|
} finally {
|
||||||
|
// Always restore echo server even on failure
|
||||||
|
await forceCloseServer(largeServer);
|
||||||
|
echoServer = await startEchoServer(edgePort, '127.0.0.2');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('100 streams x 1MB each (100MB total exceeding 32MB budget)', async () => {
|
||||||
|
const streamCount = 100;
|
||||||
|
const payloadSize = 1 * 1024 * 1024;
|
||||||
|
|
||||||
|
const promises = Array.from({ length: streamCount }, () => {
|
||||||
|
const data = crypto.randomBytes(payloadSize);
|
||||||
|
const hash = sha256(data);
|
||||||
|
return sendAndReceive(edgePort, data, 120000).then((received) => ({
|
||||||
|
sent: hash,
|
||||||
|
received: sha256(received),
|
||||||
|
sizeOk: received.length === payloadSize,
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
const results = await Promise.all(promises);
|
||||||
|
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
|
||||||
|
|
||||||
|
expect(failures.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('active stream counter tracks concurrent connections', async () => {
|
||||||
|
const N = 50;
|
||||||
|
|
||||||
|
// Open N connections and keep them alive (send data but don't close)
|
||||||
|
const sockets: net.Socket[] = [];
|
||||||
|
const connectPromises = Array.from({ length: N }, () => {
|
||||||
|
return new Promise<net.Socket>((resolve, reject) => {
|
||||||
|
const sock = net.createConnection({ host: '127.0.0.1', port: edgePort }, () => {
|
||||||
|
resolve(sock);
|
||||||
|
});
|
||||||
|
sock.on('error', () => {});
|
||||||
|
setTimeout(() => reject(new Error('connect timeout')), 5000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const connected = await Promise.all(connectPromises);
|
||||||
|
sockets.push(...connected);
|
||||||
|
|
||||||
|
// Brief delay for stream registration to propagate
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||||
|
|
||||||
|
// Verify the edge reports >= N active streams.
|
||||||
|
// This counter is the input to compute_window_for_stream_count(),
|
||||||
|
// so its accuracy determines whether adaptive window sizing is correct.
|
||||||
|
const status = await tunnel.edge.getStatus();
|
||||||
|
expect(status.activeStreams).toBeGreaterThanOrEqual(N);
|
||||||
|
|
||||||
|
// Clean up: destroy all sockets (the tunnel's 300s stream timeout will handle cleanup)
|
||||||
|
for (const sock of sockets) {
|
||||||
|
sock.destroy();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
|
||||||
|
// At 50 concurrent streams: adaptive window = 32MB/50 = 655KB per stream
|
||||||
|
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
|
||||||
|
const streamCount = 50;
|
||||||
|
const payloadSize = 2 * 1024 * 1024;
|
||||||
|
|
||||||
|
const promises = Array.from({ length: streamCount }, () => {
|
||||||
|
const data = crypto.randomBytes(payloadSize);
|
||||||
|
const hash = sha256(data);
|
||||||
|
return sendAndReceive(edgePort, data, 120000).then((received) => ({
|
||||||
|
sent: hash,
|
||||||
|
received: sha256(received),
|
||||||
|
sizeOk: received.length === payloadSize,
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
const results = await Promise.all(promises);
|
||||||
|
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
|
||||||
|
|
||||||
|
expect(failures.length).toEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('teardown: stop tunnel and echo server', async () => {
|
||||||
|
await tunnel.cleanup();
|
||||||
|
await forceCloseServer(echoServer);
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@serve.zone/remoteingress',
|
name: '@serve.zone/remoteingress',
|
||||||
version: '4.5.10',
|
version: '4.7.2',
|
||||||
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ type TEdgeCommands = {
|
|||||||
hubPort: number;
|
hubPort: number;
|
||||||
edgeId: string;
|
edgeId: string;
|
||||||
secret: string;
|
secret: string;
|
||||||
|
bindAddress?: string;
|
||||||
};
|
};
|
||||||
result: { started: boolean };
|
result: { started: boolean };
|
||||||
};
|
};
|
||||||
@@ -38,6 +39,7 @@ export interface IEdgeConfig {
|
|||||||
hubPort?: number;
|
hubPort?: number;
|
||||||
edgeId: string;
|
edgeId: string;
|
||||||
secret: string;
|
secret: string;
|
||||||
|
bindAddress?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
const MAX_RESTART_ATTEMPTS = 10;
|
const MAX_RESTART_ATTEMPTS = 10;
|
||||||
@@ -132,6 +134,7 @@ export class RemoteIngressEdge extends EventEmitter {
|
|||||||
hubPort: edgeConfig.hubPort ?? 8443,
|
hubPort: edgeConfig.hubPort ?? 8443,
|
||||||
edgeId: edgeConfig.edgeId,
|
edgeId: edgeConfig.edgeId,
|
||||||
secret: edgeConfig.secret,
|
secret: edgeConfig.secret,
|
||||||
|
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
|
||||||
});
|
});
|
||||||
|
|
||||||
this.started = true;
|
this.started = true;
|
||||||
@@ -227,6 +230,7 @@ export class RemoteIngressEdge extends EventEmitter {
|
|||||||
hubPort: this.savedConfig.hubPort ?? 8443,
|
hubPort: this.savedConfig.hubPort ?? 8443,
|
||||||
edgeId: this.savedConfig.edgeId,
|
edgeId: this.savedConfig.edgeId,
|
||||||
secret: this.savedConfig.secret,
|
secret: this.savedConfig.secret,
|
||||||
|
...(this.savedConfig.bindAddress ? { bindAddress: this.savedConfig.bindAddress } : {}),
|
||||||
});
|
});
|
||||||
|
|
||||||
this.started = true;
|
this.started = true;
|
||||||
|
|||||||
Reference in New Issue
Block a user