feat(protocol): add sustained-stream tunnel scheduling to isolate high-throughput traffic

This commit is contained in:
2026-03-18 00:13:14 +00:00
parent a63247af3e
commit 6cbe8bee5e
5 changed files with 181 additions and 32 deletions

View File

@@ -293,6 +293,7 @@ async fn handle_edge_frame(
event_tx: &mpsc::Sender<EdgeEvent>,
tunnel_writer_tx: &mpsc::Sender<Bytes>,
tunnel_data_tx: &mpsc::Sender<Bytes>,
tunnel_sustained_tx: &mpsc::Sender<Bytes>,
port_listeners: &mut HashMap<u16, JoinHandle<()>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
@@ -343,6 +344,7 @@ async fn handle_edge_frame(
port_listeners,
tunnel_writer_tx,
tunnel_data_tx,
tunnel_sustained_tx,
client_writers,
active_streams,
next_stream_id,
@@ -499,6 +501,7 @@ async fn connect_to_hub_and_run(
// Stream handlers send through these channels → TunnelIo drains them.
let (tunnel_ctrl_tx, mut tunnel_ctrl_rx) = mpsc::channel::<Bytes>(512);
let (tunnel_data_tx, mut tunnel_data_rx) = mpsc::channel::<Bytes>(4096);
let (tunnel_sustained_tx, mut tunnel_sustained_rx) = mpsc::channel::<Bytes>(4096);
let tunnel_writer_tx = tunnel_ctrl_tx.clone();
// Start TCP listeners for initial ports
@@ -509,6 +512,7 @@ async fn connect_to_hub_and_run(
&mut port_listeners,
&tunnel_writer_tx,
&tunnel_data_tx,
&tunnel_sustained_tx,
&client_writers,
active_streams,
next_stream_id,
@@ -540,7 +544,7 @@ async fn connect_to_hub_and_run(
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
active_streams, next_stream_id, &config.edge_id, connection_token, bind_address,
).await {
break 'io_loop EdgeLoopResult::Reconnect(reason);
@@ -549,7 +553,7 @@ async fn connect_to_hub_and_run(
// Poll I/O: write(ctrl→data), flush, read, channels, timers
let event = std::future::poll_fn(|cx| {
tunnel_io.poll_step(cx, &mut tunnel_ctrl_rx, &mut tunnel_data_rx, &mut liveness_deadline, connection_token)
tunnel_io.poll_step(cx, &mut tunnel_ctrl_rx, &mut tunnel_data_rx, &mut tunnel_sustained_rx, &mut liveness_deadline, connection_token)
}).await;
match event {
@@ -558,7 +562,7 @@ async fn connect_to_hub_and_run(
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let EdgeFrameAction::Disconnect(reason) = handle_edge_frame(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &mut port_listeners,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
active_streams, next_stream_id, &config.edge_id, connection_token, bind_address,
).await {
break EdgeLoopResult::Reconnect(reason);
@@ -615,6 +619,7 @@ fn apply_port_config(
port_listeners: &mut HashMap<u16, JoinHandle<()>>,
tunnel_ctrl_tx: &mpsc::Sender<Bytes>,
tunnel_data_tx: &mpsc::Sender<Bytes>,
tunnel_sustained_tx: &mpsc::Sender<Bytes>,
client_writers: &Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
@@ -637,6 +642,7 @@ fn apply_port_config(
for &port in new_set.difference(&old_set) {
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
let tunnel_data_tx = tunnel_data_tx.clone();
let tunnel_sustained_tx = tunnel_sustained_tx.clone();
let client_writers = client_writers.clone();
let active_streams = active_streams.clone();
let next_stream_id = next_stream_id.clone();
@@ -671,6 +677,7 @@ fn apply_port_config(
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
let tunnel_ctrl_tx = tunnel_ctrl_tx.clone();
let tunnel_data_tx = tunnel_data_tx.clone();
let tunnel_sustained_tx = tunnel_sustained_tx.clone();
let client_writers = client_writers.clone();
let active_streams = active_streams.clone();
let edge_id = edge_id.clone();
@@ -687,6 +694,7 @@ fn apply_port_config(
&edge_id,
tunnel_ctrl_tx,
tunnel_data_tx,
tunnel_sustained_tx,
client_writers,
client_token,
Arc::clone(&active_streams),
@@ -730,6 +738,7 @@ async fn handle_client_connection(
edge_id: &str,
tunnel_ctrl_tx: mpsc::Sender<Bytes>,
tunnel_data_tx: mpsc::Sender<Bytes>,
tunnel_sustained_tx: mpsc::Sender<Bytes>,
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
client_token: CancellationToken,
active_streams: Arc<AtomicU32>,
@@ -833,6 +842,9 @@ async fn handle_client_connection(
// Task: client -> hub (upload direction) with per-stream flow control.
// Zero-copy: read payload directly after the header, then prepend header.
let mut buf = vec![0u8; FRAME_HEADER_SIZE + 32768];
let mut stream_bytes_sent: u64 = 0;
let stream_start = tokio::time::Instant::now();
let mut is_sustained = false;
loop {
// Wait for send window to have capacity (with stall timeout).
// Safe pattern: register notified BEFORE checking the condition
@@ -873,8 +885,21 @@ async fn handle_client_connection(
send_window.fetch_sub(n as u32, Ordering::Release);
encode_frame_header(&mut buf, stream_id, FRAME_DATA, n);
let data_frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]);
// Sustained classification: >2.5 MB/s for >10 seconds
stream_bytes_sent += n as u64;
if !is_sustained {
let elapsed = stream_start.elapsed().as_secs();
if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS
&& stream_bytes_sent / elapsed >= remoteingress_protocol::SUSTAINED_THRESHOLD_BPS
{
is_sustained = true;
log::debug!("Stream {} classified as sustained (upload, {} bytes in {}s)",
stream_id, stream_bytes_sent, elapsed);
}
}
let tx = if is_sustained { &tunnel_sustained_tx } else { &tunnel_data_tx };
let sent = tokio::select! {
result = tunnel_data_tx.send(data_frame) => result.is_ok(),
result = tx.send(data_frame) => result.is_ok(),
_ = client_token.cancelled() => false,
};
if !sent { break; }
@@ -901,8 +926,9 @@ async fn handle_client_connection(
// select! with cancellation guard prevents indefinite blocking if tunnel dies.
if !client_token.is_cancelled() {
let close_frame = encode_frame(stream_id, FRAME_CLOSE, &[]);
let tx = if is_sustained { &tunnel_sustained_tx } else { &tunnel_data_tx };
tokio::select! {
_ = tunnel_data_tx.send(close_frame) => {}
_ = tx.send(close_frame) => {}
_ = client_token.cancelled() => {}
}
}

View File

@@ -310,6 +310,7 @@ async fn handle_hub_frame(
event_tx: &mpsc::Sender<HubEvent>,
ctrl_tx: &mpsc::Sender<Bytes>,
data_tx: &mpsc::Sender<Bytes>,
sustained_tx: &mpsc::Sender<Bytes>,
target_host: &str,
edge_token: &CancellationToken,
cleanup_tx: &mpsc::Sender<u32>,
@@ -338,6 +339,7 @@ async fn handle_hub_frame(
let cleanup = cleanup_tx.clone();
let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK
let data_writer_tx = data_tx.clone(); // data: DATA_BACK
let sustained_writer_tx = sustained_tx.clone(); // sustained: DATA_BACK from elephant flows
let target = target_host.to_string();
let stream_token = edge_token.child_token();
@@ -458,6 +460,9 @@ async fn handle_hub_frame(
// with per-stream flow control (check send_window before reading).
// Zero-copy: read payload directly after the header, then prepend header.
let mut buf = vec![0u8; FRAME_HEADER_SIZE + 32768];
let mut dl_bytes_sent: u64 = 0;
let dl_start = tokio::time::Instant::now();
let mut is_sustained = false;
loop {
// Wait for send window to have capacity (with stall timeout).
// Safe pattern: register notified BEFORE checking the condition
@@ -498,8 +503,21 @@ async fn handle_hub_frame(
send_window.fetch_sub(n as u32, Ordering::Release);
encode_frame_header(&mut buf, stream_id, FRAME_DATA_BACK, n);
let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]);
// Sustained classification: >2.5 MB/s for >10 seconds
dl_bytes_sent += n as u64;
if !is_sustained {
let elapsed = dl_start.elapsed().as_secs();
if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS
&& dl_bytes_sent / elapsed >= remoteingress_protocol::SUSTAINED_THRESHOLD_BPS
{
is_sustained = true;
log::debug!("Stream {} classified as sustained (download, {} bytes in {}s)",
stream_id, dl_bytes_sent, elapsed);
}
}
let tx = if is_sustained { &sustained_writer_tx } else { &data_writer_tx };
let sent = tokio::select! {
result = data_writer_tx.send(frame) => result.is_ok(),
result = tx.send(frame) => result.is_ok(),
_ = stream_token.cancelled() => false,
};
if !sent { break; }
@@ -511,12 +529,13 @@ async fn handle_hub_frame(
}
}
// Send CLOSE_BACK via DATA channel (must arrive AFTER last DATA_BACK).
// Send CLOSE_BACK via same channel as DATA_BACK (must arrive AFTER last DATA_BACK).
// select! with cancellation guard prevents indefinite blocking if tunnel dies.
if !stream_token.is_cancelled() {
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
let tx = if is_sustained { &sustained_writer_tx } else { &data_writer_tx };
tokio::select! {
_ = data_writer_tx.send(close_frame) => {}
_ = tx.send(close_frame) => {}
_ = stream_token.cancelled() => {}
}
}
@@ -528,7 +547,9 @@ async fn handle_hub_frame(
if let Err(e) = result {
log::error!("Stream {} error: {}", stream_id, e);
// Send CLOSE_BACK via DATA channel on error (must arrive after any DATA_BACK).
// Send CLOSE_BACK on error (must arrive after any DATA_BACK).
// Error path: is_sustained not available here, use data channel (safe —
// if error occurs before classification, no sustained frames were sent).
if !stream_token.is_cancelled() {
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
tokio::select! {
@@ -710,6 +731,7 @@ async fn handle_edge_connection(
// Stream handlers send through these channels -> TunnelIo drains them.
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Bytes>(512);
let (data_tx, mut data_rx) = mpsc::channel::<Bytes>(4096);
let (sustained_tx, mut sustained_rx) = mpsc::channel::<Bytes>(4096);
// Spawn task to forward config updates as FRAME_CONFIG frames
let config_writer_tx = ctrl_tx.clone();
@@ -783,7 +805,7 @@ async fn handle_edge_connection(
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &target_host, &edge_token,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
disconnect_reason = reason;
@@ -797,7 +819,7 @@ async fn handle_edge_connection(
if ping_ticker.poll_tick(cx).is_ready() {
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[]));
}
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut liveness_deadline, &edge_token)
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut sustained_rx, &mut liveness_deadline, &edge_token)
}).await;
match event {
@@ -806,7 +828,7 @@ async fn handle_edge_connection(
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
if let FrameAction::Disconnect(reason) = handle_hub_frame(
frame, &mut tunnel_io, &mut streams, &stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &target_host, &edge_token,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
).await {
disconnect_reason = reason;