Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a63247af3e | |||
| 28a0c769d9 |
@@ -1,5 +1,12 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-18 - 4.8.19 - fix(remoteingress-protocol)
|
||||||
|
reduce per-stream flow control windows and increase control channel buffering
|
||||||
|
|
||||||
|
- Lower the initial and maximum per-stream window from 16MB to 4MB and scale adaptive windows against a 200MB total budget with a 1MB minimum.
|
||||||
|
- Increase edge and hub control frame channel capacity from 256 to 512 to better handle prioritized control traffic.
|
||||||
|
- Update flow-control tests and comments to reflect the new window sizing and budget behavior.
|
||||||
|
|
||||||
## 2026-03-17 - 4.8.18 - fix(rust-protocol)
|
## 2026-03-17 - 4.8.18 - fix(rust-protocol)
|
||||||
switch tunnel frame buffers from Vec<u8> to Bytes to reduce copying and memory overhead
|
switch tunnel frame buffers from Vec<u8> to Bytes to reduce copying and memory overhead
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@serve.zone/remoteingress",
|
"name": "@serve.zone/remoteingress",
|
||||||
"version": "4.8.18",
|
"version": "4.8.19",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
@@ -497,7 +497,7 @@ async fn connect_to_hub_and_run(
|
|||||||
|
|
||||||
// QoS dual-channel: ctrl frames have priority over data frames.
|
// QoS dual-channel: ctrl frames have priority over data frames.
|
||||||
// Stream handlers send through these channels → TunnelIo drains them.
|
// Stream handlers send through these channels → TunnelIo drains them.
|
||||||
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Bytes>(256);
|
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Bytes>(512);
|
||||||
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Bytes>(4096);
|
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Bytes>(4096);
|
||||||
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
|
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
|
||||||
|
|
||||||
@@ -756,7 +756,7 @@ async fn handle_client_connection(
|
|||||||
// streams due to channel overflow — backpressure slows streams, never kills them.
|
// streams due to channel overflow — backpressure slows streams, never kills them.
|
||||||
let (back_tx, mut back_rx) = mpsc::unbounded_channel::<Bytes>();
|
let (back_tx, mut back_rx) = mpsc::unbounded_channel::<Bytes>();
|
||||||
// Adaptive initial window: scale with current stream count to keep total in-flight
|
// Adaptive initial window: scale with current stream count to keep total in-flight
|
||||||
// data within the 32MB budget. Prevents burst flooding when many streams open.
|
// data within the 200MB budget. Prevents burst flooding when many streams open.
|
||||||
let initial_window = remoteingress_protocol::compute_window_for_stream_count(
|
let initial_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||||
active_streams.load(Ordering::Relaxed),
|
active_streams.load(Ordering::Relaxed),
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -349,7 +349,7 @@ async fn handle_hub_frame(
|
|||||||
// Create channel for data from edge to this stream
|
// Create channel for data from edge to this stream
|
||||||
let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::<Bytes>();
|
let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::<Bytes>();
|
||||||
// Adaptive initial window: scale with current stream count
|
// Adaptive initial window: scale with current stream count
|
||||||
// to keep total in-flight data within the 32MB budget.
|
// to keep total in-flight data within the 200MB budget.
|
||||||
let initial_window = compute_window_for_stream_count(
|
let initial_window = compute_window_for_stream_count(
|
||||||
edge_stream_count.load(Ordering::Relaxed),
|
edge_stream_count.load(Ordering::Relaxed),
|
||||||
);
|
);
|
||||||
@@ -708,7 +708,7 @@ async fn handle_edge_connection(
|
|||||||
|
|
||||||
// QoS dual-channel: ctrl frames have priority over data frames.
|
// QoS dual-channel: ctrl frames have priority over data frames.
|
||||||
// Stream handlers send through these channels -> TunnelIo drains them.
|
// Stream handlers send through these channels -> TunnelIo drains them.
|
||||||
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Bytes>(256);
|
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Bytes>(512);
|
||||||
let (data_tx, mut data_rx) = mpsc::channel::<Bytes>(4096);
|
let (data_tx, mut data_rx) = mpsc::channel::<Bytes>(4096);
|
||||||
|
|
||||||
// Spawn task to forward config updates as FRAME_CONFIG frames
|
// Spawn task to forward config updates as FRAME_CONFIG frames
|
||||||
|
|||||||
@@ -24,12 +24,12 @@ pub const FRAME_HEADER_SIZE: usize = 9;
|
|||||||
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
|
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
|
||||||
|
|
||||||
// Per-stream flow control constants
|
// Per-stream flow control constants
|
||||||
/// Initial (and maximum) per-stream window size (16 MB).
|
/// Initial (and maximum) per-stream window size (4 MB).
|
||||||
pub const INITIAL_STREAM_WINDOW: u32 = 16 * 1024 * 1024;
|
pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024;
|
||||||
/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window).
|
/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window).
|
||||||
pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
|
pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
|
||||||
/// Maximum window size to prevent overflow.
|
/// Maximum window size to prevent overflow.
|
||||||
pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024;
|
pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024;
|
||||||
|
|
||||||
/// Encode a WINDOW_UPDATE frame for a specific stream.
|
/// Encode a WINDOW_UPDATE frame for a specific stream.
|
||||||
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes {
|
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes {
|
||||||
@@ -37,11 +37,11 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> B
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Compute the target per-stream window size based on the number of active streams.
|
/// Compute the target per-stream window size based on the number of active streams.
|
||||||
/// Total memory budget is ~800MB shared across all streams. Up to 50 streams get the
|
/// Total memory budget is ~200MB shared across all streams. Up to 50 streams get the
|
||||||
/// full 16MB window; above that the window scales down to a 4MB floor at 200+ streams.
|
/// full 4MB window; above that the window scales down to a 1MB floor at 200+ streams.
|
||||||
pub fn compute_window_for_stream_count(active: u32) -> u32 {
|
pub fn compute_window_for_stream_count(active: u32) -> u32 {
|
||||||
let per_stream = (800 * 1024 * 1024u64) / (active.max(1) as u64);
|
let per_stream = (200 * 1024 * 1024u64) / (active.max(1) as u64);
|
||||||
per_stream.clamp(4 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
|
per_stream.clamp(1 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
|
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
|
||||||
@@ -684,7 +684,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_zero_streams() {
|
fn test_adaptive_window_zero_streams() {
|
||||||
// 0 streams treated as 1: 800MB/1 -> clamped to 16MB max
|
// 0 streams treated as 1: 200MB/1 -> clamped to 4MB max
|
||||||
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
|
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -695,40 +695,40 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_50_streams_full() {
|
fn test_adaptive_window_50_streams_full() {
|
||||||
// 800MB/50 = 16MB = exactly INITIAL_STREAM_WINDOW
|
// 200MB/50 = 4MB = exactly INITIAL_STREAM_WINDOW
|
||||||
assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW);
|
assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_51_streams_starts_scaling() {
|
fn test_adaptive_window_51_streams_starts_scaling() {
|
||||||
// 800MB/51 < 16MB — first value below max
|
// 200MB/51 < 4MB — first value below max
|
||||||
let w = compute_window_for_stream_count(51);
|
let w = compute_window_for_stream_count(51);
|
||||||
assert!(w < INITIAL_STREAM_WINDOW);
|
assert!(w < INITIAL_STREAM_WINDOW);
|
||||||
assert_eq!(w, (800 * 1024 * 1024u64 / 51) as u32);
|
assert_eq!(w, (200 * 1024 * 1024u64 / 51) as u32);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_100_streams() {
|
fn test_adaptive_window_100_streams() {
|
||||||
// 800MB/100 = 8MB
|
// 200MB/100 = 2MB
|
||||||
assert_eq!(compute_window_for_stream_count(100), 8 * 1024 * 1024);
|
assert_eq!(compute_window_for_stream_count(100), 2 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_200_streams_at_floor() {
|
fn test_adaptive_window_200_streams_at_floor() {
|
||||||
// 800MB/200 = 4MB = exactly the floor
|
// 200MB/200 = 1MB = exactly the floor
|
||||||
assert_eq!(compute_window_for_stream_count(200), 4 * 1024 * 1024);
|
assert_eq!(compute_window_for_stream_count(200), 1 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_500_streams_clamped() {
|
fn test_adaptive_window_500_streams_clamped() {
|
||||||
// 800MB/500 = 1.6MB -> clamped up to 4MB floor
|
// 200MB/500 = 0.4MB -> clamped up to 1MB floor
|
||||||
assert_eq!(compute_window_for_stream_count(500), 4 * 1024 * 1024);
|
assert_eq!(compute_window_for_stream_count(500), 1 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_max_u32() {
|
fn test_adaptive_window_max_u32() {
|
||||||
// Extreme: u32::MAX streams -> tiny value -> clamped to 4MB
|
// Extreme: u32::MAX streams -> tiny value -> clamped to 1MB
|
||||||
assert_eq!(compute_window_for_stream_count(u32::MAX), 4 * 1024 * 1024);
|
assert_eq!(compute_window_for_stream_count(u32::MAX), 1 * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -743,11 +743,11 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_adaptive_window_total_budget_bounded() {
|
fn test_adaptive_window_total_budget_bounded() {
|
||||||
// active x per_stream_window should never exceed 800MB (+ clamp overhead for high N)
|
// active x per_stream_window should never exceed 200MB (+ clamp overhead for high N)
|
||||||
for n in [1, 10, 50, 100, 200] {
|
for n in [1, 10, 50, 100, 200] {
|
||||||
let w = compute_window_for_stream_count(n);
|
let w = compute_window_for_stream_count(n);
|
||||||
let total = w as u64 * n as u64;
|
let total = w as u64 * n as u64;
|
||||||
assert!(total <= 800 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
|
assert!(total <= 200 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -324,7 +324,7 @@ tap.test('setup: start echo server and tunnel', async () => {
|
|||||||
expect(tunnel.hub.running).toBeTrue();
|
expect(tunnel.hub.running).toBeTrue();
|
||||||
});
|
});
|
||||||
|
|
||||||
tap.test('single stream: 32MB transfer exceeding initial 4MB window', async () => {
|
tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple refills)', async () => {
|
||||||
const size = 32 * 1024 * 1024;
|
const size = 32 * 1024 * 1024;
|
||||||
const data = crypto.randomBytes(size);
|
const data = crypto.randomBytes(size);
|
||||||
const expectedHash = sha256(data);
|
const expectedHash = sha256(data);
|
||||||
@@ -392,7 +392,7 @@ tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tap.test('100 streams x 1MB each (100MB total exceeding 32MB budget)', async () => {
|
tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
|
||||||
const streamCount = 100;
|
const streamCount = 100;
|
||||||
const payloadSize = 1 * 1024 * 1024;
|
const payloadSize = 1 * 1024 * 1024;
|
||||||
|
|
||||||
@@ -446,7 +446,7 @@ tap.test('active stream counter tracks concurrent connections', async () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
|
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
|
||||||
// At 50 concurrent streams: adaptive window = 32MB/50 = 655KB per stream
|
// At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream
|
||||||
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
|
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
|
||||||
const streamCount = 50;
|
const streamCount = 50;
|
||||||
const payloadSize = 2 * 1024 * 1024;
|
const payloadSize = 2 * 1024 * 1024;
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@serve.zone/remoteingress',
|
name: '@serve.zone/remoteingress',
|
||||||
version: '4.8.18',
|
version: '4.8.19',
|
||||||
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user