Compare commits

...

4 Commits

5 changed files with 64 additions and 15 deletions

View File

@@ -1,5 +1,18 @@
# Changelog
## 2026-03-16 - 4.6.1 - fix(remoteingress-core)
avoid spurious tunnel disconnect events and increase control channel capacity
- Emit TunnelDisconnected only after an established connection is actually lost, preventing false disconnect events during failed reconnect attempts.
- Increase edge and hub control-channel buffer sizes from 64 to 256 to better prioritize control frames under load.
## 2026-03-16 - 4.6.0 - feat(remoteingress-core)
add adaptive per-stream flow control based on active stream counts
- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically.
- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams.
- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers.
## 2026-03-16 - 4.5.12 - fix(remoteingress-core)
improve tunnel liveness handling and enable TCP keepalive for accepted client sockets

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.5.12",
"version": "4.6.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"main": "dist_ts/index.js",

View File

@@ -232,7 +232,11 @@ async fn edge_main_loop(
}
*connected.write().await = false;
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
// Only emit disconnect event on actual disconnection, not on failed reconnects.
// Failed reconnects never reach line 335 (handshake success), so was_connected is false.
if was_connected {
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
}
active_streams.store(0, Ordering::Relaxed);
// Reset stream ID counter for next connection cycle
next_stream_id.store(1, Ordering::Relaxed);
@@ -375,7 +379,7 @@ async fn connect_to_hub_and_run(
// QoS dual-channel tunnel writer: control frames (PONG/WINDOW_UPDATE/CLOSE/OPEN)
// have priority over data frames (DATA). Prevents PING starvation under load.
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Vec<u8>>(4096);
// Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
@@ -620,6 +624,7 @@ fn apply_port_config(
tunnel_data_tx,
client_writers,
client_token,
Arc::clone(&active_streams),
)
.await;
active_streams.fetch_sub(1, Ordering::Relaxed);
@@ -651,6 +656,7 @@ async fn handle_client_connection(
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
client_token: CancellationToken,
active_streams: Arc<AtomicU32>,
) {
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -684,6 +690,7 @@ async fn handle_client_connection(
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
let hub_to_client_token = client_token.clone();
let wu_tx = tunnel_ctrl_tx.clone();
let active_streams_h2c = Arc::clone(&active_streams);
let mut hub_to_client = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
@@ -695,12 +702,20 @@ async fn handle_client_connection(
if client_write.write_all(&data).await.is_err() {
break;
}
// Track consumption for flow control
// Track consumption for adaptive flow control.
// The increment is capped to the adaptive window so the sender's
// effective window shrinks to match current demand (fewer streams
// = larger window, more streams = smaller window per stream).
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
active_streams_h2c.load(Ordering::Relaxed),
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
let increment = consumed_since_update.min(adaptive_window);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
if wu_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
consumed_since_update -= increment;
}
// If try_send fails, keep accumulating — retry on next threshold
}
@@ -746,7 +761,11 @@ async fn handle_client_connection(
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
break;
}
let max_read = w.min(buf.len());
// Adaptive: cap read to current per-stream target window
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
active_streams.load(Ordering::Relaxed),
) as usize;
let max_read = w.min(buf.len()).min(adaptive_cap);
tokio::select! {
read_result = client_read.read(&mut buf[..max_read]) => {

View File

@@ -373,9 +373,12 @@ async fn handle_edge_connection(
);
}
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
// Legacy alias for code that sends both control and data (will be migrated)
let frame_writer_tx = ctrl_tx.clone();
@@ -509,8 +512,10 @@ async fn handle_edge_connection(
}
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
let stream_counter = Arc::clone(&edge_stream_count);
tokio::spawn(async move {
let _permit = permit; // hold semaphore permit until stream completes
stream_counter.fetch_add(1, Ordering::Relaxed);
let result = async {
// A2: Connect to SmartProxy with timeout
@@ -533,6 +538,7 @@ async fn handle_edge_connection(
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
@@ -558,12 +564,18 @@ async fn handle_edge_connection(
break;
}
}
// Track consumption for flow control
// Track consumption for adaptive flow control.
// Increment capped to adaptive window to limit per-stream in-flight data.
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
stream_counter_w.load(Ordering::Relaxed),
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
let increment = consumed_since_update.min(adaptive_window);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
if wub_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
consumed_since_update -= increment;
}
// If try_send fails, keep accumulating — retry on next threshold
}
@@ -610,7 +622,11 @@ async fn handle_edge_connection(
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
break;
}
let max_read = w.min(buf.len());
// Adaptive: cap read to current per-stream target window
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
stream_counter.load(Ordering::Relaxed),
) as usize;
let max_read = w.min(buf.len()).min(adaptive_cap);
tokio::select! {
read_result = up_read.read(&mut buf[..max_read]) => {
@@ -665,6 +681,7 @@ async fn handle_edge_connection(
stream_id,
});
}
stream_counter.fetch_sub(1, Ordering::Relaxed);
});
}
FRAME_DATA => {

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.5.12',
version: '4.6.1',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
}