feat(core): expose edge peer address in hub events and migrate writers to channel-based, non-blocking framing with stream limits and timeouts

This commit is contained in:
2026-02-26 23:02:23 +00:00
parent 4b06cb1b24
commit bda82f32ca
6 changed files with 133 additions and 45 deletions

View File

@@ -346,15 +346,33 @@ async fn connect_to_hub_and_run(
let client_writers: Arc<Mutex<HashMap<u32, mpsc::Sender<Vec<u8>>>>> =
Arc::new(Mutex::new(HashMap::new()));
// Shared tunnel writer
let tunnel_writer = Arc::new(Mutex::new(write_half));
// A5: Channel-based tunnel writer replaces Arc<Mutex<WriteHalf>>
let (tunnel_writer_tx, mut tunnel_writer_rx) = mpsc::channel::<Vec<u8>>(4096);
let tw_token = connection_token.clone();
let tunnel_writer_handle = tokio::spawn(async move {
loop {
tokio::select! {
data = tunnel_writer_rx.recv() => {
match data {
Some(frame_data) => {
if write_half.write_all(&frame_data).await.is_err() {
break;
}
}
None => break,
}
}
_ = tw_token.cancelled() => break,
}
}
});
// Start TCP listeners for initial ports (hot-reloadable)
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
apply_port_config(
&handshake.listen_ports,
&mut port_listeners,
&tunnel_writer,
&tunnel_writer_tx,
&client_writers,
active_streams,
next_stream_id,
@@ -371,9 +389,12 @@ async fn connect_to_hub_and_run(
Ok(Some(frame)) => {
match frame.frame_type {
FRAME_DATA_BACK => {
// A1: Non-blocking send to prevent head-of-line blocking
let writers = client_writers.lock().await;
if let Some(tx) = writers.get(&frame.stream_id) {
let _ = tx.send(frame.payload).await;
if tx.try_send(frame.payload).is_err() {
log::warn!("Stream {} back-channel full, dropping frame", frame.stream_id);
}
}
}
FRAME_CLOSE_BACK => {
@@ -390,7 +411,7 @@ async fn connect_to_hub_and_run(
apply_port_config(
&update.listen_ports,
&mut port_listeners,
&tunnel_writer,
&tunnel_writer_tx,
&client_writers,
active_streams,
next_stream_id,
@@ -427,6 +448,7 @@ async fn connect_to_hub_and_run(
// Cancel connection token to propagate to all child tasks BEFORE aborting
connection_token.cancel();
stun_handle.abort();
tunnel_writer_handle.abort();
for (_, h) in port_listeners.drain() {
h.abort();
}
@@ -438,7 +460,7 @@ async fn connect_to_hub_and_run(
fn apply_port_config(
new_ports: &[u16],
port_listeners: &mut HashMap<u16, JoinHandle<()>>,
tunnel_writer: &Arc<Mutex<tokio::io::WriteHalf<tokio_rustls::client::TlsStream<TcpStream>>>>,
tunnel_writer_tx: &mpsc::Sender<Vec<u8>>,
client_writers: &Arc<Mutex<HashMap<u32, mpsc::Sender<Vec<u8>>>>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
@@ -458,7 +480,7 @@ fn apply_port_config(
// Add new ports
for &port in new_set.difference(&old_set) {
let tunnel_writer = tunnel_writer.clone();
let tunnel_writer_tx = tunnel_writer_tx.clone();
let client_writers = client_writers.clone();
let active_streams = active_streams.clone();
let next_stream_id = next_stream_id.clone();
@@ -481,7 +503,7 @@ fn apply_port_config(
match accept_result {
Ok((client_stream, client_addr)) => {
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
let tunnel_writer = tunnel_writer.clone();
let tunnel_writer_tx = tunnel_writer_tx.clone();
let client_writers = client_writers.clone();
let active_streams = active_streams.clone();
let edge_id = edge_id.clone();
@@ -496,7 +518,7 @@ fn apply_port_config(
stream_id,
port,
&edge_id,
tunnel_writer,
tunnel_writer_tx,
client_writers,
client_token,
)
@@ -526,7 +548,7 @@ async fn handle_client_connection(
stream_id: u32,
dest_port: u16,
edge_id: &str,
tunnel_writer: Arc<Mutex<tokio::io::WriteHalf<tokio_rustls::client::TlsStream<TcpStream>>>>,
tunnel_writer_tx: mpsc::Sender<Vec<u8>>,
client_writers: Arc<Mutex<HashMap<u32, mpsc::Sender<Vec<u8>>>>>,
client_token: CancellationToken,
) {
@@ -536,14 +558,11 @@ async fn handle_client_connection(
// Determine edge IP (use 0.0.0.0 as placeholder — hub doesn't use it for routing)
let edge_ip = "0.0.0.0";
// Send OPEN frame with PROXY v1 header
// Send OPEN frame with PROXY v1 header via writer channel
let proxy_header = build_proxy_v1_header(&client_ip, edge_ip, client_port, dest_port);
let open_frame = encode_frame(stream_id, FRAME_OPEN, proxy_header.as_bytes());
{
let mut w = tunnel_writer.lock().await;
if w.write_all(&open_frame).await.is_err() {
return;
}
if tunnel_writer_tx.send(open_frame).await.is_err() {
return;
}
// Set up channel for data coming back from hub
@@ -576,7 +595,7 @@ async fn handle_client_connection(
let _ = client_write.shutdown().await;
});
// Task: client -> hub
// Task: client -> hub (via writer channel)
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
@@ -585,8 +604,9 @@ async fn handle_client_connection(
Ok(0) => break,
Ok(n) => {
let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]);
let mut w = tunnel_writer.lock().await;
if w.write_all(&data_frame).await.is_err() {
// A5: Use try_send to avoid blocking if writer channel is full
if tunnel_writer_tx.try_send(data_frame).is_err() {
log::warn!("Stream {} tunnel writer full, closing", stream_id);
break;
}
}
@@ -600,8 +620,7 @@ async fn handle_client_connection(
// Send CLOSE frame (only if not cancelled)
if !client_token.is_cancelled() {
let close_frame = encode_frame(stream_id, FRAME_CLOSE, &[]);
let mut w = tunnel_writer.lock().await;
let _ = w.write_all(&close_frame).await;
let _ = tunnel_writer_tx.try_send(close_frame);
}
// Cleanup