fix(rust-edge): refactor tunnel I/O to preserve TLS state and prioritize control frames
This commit is contained in:
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{mpsc, Mutex, Notify, RwLock, Semaphore};
|
||||
use tokio::time::{interval, sleep_until, Instant};
|
||||
@@ -307,13 +307,24 @@ async fn handle_edge_connection(
|
||||
#[cfg(target_os = "linux")]
|
||||
let ka = ka.with_interval(Duration::from_secs(10));
|
||||
let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
|
||||
let tls_stream = acceptor.accept(stream).await?;
|
||||
let (read_half, mut write_half) = tokio::io::split(tls_stream);
|
||||
let mut buf_reader = BufReader::new(read_half);
|
||||
let mut tls_stream = acceptor.accept(stream).await?;
|
||||
|
||||
// Read auth line: "EDGE <edgeId> <secret>\n"
|
||||
let mut auth_line = String::new();
|
||||
buf_reader.read_line(&mut auth_line).await?;
|
||||
// Byte-by-byte auth line reading (no BufReader).
|
||||
// Auth line: "EDGE <edgeId> <secret>\n"
|
||||
let mut auth_buf = Vec::with_capacity(512);
|
||||
loop {
|
||||
let mut byte = [0u8; 1];
|
||||
tls_stream.read_exact(&mut byte).await?;
|
||||
if byte[0] == b'\n' {
|
||||
break;
|
||||
}
|
||||
auth_buf.push(byte[0]);
|
||||
if auth_buf.len() > 4096 {
|
||||
return Err("auth line too long".into());
|
||||
}
|
||||
}
|
||||
let auth_line = String::from_utf8(auth_buf)
|
||||
.map_err(|_| "auth line not valid UTF-8")?;
|
||||
let auth_line = auth_line.trim();
|
||||
|
||||
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
|
||||
@@ -353,7 +364,8 @@ async fn handle_edge_connection(
|
||||
};
|
||||
let mut handshake_json = serde_json::to_string(&handshake)?;
|
||||
handshake_json.push('\n');
|
||||
write_half.write_all(handshake_json.as_bytes()).await?;
|
||||
tls_stream.write_all(handshake_json.as_bytes()).await?;
|
||||
tls_stream.flush().await?;
|
||||
|
||||
// Track this edge
|
||||
let streams: Arc<Mutex<HashMap<u32, HubStreamState>>> =
|
||||
@@ -383,51 +395,13 @@ async fn handle_edge_connection(
|
||||
// Per-edge active stream counter for adaptive flow control
|
||||
let edge_stream_count = Arc::new(AtomicU32::new(0));
|
||||
|
||||
// QoS dual-channel tunnel writer: control frames (PING/PONG/WINDOW_UPDATE/CLOSE)
|
||||
// have priority over data frames (DATA_BACK). This prevents PING starvation under load.
|
||||
// QoS dual-channel: ctrl frames have priority over data frames.
|
||||
// Stream handlers send through these channels -> TunnelIo drains them.
|
||||
let (ctrl_tx, mut ctrl_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(4096);
|
||||
// Legacy alias for code that sends both control and data (will be migrated)
|
||||
let frame_writer_tx = ctrl_tx.clone();
|
||||
let writer_token = edge_token.clone();
|
||||
let (writer_dead_tx, mut writer_dead_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let writer_handle = tokio::spawn(async move {
|
||||
// BufWriter coalesces small writes (frame headers, control frames) into fewer
|
||||
// TLS records and syscalls. Flushed after each frame to avoid holding data.
|
||||
let mut writer = tokio::io::BufWriter::with_capacity(65536, write_half);
|
||||
let mut write_error = false;
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased; // control frames always take priority over data
|
||||
ctrl = ctrl_rx.recv() => {
|
||||
match ctrl {
|
||||
Some(frame_data) => {
|
||||
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
|
||||
if writer.flush().await.is_err() { write_error = true; break; }
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Some(frame_data) => {
|
||||
if writer.write_all(&frame_data).await.is_err() { write_error = true; break; }
|
||||
if writer.flush().await.is_err() { write_error = true; break; }
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = writer_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
if write_error {
|
||||
log::error!("Tunnel writer to edge failed, signalling reader for fast cleanup");
|
||||
let _ = writer_dead_tx.send(());
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn task to forward config updates as FRAME_CONFIG frames
|
||||
let config_writer_tx = frame_writer_tx.clone();
|
||||
let config_writer_tx = ctrl_tx.clone();
|
||||
let config_edge_id = edge_id.clone();
|
||||
let config_token = edge_token.clone();
|
||||
let config_handle = tokio::spawn(async move {
|
||||
@@ -464,324 +438,610 @@ async fn handle_edge_connection(
|
||||
let mut last_activity = Instant::now();
|
||||
let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur));
|
||||
|
||||
// Frame reading loop
|
||||
let mut frame_reader = FrameReader::new(buf_reader);
|
||||
// Single-owner I/O engine — no tokio::io::split, no mutex
|
||||
let mut tunnel_io = remoteingress_protocol::TunnelIo::new(tls_stream, Vec::new());
|
||||
|
||||
let mut disconnect_reason = "unknown".to_string();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
frame_result = frame_reader.next_frame() => {
|
||||
match frame_result {
|
||||
Ok(Some(frame)) => {
|
||||
// Reset liveness on any received frame
|
||||
last_activity = Instant::now();
|
||||
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
|
||||
'hub_loop: loop {
|
||||
// Drain any buffered frames
|
||||
loop {
|
||||
match tunnel_io.try_parse_frame() {
|
||||
Some(Ok(frame)) => {
|
||||
// Reset liveness on any received frame
|
||||
last_activity = Instant::now();
|
||||
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
|
||||
|
||||
match frame.frame_type {
|
||||
FRAME_OPEN => {
|
||||
// A4: Check stream limit before processing
|
||||
let permit = match stream_semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
|
||||
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
|
||||
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = frame_writer_tx.try_send(close_frame);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Payload is PROXY v1 header line
|
||||
let proxy_header = String::from_utf8_lossy(&frame.payload).to_string();
|
||||
|
||||
// Parse destination port from PROXY header
|
||||
let dest_port = parse_dest_port_from_proxy(&proxy_header).unwrap_or(443);
|
||||
|
||||
let stream_id = frame.stream_id;
|
||||
let edge_id_clone = edge_id.clone();
|
||||
let event_tx_clone = event_tx.clone();
|
||||
let streams_clone = streams.clone();
|
||||
let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK
|
||||
let data_writer_tx = data_tx.clone(); // data: DATA_BACK
|
||||
let target = target_host.clone();
|
||||
let stream_token = edge_token.child_token();
|
||||
|
||||
let _ = event_tx.try_send(HubEvent::StreamOpened {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id,
|
||||
});
|
||||
|
||||
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
// Adaptive initial window: scale with current stream count
|
||||
// to keep total in-flight data within the 32MB budget.
|
||||
let initial_window = compute_window_for_stream_count(
|
||||
edge_stream_count.load(Ordering::Relaxed),
|
||||
);
|
||||
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||
let window_notify = Arc::new(Notify::new());
|
||||
{
|
||||
let mut s = streams.lock().await;
|
||||
s.insert(stream_id, HubStreamState {
|
||||
data_tx,
|
||||
cancel_token: stream_token.clone(),
|
||||
send_window: Arc::clone(&send_window),
|
||||
window_notify: Arc::clone(&window_notify),
|
||||
});
|
||||
match frame.frame_type {
|
||||
FRAME_OPEN => {
|
||||
// A4: Check stream limit before processing
|
||||
let permit = match stream_semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
|
||||
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
|
||||
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
tunnel_io.queue_ctrl(close_frame);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
let stream_counter = Arc::clone(&edge_stream_count);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // hold semaphore permit until stream completes
|
||||
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||
// Payload is PROXY v1 header line
|
||||
let proxy_header = String::from_utf8_lossy(&frame.payload).to_string();
|
||||
|
||||
let result = async {
|
||||
// A2: Connect to SmartProxy with timeout
|
||||
let mut upstream = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
TcpStream::connect((target.as_str(), dest_port)),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| -> Box<dyn std::error::Error + Send + Sync> {
|
||||
format!("connect to SmartProxy {}:{} timed out (10s)", target, dest_port).into()
|
||||
})??;
|
||||
// Parse destination port from PROXY header
|
||||
let dest_port = parse_dest_port_from_proxy(&proxy_header).unwrap_or(443);
|
||||
|
||||
upstream.set_nodelay(true)?;
|
||||
upstream.write_all(proxy_header.as_bytes()).await?;
|
||||
let stream_id = frame.stream_id;
|
||||
let edge_id_clone = edge_id.clone();
|
||||
let event_tx_clone = event_tx.clone();
|
||||
let streams_clone = streams.clone();
|
||||
let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK
|
||||
let data_writer_tx = data_tx.clone(); // data: DATA_BACK
|
||||
let target = target_host.clone();
|
||||
let stream_token = edge_token.child_token();
|
||||
|
||||
let (mut up_read, mut up_write) =
|
||||
upstream.into_split();
|
||||
let _ = event_tx.try_send(HubEvent::StreamOpened {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id,
|
||||
});
|
||||
|
||||
// Forward data from edge (via channel) to SmartProxy
|
||||
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
||||
let writer_token = stream_token.clone();
|
||||
let wub_tx = writer_tx.clone();
|
||||
let stream_counter_w = Arc::clone(&stream_counter);
|
||||
let writer_for_edge_data = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Some(data) => {
|
||||
let len = data.len() as u32;
|
||||
// Check cancellation alongside the write so we respond
|
||||
// promptly to FRAME_CLOSE instead of blocking up to 60s.
|
||||
let write_result = tokio::select! {
|
||||
r = tokio::time::timeout(
|
||||
Duration::from_secs(60),
|
||||
up_write.write_all(&data),
|
||||
) => r,
|
||||
_ = writer_token.cancelled() => break,
|
||||
};
|
||||
match write_result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) => break,
|
||||
Err(_) => {
|
||||
log::warn!("Stream {} write to upstream timed out (60s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Track consumption for adaptive flow control.
|
||||
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||
consumed_since_update += len;
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter_w.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
|
||||
if wub_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(1024);
|
||||
// Adaptive initial window: scale with current stream count
|
||||
// to keep total in-flight data within the 32MB budget.
|
||||
let initial_window = compute_window_for_stream_count(
|
||||
edge_stream_count.load(Ordering::Relaxed),
|
||||
);
|
||||
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||
let window_notify = Arc::new(Notify::new());
|
||||
{
|
||||
let mut s = streams.lock().await;
|
||||
s.insert(stream_id, HubStreamState {
|
||||
data_tx,
|
||||
cancel_token: stream_token.clone(),
|
||||
send_window: Arc::clone(&send_window),
|
||||
window_notify: Arc::clone(&window_notify),
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
let stream_counter = Arc::clone(&edge_stream_count);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // hold semaphore permit until stream completes
|
||||
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let result = async {
|
||||
// A2: Connect to SmartProxy with timeout
|
||||
let mut upstream = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
TcpStream::connect((target.as_str(), dest_port)),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| -> Box<dyn std::error::Error + Send + Sync> {
|
||||
format!("connect to SmartProxy {}:{} timed out (10s)", target, dest_port).into()
|
||||
})??;
|
||||
|
||||
upstream.set_nodelay(true)?;
|
||||
upstream.write_all(proxy_header.as_bytes()).await?;
|
||||
|
||||
let (mut up_read, mut up_write) =
|
||||
upstream.into_split();
|
||||
|
||||
// Forward data from edge (via channel) to SmartProxy
|
||||
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
||||
let writer_token = stream_token.clone();
|
||||
let wub_tx = writer_tx.clone();
|
||||
let stream_counter_w = Arc::clone(&stream_counter);
|
||||
let writer_for_edge_data = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Some(data) => {
|
||||
let len = data.len() as u32;
|
||||
// Check cancellation alongside the write so we respond
|
||||
// promptly to FRAME_CLOSE instead of blocking up to 60s.
|
||||
let write_result = tokio::select! {
|
||||
r = tokio::time::timeout(
|
||||
Duration::from_secs(60),
|
||||
up_write.write_all(&data),
|
||||
) => r,
|
||||
_ = writer_token.cancelled() => break,
|
||||
};
|
||||
match write_result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) => break,
|
||||
Err(_) => {
|
||||
log::warn!("Stream {} write to upstream timed out (60s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
// Track consumption for adaptive flow control.
|
||||
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||
consumed_since_update += len;
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter_w.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
|
||||
if wub_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = writer_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
// Send final window update for remaining consumed bytes
|
||||
if consumed_since_update > 0 {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
|
||||
let _ = wub_tx.try_send(frame);
|
||||
}
|
||||
let _ = up_write.shutdown().await;
|
||||
});
|
||||
|
||||
// Forward data from SmartProxy back to edge via writer channel
|
||||
// with per-stream flow control (check send_window before reading)
|
||||
let mut buf = vec![0u8; 32768];
|
||||
loop {
|
||||
// Wait for send window to have capacity (with stall timeout)
|
||||
loop {
|
||||
let w = send_window.load(Ordering::Acquire);
|
||||
if w > 0 { break; }
|
||||
tokio::select! {
|
||||
_ = window_notify.notified() => continue,
|
||||
_ = stream_token.cancelled() => break,
|
||||
_ = tokio::time::sleep(Duration::from_secs(120)) => {
|
||||
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if stream_token.is_cancelled() { break; }
|
||||
|
||||
// Limit read size to available window.
|
||||
// IMPORTANT: if window is 0 (stall timeout fired), we must NOT
|
||||
// read into an empty buffer — read(&mut buf[..0]) returns Ok(0)
|
||||
// which would be falsely interpreted as EOF.
|
||||
let w = send_window.load(Ordering::Acquire) as usize;
|
||||
if w == 0 {
|
||||
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
|
||||
tokio::select! {
|
||||
read_result = up_read.read(&mut buf[..max_read]) => {
|
||||
match read_result {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
send_window.fetch_sub(n as u32, Ordering::Release);
|
||||
let frame =
|
||||
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
|
||||
if data_writer_tx.send(frame).await.is_err() {
|
||||
log::warn!("Stream {} data channel closed, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = writer_token.cancelled() => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// Send final window update for remaining consumed bytes
|
||||
if consumed_since_update > 0 {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
|
||||
let _ = wub_tx.try_send(frame);
|
||||
}
|
||||
let _ = up_write.shutdown().await;
|
||||
});
|
||||
_ = stream_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Forward data from SmartProxy back to edge via writer channel
|
||||
// with per-stream flow control (check send_window before reading)
|
||||
let mut buf = vec![0u8; 32768];
|
||||
loop {
|
||||
// Wait for send window to have capacity (with stall timeout)
|
||||
loop {
|
||||
let w = send_window.load(Ordering::Acquire);
|
||||
if w > 0 { break; }
|
||||
tokio::select! {
|
||||
_ = window_notify.notified() => continue,
|
||||
_ = stream_token.cancelled() => break,
|
||||
_ = tokio::time::sleep(Duration::from_secs(120)) => {
|
||||
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if stream_token.is_cancelled() { break; }
|
||||
// Send CLOSE_BACK via DATA channel (must arrive AFTER last DATA_BACK).
|
||||
// Use send().await to guarantee delivery (try_send silently drops if full).
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
|
||||
// Limit read size to available window.
|
||||
// IMPORTANT: if window is 0 (stall timeout fired), we must NOT
|
||||
// read into an empty buffer — read(&mut buf[..0]) returns Ok(0)
|
||||
// which would be falsely interpreted as EOF.
|
||||
let w = send_window.load(Ordering::Acquire) as usize;
|
||||
if w == 0 {
|
||||
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
writer_for_edge_data.abort();
|
||||
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
|
||||
}
|
||||
.await;
|
||||
|
||||
tokio::select! {
|
||||
read_result = up_read.read(&mut buf[..max_read]) => {
|
||||
match read_result {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
send_window.fetch_sub(n as u32, Ordering::Release);
|
||||
let frame =
|
||||
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
|
||||
if data_writer_tx.send(frame).await.is_err() {
|
||||
log::warn!("Stream {} data channel closed, closing", stream_id);
|
||||
if let Err(e) = result {
|
||||
log::error!("Stream {} error: {}", stream_id, e);
|
||||
// Send CLOSE_BACK via DATA channel on error (must arrive after any DATA_BACK).
|
||||
// Use send().await to guarantee delivery.
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stream (guard against duplicate if FRAME_CLOSE already removed it)
|
||||
let was_present = {
|
||||
let mut s = streams_clone.lock().await;
|
||||
s.remove(&stream_id).is_some()
|
||||
};
|
||||
if was_present {
|
||||
let _ = event_tx_clone.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id_clone,
|
||||
stream_id,
|
||||
});
|
||||
}
|
||||
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
// Non-blocking dispatch to per-stream channel.
|
||||
// With flow control, the sender should rarely exceed the channel capacity.
|
||||
let mut s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
if state.data_tx.try_send(frame.payload).is_err() {
|
||||
log::warn!("Stream {} data channel full, closing stream", frame.stream_id);
|
||||
if let Some(state) = s.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_WINDOW_UPDATE => {
|
||||
// Edge consumed data — increase our send window for this stream
|
||||
if let Some(increment) = decode_window_update(&frame.payload) {
|
||||
if increment > 0 {
|
||||
let s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
let prev = state.send_window.fetch_add(increment, Ordering::Release);
|
||||
if prev + increment > MAX_WINDOW_SIZE {
|
||||
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
|
||||
}
|
||||
state.window_notify.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_CLOSE => {
|
||||
let mut s = streams.lock().await;
|
||||
if let Some(state) = s.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
let _ = event_tx.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id: frame.stream_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
FRAME_PONG => {
|
||||
log::debug!("Received PONG from edge {}", edge_id);
|
||||
}
|
||||
_ => {
|
||||
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
log::error!("Edge {} frame error: {}", edge_id, e);
|
||||
disconnect_reason = format!("edge_frame_error: {}", e);
|
||||
break 'hub_loop;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Poll I/O: write(ctrl->data), flush, read, channels, timers
|
||||
let event = std::future::poll_fn(|cx| {
|
||||
// Queue PING if ticker fires
|
||||
if ping_ticker.poll_tick(cx).is_ready() {
|
||||
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[]));
|
||||
}
|
||||
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut liveness_deadline, &edge_token)
|
||||
}).await;
|
||||
|
||||
match event {
|
||||
remoteingress_protocol::TunnelEvent::Frame(frame) => {
|
||||
// Reset liveness on any received frame
|
||||
last_activity = Instant::now();
|
||||
liveness_deadline.as_mut().reset(last_activity + liveness_timeout_dur);
|
||||
|
||||
match frame.frame_type {
|
||||
FRAME_OPEN => {
|
||||
// A4: Check stream limit before processing
|
||||
let permit = match stream_semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
|
||||
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
|
||||
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
tunnel_io.queue_ctrl(close_frame);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Payload is PROXY v1 header line
|
||||
let proxy_header = String::from_utf8_lossy(&frame.payload).to_string();
|
||||
|
||||
// Parse destination port from PROXY header
|
||||
let dest_port = parse_dest_port_from_proxy(&proxy_header).unwrap_or(443);
|
||||
|
||||
let stream_id = frame.stream_id;
|
||||
let edge_id_clone = edge_id.clone();
|
||||
let event_tx_clone = event_tx.clone();
|
||||
let streams_clone = streams.clone();
|
||||
let writer_tx = ctrl_tx.clone(); // control: CLOSE_BACK, WINDOW_UPDATE_BACK
|
||||
let data_writer_tx = data_tx.clone(); // data: DATA_BACK
|
||||
let target = target_host.clone();
|
||||
let stream_token = edge_token.child_token();
|
||||
|
||||
let _ = event_tx.try_send(HubEvent::StreamOpened {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id,
|
||||
});
|
||||
|
||||
// Create channel for data from edge to this stream (capacity 16 is sufficient with flow control)
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
// Adaptive initial window: scale with current stream count
|
||||
// to keep total in-flight data within the 32MB budget.
|
||||
let initial_window = compute_window_for_stream_count(
|
||||
edge_stream_count.load(Ordering::Relaxed),
|
||||
);
|
||||
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||
let window_notify = Arc::new(Notify::new());
|
||||
{
|
||||
let mut s = streams.lock().await;
|
||||
s.insert(stream_id, HubStreamState {
|
||||
data_tx,
|
||||
cancel_token: stream_token.clone(),
|
||||
send_window: Arc::clone(&send_window),
|
||||
window_notify: Arc::clone(&window_notify),
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
let stream_counter = Arc::clone(&edge_stream_count);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // hold semaphore permit until stream completes
|
||||
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let result = async {
|
||||
// A2: Connect to SmartProxy with timeout
|
||||
let mut upstream = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
TcpStream::connect((target.as_str(), dest_port)),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| -> Box<dyn std::error::Error + Send + Sync> {
|
||||
format!("connect to SmartProxy {}:{} timed out (10s)", target, dest_port).into()
|
||||
})??;
|
||||
|
||||
upstream.set_nodelay(true)?;
|
||||
upstream.write_all(proxy_header.as_bytes()).await?;
|
||||
|
||||
let (mut up_read, mut up_write) =
|
||||
upstream.into_split();
|
||||
|
||||
// Forward data from edge (via channel) to SmartProxy
|
||||
// After writing to upstream, send WINDOW_UPDATE_BACK to edge
|
||||
let writer_token = stream_token.clone();
|
||||
let wub_tx = writer_tx.clone();
|
||||
let stream_counter_w = Arc::clone(&stream_counter);
|
||||
let writer_for_edge_data = tokio::spawn(async move {
|
||||
let mut consumed_since_update: u32 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
data = data_rx.recv() => {
|
||||
match data {
|
||||
Some(data) => {
|
||||
let len = data.len() as u32;
|
||||
// Check cancellation alongside the write so we respond
|
||||
// promptly to FRAME_CLOSE instead of blocking up to 60s.
|
||||
let write_result = tokio::select! {
|
||||
r = tokio::time::timeout(
|
||||
Duration::from_secs(60),
|
||||
up_write.write_all(&data),
|
||||
) => r,
|
||||
_ = writer_token.cancelled() => break,
|
||||
};
|
||||
match write_result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) => break,
|
||||
Err(_) => {
|
||||
log::warn!("Stream {} write to upstream timed out (60s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
// Track consumption for adaptive flow control.
|
||||
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||
consumed_since_update += len;
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter_w.load(Ordering::Relaxed),
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
let increment = consumed_since_update.min(adaptive_window);
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, increment);
|
||||
if wub_tx.try_send(frame).is_ok() {
|
||||
consumed_since_update -= increment;
|
||||
}
|
||||
// If try_send fails, keep accumulating — retry on next threshold
|
||||
}
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
_ = writer_token.cancelled() => break,
|
||||
}
|
||||
}
|
||||
// Send final window update for remaining consumed bytes
|
||||
if consumed_since_update > 0 {
|
||||
let frame = encode_window_update(stream_id, FRAME_WINDOW_UPDATE_BACK, consumed_since_update);
|
||||
let _ = wub_tx.try_send(frame);
|
||||
}
|
||||
let _ = up_write.shutdown().await;
|
||||
});
|
||||
|
||||
// Forward data from SmartProxy back to edge via writer channel
|
||||
// with per-stream flow control (check send_window before reading)
|
||||
let mut buf = vec![0u8; 32768];
|
||||
loop {
|
||||
// Wait for send window to have capacity (with stall timeout)
|
||||
loop {
|
||||
let w = send_window.load(Ordering::Acquire);
|
||||
if w > 0 { break; }
|
||||
tokio::select! {
|
||||
_ = window_notify.notified() => continue,
|
||||
_ = stream_token.cancelled() => break,
|
||||
_ = tokio::time::sleep(Duration::from_secs(120)) => {
|
||||
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if stream_token.is_cancelled() { break; }
|
||||
|
||||
// Limit read size to available window.
|
||||
// IMPORTANT: if window is 0 (stall timeout fired), we must NOT
|
||||
// read into an empty buffer — read(&mut buf[..0]) returns Ok(0)
|
||||
// which would be falsely interpreted as EOF.
|
||||
let w = send_window.load(Ordering::Acquire) as usize;
|
||||
if w == 0 {
|
||||
log::warn!("Stream {} download: window still 0 after stall timeout, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
// Adaptive: cap read to current per-stream target window
|
||||
let adaptive_cap = remoteingress_protocol::compute_window_for_stream_count(
|
||||
stream_counter.load(Ordering::Relaxed),
|
||||
) as usize;
|
||||
let max_read = w.min(buf.len()).min(adaptive_cap);
|
||||
|
||||
tokio::select! {
|
||||
read_result = up_read.read(&mut buf[..max_read]) => {
|
||||
match read_result {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
send_window.fetch_sub(n as u32, Ordering::Release);
|
||||
let frame =
|
||||
encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]);
|
||||
if data_writer_tx.send(frame).await.is_err() {
|
||||
log::warn!("Stream {} data channel closed, closing", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = stream_token.cancelled() => break,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Send CLOSE_BACK via DATA channel (must arrive AFTER last DATA_BACK).
|
||||
// Use send().await to guarantee delivery (try_send silently drops if full).
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
|
||||
writer_for_edge_data.abort();
|
||||
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
|
||||
_ = stream_token.cancelled() => break,
|
||||
}
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Err(e) = result {
|
||||
log::error!("Stream {} error: {}", stream_id, e);
|
||||
// Send CLOSE_BACK via DATA channel on error (must arrive after any DATA_BACK).
|
||||
// Use send().await to guarantee delivery.
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
}
|
||||
// Send CLOSE_BACK via DATA channel (must arrive AFTER last DATA_BACK).
|
||||
// Use send().await to guarantee delivery (try_send silently drops if full).
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
|
||||
// Clean up stream (guard against duplicate if FRAME_CLOSE already removed it)
|
||||
let was_present = {
|
||||
let mut s = streams_clone.lock().await;
|
||||
s.remove(&stream_id).is_some()
|
||||
};
|
||||
if was_present {
|
||||
let _ = event_tx_clone.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id_clone,
|
||||
stream_id,
|
||||
});
|
||||
}
|
||||
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||
writer_for_edge_data.abort();
|
||||
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
log::error!("Stream {} error: {}", stream_id, e);
|
||||
// Send CLOSE_BACK via DATA channel on error (must arrive after any DATA_BACK).
|
||||
// Use send().await to guarantee delivery.
|
||||
if !stream_token.is_cancelled() {
|
||||
let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
let _ = data_writer_tx.send(close_frame).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stream (guard against duplicate if FRAME_CLOSE already removed it)
|
||||
let was_present = {
|
||||
let mut s = streams_clone.lock().await;
|
||||
s.remove(&stream_id).is_some()
|
||||
};
|
||||
if was_present {
|
||||
let _ = event_tx_clone.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id_clone,
|
||||
stream_id,
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
// Non-blocking dispatch to per-stream channel.
|
||||
// With flow control, the sender should rarely exceed the channel capacity.
|
||||
let mut s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
if state.data_tx.try_send(frame.payload).is_err() {
|
||||
log::warn!("Stream {} data channel full, closing stream", frame.stream_id);
|
||||
if let Some(state) = s.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_WINDOW_UPDATE => {
|
||||
// Edge consumed data — increase our send window for this stream
|
||||
if let Some(increment) = decode_window_update(&frame.payload) {
|
||||
if increment > 0 {
|
||||
let s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
let prev = state.send_window.fetch_add(increment, Ordering::Release);
|
||||
if prev + increment > MAX_WINDOW_SIZE {
|
||||
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
|
||||
}
|
||||
state.window_notify.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_CLOSE => {
|
||||
let mut s = streams.lock().await;
|
||||
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
// Non-blocking dispatch to per-stream channel.
|
||||
// With flow control, the sender should rarely exceed the channel capacity.
|
||||
let mut s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
if state.data_tx.try_send(frame.payload).is_err() {
|
||||
log::warn!("Stream {} data channel full, closing stream", frame.stream_id);
|
||||
if let Some(state) = s.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
let _ = event_tx.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id: frame.stream_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
FRAME_PONG => {
|
||||
log::debug!("Received PONG from edge {}", edge_id);
|
||||
}
|
||||
_ => {
|
||||
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
log::info!("Edge {} disconnected (EOF)", edge_id);
|
||||
disconnect_reason = "edge_eof".to_string();
|
||||
break;
|
||||
FRAME_WINDOW_UPDATE => {
|
||||
// Edge consumed data — increase our send window for this stream
|
||||
if let Some(increment) = decode_window_update(&frame.payload) {
|
||||
if increment > 0 {
|
||||
let s = streams.lock().await;
|
||||
if let Some(state) = s.get(&frame.stream_id) {
|
||||
let prev = state.send_window.fetch_add(increment, Ordering::Release);
|
||||
if prev + increment > MAX_WINDOW_SIZE {
|
||||
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
|
||||
}
|
||||
state.window_notify.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Edge {} frame error: {}", edge_id, e);
|
||||
disconnect_reason = format!("edge_frame_error: {}", e);
|
||||
break;
|
||||
FRAME_CLOSE => {
|
||||
let mut s = streams.lock().await;
|
||||
if let Some(state) = s.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
let _ = event_tx.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id: frame.stream_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
FRAME_PONG => {
|
||||
log::debug!("Received PONG from edge {}", edge_id);
|
||||
}
|
||||
_ => {
|
||||
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = ping_ticker.tick() => {
|
||||
let ping_frame = encode_frame(0, FRAME_PING, &[]);
|
||||
if frame_writer_tx.try_send(ping_frame).is_err() {
|
||||
// Control channel full — skip this PING cycle.
|
||||
// The 45s liveness timeout gives margin for the channel to drain.
|
||||
log::warn!("PING send to edge {} failed, control channel full — skipping", edge_id);
|
||||
}
|
||||
log::trace!("Sent PING to edge {}", edge_id);
|
||||
remoteingress_protocol::TunnelEvent::Eof => {
|
||||
log::info!("Edge {} disconnected (EOF)", edge_id);
|
||||
disconnect_reason = "edge_eof".to_string();
|
||||
break;
|
||||
}
|
||||
_ = &mut liveness_deadline => {
|
||||
remoteingress_protocol::TunnelEvent::ReadError(e) => {
|
||||
log::error!("Edge {} frame error: {}", edge_id, e);
|
||||
disconnect_reason = format!("edge_frame_error: {}", e);
|
||||
break;
|
||||
}
|
||||
remoteingress_protocol::TunnelEvent::WriteError(e) => {
|
||||
log::error!("Tunnel write error to edge {}: {}", edge_id, e);
|
||||
disconnect_reason = format!("tunnel_write_error: {}", e);
|
||||
break;
|
||||
}
|
||||
remoteingress_protocol::TunnelEvent::LivenessTimeout => {
|
||||
log::warn!("Edge {} liveness timeout (no frames for {}s), disconnecting",
|
||||
edge_id, liveness_timeout_dur.as_secs());
|
||||
disconnect_reason = "liveness_timeout".to_string();
|
||||
break;
|
||||
}
|
||||
_ = &mut writer_dead_rx => {
|
||||
log::error!("Tunnel writer to edge {} died, disconnecting immediately", edge_id);
|
||||
disconnect_reason = "writer_dead".to_string();
|
||||
break;
|
||||
}
|
||||
_ = edge_token.cancelled() => {
|
||||
remoteingress_protocol::TunnelEvent::Cancelled => {
|
||||
log::info!("Edge {} cancelled by hub", edge_id);
|
||||
disconnect_reason = "cancelled_by_hub".to_string();
|
||||
break;
|
||||
@@ -792,7 +1052,6 @@ async fn handle_edge_connection(
|
||||
// Cleanup: cancel edge token to propagate to all child tasks
|
||||
edge_token.cancel();
|
||||
config_handle.abort();
|
||||
writer_handle.abort();
|
||||
{
|
||||
let mut edges = connected.lock().await;
|
||||
edges.remove(&edge_id);
|
||||
|
||||
Reference in New Issue
Block a user