fix(remoteingress-core): prevent stream data loss by applying backpressure and closing saturated channels
This commit is contained in:
@@ -1,5 +1,11 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-15 - 4.4.1 - fix(remoteingress-core)
|
||||
prevent stream data loss by applying backpressure and closing saturated channels
|
||||
|
||||
- replace non-blocking frame writes with awaited sends in per-stream tasks so large transfers respect backpressure instead of dropping data
|
||||
- close and remove streams when back-channel or data channels fill up to avoid TCP stream corruption from silently dropped frames
|
||||
|
||||
## 2026-03-03 - 4.4.0 - feat(remoteingress)
|
||||
add heartbeat PING/PONG and liveness timeouts; implement fast-reconnect/backoff reset and JS crash-recovery auto-restart
|
||||
|
||||
|
||||
@@ -407,11 +407,14 @@ async fn connect_to_hub_and_run(
|
||||
|
||||
match frame.frame_type {
|
||||
FRAME_DATA_BACK => {
|
||||
// A1: Non-blocking send to prevent head-of-line blocking
|
||||
let writers = client_writers.lock().await;
|
||||
// Non-blocking send to prevent head-of-line blocking in the main dispatch loop.
|
||||
// If the per-stream channel is full, close the stream rather than silently
|
||||
// dropping data (which would corrupt the TCP stream).
|
||||
let mut writers = client_writers.lock().await;
|
||||
if let Some(tx) = writers.get(&frame.stream_id) {
|
||||
if tx.try_send(frame.payload).is_err() {
|
||||
log::warn!("Stream {} back-channel full, dropping frame", frame.stream_id);
|
||||
log::warn!("Stream {} back-channel full, closing stream to prevent data corruption", frame.stream_id);
|
||||
writers.remove(&frame.stream_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -635,9 +638,11 @@ async fn handle_client_connection(
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]);
|
||||
// A5: Use try_send to avoid blocking if writer channel is full
|
||||
if tunnel_writer_tx.try_send(data_frame).is_err() {
|
||||
log::warn!("Stream {} tunnel writer full, closing", stream_id);
|
||||
// Use send().await for backpressure — this is a per-stream task so
|
||||
// blocking only stalls this stream, not others. Prevents data loss
|
||||
// for large transfers (e.g. 352MB Docker layers).
|
||||
if tunnel_writer_tx.send(data_frame).await.is_err() {
|
||||
log::warn!("Stream {} tunnel writer closed, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -520,9 +520,11 @@ async fn handle_edge_connection(
|
||||
Ok(n) => {
|
||||
let frame =
|
||||
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
|
||||
// A5: Use try_send to avoid blocking if writer channel is full
|
||||
if writer_tx.try_send(frame).is_err() {
|
||||
log::warn!("Stream {} writer channel full, closing", stream_id);
|
||||
// Use send().await for backpressure — this is a per-stream task so
|
||||
// blocking only stalls this stream, not others. Prevents data loss
|
||||
// for large transfers (e.g. 352MB Docker layers).
|
||||
if writer_tx.send(frame).await.is_err() {
|
||||
log::warn!("Stream {} writer channel closed, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -567,11 +569,16 @@ async fn handle_edge_connection(
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
// A1: Non-blocking send to prevent head-of-line blocking
|
||||
let s = streams.lock().await;
|
||||
// Non-blocking send to prevent head-of-line blocking in the main dispatch loop.
|
||||
// If the per-stream channel is full, close the stream rather than silently
|
||||
// dropping data (which would corrupt the TCP stream).
|
||||
let mut s = streams.lock().await;
|
||||
if let Some((tx, _)) = s.get(&frame.stream_id) {
|
||||
if tx.try_send(frame.payload).is_err() {
|
||||
log::warn!("Stream {} data channel full, dropping frame", frame.stream_id);
|
||||
log::warn!("Stream {} data channel full, closing stream to prevent data corruption", frame.stream_id);
|
||||
if let Some((_, token)) = s.remove(&frame.stream_id) {
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@serve.zone/remoteingress',
|
||||
version: '4.4.0',
|
||||
version: '4.4.1',
|
||||
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user