fix(remoteingress-core): prioritize control frames over data in edge and hub tunnel writers

This commit is contained in:
2026-03-15 19:26:39 +00:00
parent 43e320a36d
commit a8ee0b33d7
4 changed files with 67 additions and 23 deletions

View File

@@ -371,14 +371,28 @@ async fn handle_edge_connection(
);
}
// A5: Channel-based writer replaces Arc<Mutex<WriteHalf>>
// All frame writes go through this channel → dedicated writer task serializes them
let (frame_writer_tx, mut frame_writer_rx) = mpsc::channel::<Vec<u8>>(4096);
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(64);
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
// Legacy alias for code that sends both control and data (will be migrated)
let frame_writer_tx = ctrl_tx.clone();
let writer_token = edge_token.clone();
let writer_handle = tokio::spawn(async move {
loop {
tokio::select! {
data = frame_writer_rx.recv() => {
biased; // control frames always take priority over data
ctrl = ctrl_rx.recv() => {
match ctrl {
Some(frame_data) => {
if write_half.write_all(&frame_data).await.is_err() {
break;
}
}
None => break,
}
}
data = data_rx.recv() => {
match data {
Some(frame_data) => {
if write_half.write_all(&frame_data).await.is_err() {
@@ -467,7 +481,8 @@ async fn handle_edge_connection(
let edge_id_clone = edge_id.clone();
let event_tx_clone = event_tx.clone();
let streams_clone = streams.clone();
let writer_tx = frame_writer_tx.clone();
let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK
let data_writer_tx = data_tx.clone(); // data: DATA_BACK
let target = target_host.clone();
let stream_token = edge_token.child_token();
@@ -576,8 +591,8 @@ async fn handle_edge_connection(
send_window.fetch_sub(n as u32, Ordering::Release);
let frame =
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
if writer_tx.send(frame).await.is_err() {
log::warn!("Stream {} writer channel closed, closing", stream_id);
if data_writer_tx.send(frame).await.is_err() {
log::warn!("Stream {} data channel closed, closing", stream_id);
break;
}
}