Compare commits

...

2 Commits

5 changed files with 32 additions and 14 deletions

View File

@@ -1,5 +1,11 @@
# Changelog
## 2026-03-15 - 4.4.1 - fix(remoteingress-core)
prevent stream data loss by applying backpressure and closing saturated channels
- replace non-blocking frame writes with awaited sends in per-stream tasks so large transfers respect backpressure instead of dropping data
- close and remove streams when back-channel or data channels fill up to avoid TCP stream corruption from silently dropped frames
## 2026-03-03 - 4.4.0 - feat(remoteingress)
add heartbeat PING/PONG and liveness timeouts; implement fast-reconnect/backoff reset and JS crash-recovery auto-restart

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.4.0",
"version": "4.4.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"main": "dist_ts/index.js",

View File

@@ -407,11 +407,14 @@ async fn connect_to_hub_and_run(
match frame.frame_type {
FRAME_DATA_BACK => {
// A1: Non-blocking send to prevent head-of-line blocking
let writers = client_writers.lock().await;
// Non-blocking send to prevent head-of-line blocking in the main dispatch loop.
// If the per-stream channel is full, close the stream rather than silently
// dropping data (which would corrupt the TCP stream).
let mut writers = client_writers.lock().await;
if let Some(tx) = writers.get(&frame.stream_id) {
if tx.try_send(frame.payload).is_err() {
log::warn!("Stream {} back-channel full, dropping frame", frame.stream_id);
log::warn!("Stream {} back-channel full, closing stream to prevent data corruption", frame.stream_id);
writers.remove(&frame.stream_id);
}
}
}
@@ -635,9 +638,11 @@ async fn handle_client_connection(
Ok(0) => break,
Ok(n) => {
let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]);
// A5: Use try_send to avoid blocking if writer channel is full
if tunnel_writer_tx.try_send(data_frame).is_err() {
log::warn!("Stream {} tunnel writer full, closing", stream_id);
// Use send().await for backpressure — this is a per-stream task so
// blocking only stalls this stream, not others. Prevents data loss
// for large transfers (e.g. 352MB Docker layers).
if tunnel_writer_tx.send(data_frame).await.is_err() {
log::warn!("Stream {} tunnel writer closed, closing", stream_id);
break;
}
}

View File

@@ -520,9 +520,11 @@ async fn handle_edge_connection(
Ok(n) => {
let frame =
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
// A5: Use try_send to avoid blocking if writer channel is full
if writer_tx.try_send(frame).is_err() {
log::warn!("Stream {} writer channel full, closing", stream_id);
// Use send().await for backpressure — this is a per-stream task so
// blocking only stalls this stream, not others. Prevents data loss
// for large transfers (e.g. 352MB Docker layers).
if writer_tx.send(frame).await.is_err() {
log::warn!("Stream {} writer channel closed, closing", stream_id);
break;
}
}
@@ -567,11 +569,16 @@ async fn handle_edge_connection(
});
}
FRAME_DATA => {
// A1: Non-blocking send to prevent head-of-line blocking
let s = streams.lock().await;
// Non-blocking send to prevent head-of-line blocking in the main dispatch loop.
// If the per-stream channel is full, close the stream rather than silently
// dropping data (which would corrupt the TCP stream).
let mut s = streams.lock().await;
if let Some((tx, _)) = s.get(&frame.stream_id) {
if tx.try_send(frame.payload).is_err() {
log::warn!("Stream {} data channel full, dropping frame", frame.stream_id);
log::warn!("Stream {} data channel full, closing stream to prevent data corruption", frame.stream_id);
if let Some((_, token)) = s.remove(&frame.stream_id) {
token.cancel();
}
}
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.4.0',
version: '4.4.1',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
}