fix(remoteingress-core): improve stream flow control retries and increase channel buffer capacity

This commit is contained in:
2026-03-15 18:16:10 +00:00
parent f456b0ba4f
commit 6ac4b37532
4 changed files with 19 additions and 11 deletions

View File

@@ -625,7 +625,7 @@ async fn handle_client_connection(
}
// Set up channel for data coming back from hub (capacity 16 is sufficient with flow control)
let (back_tx, mut back_rx) = mpsc::channel::<Vec<u8>>(128);
let (back_tx, mut back_rx) = mpsc::channel::<Vec<u8>>(256);
let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW));
let window_notify = Arc::new(Notify::new());
{
@@ -657,10 +657,11 @@ async fn handle_client_connection(
// Track consumption for flow control
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let increment = consumed_since_update;
consumed_since_update = 0;
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
let _ = wu_tx.try_send(frame);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
if wu_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
}
// If try_send fails, keep accumulating — retry on next threshold
}
}
None => break,

View File

@@ -477,7 +477,7 @@ async fn handle_edge_connection(
});
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(128);
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
let send_window = Arc::new(AtomicU32::new(INITIAL_STREAM_WINDOW));
let window_notify = Arc::new(Notify::new());
{
@@ -528,10 +528,11 @@ async fn handle_edge_connection(
// Track consumption for flow control
consumed_since_update += len;
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
let increment = consumed_since_update;
consumed_since_update = 0;
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
let _ = wub_tx.try_send(frame);
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
if wub_tx.try_send(frame).is_ok() {
consumed_since_update = 0;
}
// If try_send fails, keep accumulating — retry on next threshold
}
}
None => break,