Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 982f648928 | |||
| 3a2a060a85 | |||
| e0c469147e | |||
| 0fdcdf566e |
13
changelog.md
13
changelog.md
@@ -1,5 +1,18 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-16 - 4.6.1 - fix(remoteingress-core)
|
||||
avoid spurious tunnel disconnect events and increase control channel capacity
|
||||
|
||||
- Emit TunnelDisconnected only after an established connection is actually lost, preventing false disconnect events during failed reconnect attempts.
|
||||
- Increase edge and hub control-channel buffer sizes from 64 to 256 to better prioritize control frames under load.
|
||||
|
||||
## 2026-03-16 - 4.6.0 - feat(remoteingress-core)
|
||||
add adaptive per-stream flow control based on active stream counts
|
||||
|
||||
- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically.
|
||||
- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams.
|
||||
- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers.
|
||||
|
||||
## 2026-03-16 - 4.5.12 - fix(remoteingress-core)
|
||||
improve tunnel liveness handling and enable TCP keepalive for accepted client sockets
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@serve.zone/remoteingress",
|
||||
"version": "4.5.12",
|
||||
"version": "4.6.1",
|
||||
"private": false,
|
||||
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
@@ -232,7 +232,11 @@ async fn edge_main_loop(
|
||||
}
|
||||
|
||||
*connected.write().await = false;
|
||||
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
|
||||
// Only emit disconnect event on actual disconnection, not on failed reconnects.
|
||||
// Failed reconnects never reach line 335 (handshake success), so was_connected is false.
|
||||
if was_connected {
|
||||
let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected);
|
||||
}
|
||||
active_streams.store(0, Ordering::Relaxed);
|
||||
// Reset stream ID counter for next connection cycle
|
||||
next_stream_id.store(1, Ordering::Relaxed);
|
||||
@@ -375,7 +379,7 @@ async fn connect_to_hub_and_run(
|
||||
|
||||
// QoS dual-channel tunnel writer: control frames (PONG/WINDOW_UPDATE/CLOSE/OPEN)
|
||||
// have priority over data frames (DATA). Prevents PING starvation under load.
|
||||
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
|
||||
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
// Legacy alias — control channel for PONG, CLOSE, WINDOW_UPDATE, OPEN
|
||||
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
|
||||
@@ -620,6 +624,7 @@ fn apply_port_config(
|
||||
tunnel_data_tx,
|
||||
client_writers,
|
||||
client_token,
|
||||
Arc::clone(&active_streams),
|
||||
)
|
||||
.await;
|
||||
active_streams.fetch_sub(1, Ordering::Relaxed);
|
||||
@@ -651,6 +656,7 @@ async fn handle_client_connection(
|
||||
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
|
||||
client_token: CancellationToken,
|
||||
active_streams: Arc<AtomicU32>,
|
||||
) {
|
||||
let client_ip = client_addr.ip().to_string();
|
||||
let client_port = client_addr.port();
|
||||
@@ -684,6 +690,7 @@ async fn handle_client_connection(
|
||||
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
|
||||
let hub_to_client_token = client_token.clone();
|
||||
let wu_tx = tunnel_ctrl_tx.clone();
|
||||
let active_streams_h2c = Arc::clone(&active_streams);
|
||||
let mut hub_to_client = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
@@ -695,12 +702,20 @@ async fn handle_client_connection(
|
||||
if client_write.write_all(&data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
// Track consumption for flow control
|
||||
// Track consumption for adaptive flow control.
|
||||
// The increment is capped to the adaptive window so the sender's
|
||||
// effective window shrinks to match current demand (fewer streams
|
||||
// = larger window, more streams = smaller window per stream).
|
||||
consumed_since_update += len;
|
||||
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
active_streams_h2c.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
|
||||
if wu_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update = 0;
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
}
|
||||
@@ -746,7 +761,11 @@ async fn handle_client_connection(
|
||||
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
let max_read = w.min(buf.len());
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
active_streams.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
|
||||
tokio::select! {
|
||||
read_result = client_read.read(&mut buf[..max_read]) => {
|
||||
|
||||
@@ -373,9 +373,12 @@ async fn handle_edge_connection(
|
||||
);
|
||||
}
|
||||
|
||||
// Per-edge active stream counter for adaptive flow control
|
||||
let edge_stream_count = Arc::new(AtomicU32::new(0));
|
||||
|
||||
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
|
||||
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
|
||||
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
|
||||
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
// Legacy alias for code that sends both control and data (will be migrated)
|
||||
let frame_writer_tx = ctrl_tx.clone();
|
||||
@@ -509,8 +512,10 @@ async fn handle_edge_connection(
|
||||
}
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
let stream_counter = Arc::clone(&edge_stream_count);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // hold semaphore permit until stream completes
|
||||
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let result = async {
|
||||
// A2: Connect to SmartProxy with timeout
|
||||
@@ -533,6 +538,7 @@ async fn handle_edge_connection(
|
||||
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
||||
let writer_token = stream_token.clone();
|
||||
let wub_tx = writer_tx.clone();
|
||||
let stream_counter_w = Arc::clone(&stream_counter);
|
||||
let writer_for_edge_data = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
@@ -558,12 +564,18 @@ async fn handle_edge_connection(
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Track consumption for flow control
|
||||
// Track consumption for adaptive flow control.
|
||||
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||
consumed_since_update += len;
|
||||
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter_w.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
|
||||
if wub_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update = 0;
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
}
|
||||
@@ -610,7 +622,11 @@ async fn handle_edge_connection(
|
||||
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
let max_read = w.min(buf.len());
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
|
||||
tokio::select! {
|
||||
read_result = up_read.read(&mut buf[..max_read]) => {
|
||||
@@ -665,6 +681,7 @@ async fn handle_edge_connection(
|
||||
stream_id,
|
||||
});
|
||||
}
|
||||
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@serve.zone/remoteingress',
|
||||
version: '4.5.12',
|
||||
version: '4.6.1',
|
||||
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user