feat(remoteingress-core): add adaptive per-stream flow control based on active stream counts
This commit is contained in:
@@ -620,6 +620,7 @@ fn apply_port_config(
|
||||
tunnel_data_tx,
|
||||
client_writers,
|
||||
client_token,
|
||||
Arc::clone(&active_streams),
|
||||
)
|
||||
.await;
|
||||
active_streams.fetch_sub(1, Ordering::Relaxed);
|
||||
@@ -651,6 +652,7 @@ async fn handle_client_connection(
|
||||
tunnel_data_tx: mpsc::Sender<Vec<u8>>,
|
||||
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
|
||||
client_token: CancellationToken,
|
||||
active_streams: Arc<AtomicU32>,
|
||||
) {
|
||||
let client_ip = client_addr.ip().to_string();
|
||||
let client_port = client_addr.port();
|
||||
@@ -684,6 +686,7 @@ async fn handle_client_connection(
|
||||
// After writing to client TCP, send WINDOW_UPDATE to hub so it can send more
|
||||
let hub_to_client_token = client_token.clone();
|
||||
let wu_tx = tunnel_ctrl_tx.clone();
|
||||
let active_streams_h2c = Arc::clone(&active_streams);
|
||||
let mut hub_to_client = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
@@ -695,12 +698,20 @@ async fn handle_client_connection(
|
||||
if client_write.write_all(&data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
// Track consumption for flow control
|
||||
// Track consumption for adaptive flow control.
|
||||
// The increment is capped to the adaptive window so the sender's
|
||||
// effective window shrinks to match current demand (fewer streams
|
||||
// = larger window, more streams = smaller window per stream).
|
||||
consumed_since_update += len;
|
||||
if consumed_since_update >= WINDOW_UPDATE_THRESHOLD {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, consumed_since_update);
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
active_streams_h2c.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE, increment);
|
||||
if wu_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update = 0;
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
}
|
||||
@@ -746,7 +757,11 @@ async fn handle_client_connection(
|
||||
log::warn!("Stream {} upload: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
let max_read = w.min(buf.len());
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
active_streams.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
|
||||
tokio::select! {
|
||||
read_result = client_read.read(&mut buf[..max_read]) => {
|
||||
|
||||
Reference in New Issue
Block a user