From bda82f32ca87e685df0aaf7039b2f0b9249d953d Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 26 Feb 2026 23:02:23 +0000 Subject: [PATCH] feat(core): expose edge peer address in hub events and migrate writers to channel-based, non-blocking framing with stream limits and timeouts --- changelog.md | 10 +++ rust/crates/remoteingress-bin/src/main.rs | 4 +- rust/crates/remoteingress-core/src/edge.rs | 61 +++++++++----- rust/crates/remoteingress-core/src/hub.rs | 98 +++++++++++++++++----- ts/00_commitinfo_data.ts | 2 +- ts/classes.remoteingresshub.ts | 3 +- 6 files changed, 133 insertions(+), 45 deletions(-) diff --git a/changelog.md b/changelog.md index 1477281..decafee 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-02-26 - 4.2.0 - feat(core) +expose edge peer address in hub events and migrate writers to channel-based, non-blocking framing with stream limits and timeouts + +- Add peerAddr to ConnectedEdgeStatus and HubEvent::EdgeConnected and surface it to the TS frontend event (management:edgeConnected). +- Replace Arc<Mutex<WriteHalf>> writers with dedicated mpsc channel writer tasks in both hub and edge crates to serialize writes off the main tasks. +- Use non-blocking try_send for data frames to avoid head-of-line blocking and drop frames with warnings when channels are full. +- Introduce MAX_STREAMS_PER_EDGE semaphore to limit concurrent streams per edge and reject excess opens with a CLOSE_BACK frame. +- Add a 10s timeout when connecting to SmartProxy to avoid hanging connections. +- Ensure writer tasks are aborted on shutdown/cleanup and propagate cancellation tokens appropriately. 
+ ## 2026-02-26 - 4.1.0 - feat(remoteingress-bin) use mimalloc as the global allocator to reduce memory overhead and improve allocation performance diff --git a/rust/crates/remoteingress-bin/src/main.rs b/rust/crates/remoteingress-bin/src/main.rs index 3c06683..815c6d2 100644 --- a/rust/crates/remoteingress-bin/src/main.rs +++ b/rust/crates/remoteingress-bin/src/main.rs @@ -167,10 +167,10 @@ async fn handle_request( tokio::spawn(async move { while let Some(event) = event_rx.recv().await { match &event { - HubEvent::EdgeConnected { edge_id } => { + HubEvent::EdgeConnected { edge_id, peer_addr } => { send_event( "edgeConnected", - serde_json::json!({ "edgeId": edge_id }), + serde_json::json!({ "edgeId": edge_id, "peerAddr": peer_addr }), ); } HubEvent::EdgeDisconnected { edge_id } => { diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 19397d4..21e0fc9 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -346,15 +346,33 @@ async fn connect_to_hub_and_run( let client_writers: Arc>>>> = Arc::new(Mutex::new(HashMap::new())); - // Shared tunnel writer - let tunnel_writer = Arc::new(Mutex::new(write_half)); + // A5: Channel-based tunnel writer replaces Arc> + let (tunnel_writer_tx, mut tunnel_writer_rx) = mpsc::channel::>(4096); + let tw_token = connection_token.clone(); + let tunnel_writer_handle = tokio::spawn(async move { + loop { + tokio::select! 
{ + data = tunnel_writer_rx.recv() => { + match data { + Some(frame_data) => { + if write_half.write_all(&frame_data).await.is_err() { + break; + } + } + None => break, + } + } + _ = tw_token.cancelled() => break, + } + } + }); // Start TCP listeners for initial ports (hot-reloadable) let mut port_listeners: HashMap> = HashMap::new(); apply_port_config( &handshake.listen_ports, &mut port_listeners, - &tunnel_writer, + &tunnel_writer_tx, &client_writers, active_streams, next_stream_id, @@ -371,9 +389,12 @@ async fn connect_to_hub_and_run( Ok(Some(frame)) => { match frame.frame_type { FRAME_DATA_BACK => { + // A1: Non-blocking send to prevent head-of-line blocking let writers = client_writers.lock().await; if let Some(tx) = writers.get(&frame.stream_id) { - let _ = tx.send(frame.payload).await; + if tx.try_send(frame.payload).is_err() { + log::warn!("Stream {} back-channel full, dropping frame", frame.stream_id); + } } } FRAME_CLOSE_BACK => { @@ -390,7 +411,7 @@ async fn connect_to_hub_and_run( apply_port_config( &update.listen_ports, &mut port_listeners, - &tunnel_writer, + &tunnel_writer_tx, &client_writers, active_streams, next_stream_id, @@ -427,6 +448,7 @@ async fn connect_to_hub_and_run( // Cancel connection token to propagate to all child tasks BEFORE aborting connection_token.cancel(); stun_handle.abort(); + tunnel_writer_handle.abort(); for (_, h) in port_listeners.drain() { h.abort(); } @@ -438,7 +460,7 @@ async fn connect_to_hub_and_run( fn apply_port_config( new_ports: &[u16], port_listeners: &mut HashMap>, - tunnel_writer: &Arc>>>, + tunnel_writer_tx: &mpsc::Sender>, client_writers: &Arc>>>>, active_streams: &Arc, next_stream_id: &Arc, @@ -458,7 +480,7 @@ fn apply_port_config( // Add new ports for &port in new_set.difference(&old_set) { - let tunnel_writer = tunnel_writer.clone(); + let tunnel_writer_tx = tunnel_writer_tx.clone(); let client_writers = client_writers.clone(); let active_streams = active_streams.clone(); let next_stream_id = 
next_stream_id.clone(); @@ -481,7 +503,7 @@ fn apply_port_config( match accept_result { Ok((client_stream, client_addr)) => { let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed); - let tunnel_writer = tunnel_writer.clone(); + let tunnel_writer_tx = tunnel_writer_tx.clone(); let client_writers = client_writers.clone(); let active_streams = active_streams.clone(); let edge_id = edge_id.clone(); @@ -496,7 +518,7 @@ fn apply_port_config( stream_id, port, &edge_id, - tunnel_writer, + tunnel_writer_tx, client_writers, client_token, ) @@ -526,7 +548,7 @@ async fn handle_client_connection( stream_id: u32, dest_port: u16, edge_id: &str, - tunnel_writer: Arc>>>, + tunnel_writer_tx: mpsc::Sender>, client_writers: Arc>>>>, client_token: CancellationToken, ) { @@ -536,14 +558,11 @@ async fn handle_client_connection( // Determine edge IP (use 0.0.0.0 as placeholder — hub doesn't use it for routing) let edge_ip = "0.0.0.0"; - // Send OPEN frame with PROXY v1 header + // Send OPEN frame with PROXY v1 header via writer channel let proxy_header = build_proxy_v1_header(&client_ip, edge_ip, client_port, dest_port); let open_frame = encode_frame(stream_id, FRAME_OPEN, proxy_header.as_bytes()); - { - let mut w = tunnel_writer.lock().await; - if w.write_all(&open_frame).await.is_err() { - return; - } + if tunnel_writer_tx.send(open_frame).await.is_err() { + return; } // Set up channel for data coming back from hub @@ -576,7 +595,7 @@ async fn handle_client_connection( let _ = client_write.shutdown().await; }); - // Task: client -> hub + // Task: client -> hub (via writer channel) let mut buf = vec![0u8; 32768]; loop { tokio::select! 
{ @@ -585,8 +604,9 @@ async fn handle_client_connection( Ok(0) => break, Ok(n) => { let data_frame = encode_frame(stream_id, FRAME_DATA, &buf[..n]); - let mut w = tunnel_writer.lock().await; - if w.write_all(&data_frame).await.is_err() { + // A5: Use try_send to avoid blocking if writer channel is full + if tunnel_writer_tx.try_send(data_frame).is_err() { + log::warn!("Stream {} tunnel writer full, closing", stream_id); break; } } @@ -600,8 +620,7 @@ async fn handle_client_connection( // Send CLOSE frame (only if not cancelled) if !client_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE, &[]); - let mut w = tunnel_writer.lock().await; - let _ = w.write_all(&close_frame).await; + let _ = tunnel_writer_tx.try_send(close_frame); } // Cleanup diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index ce09426..64367fd 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::{mpsc, Mutex, RwLock, Semaphore}; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use serde::{Deserialize, Serialize}; @@ -65,6 +65,7 @@ pub struct ConnectedEdgeStatus { pub edge_id: String, pub connected_at: u64, pub active_streams: usize, + pub peer_addr: String, } /// Events emitted by the hub. 
@@ -73,7 +74,7 @@ pub struct ConnectedEdgeStatus { #[serde(tag = "type")] pub enum HubEvent { #[serde(rename_all = "camelCase")] - EdgeConnected { edge_id: String }, + EdgeConnected { edge_id: String, peer_addr: String }, #[serde(rename_all = "camelCase")] EdgeDisconnected { edge_id: String }, #[serde(rename_all = "camelCase")] @@ -105,6 +106,7 @@ pub struct TunnelHub { struct ConnectedEdgeInfo { connected_at: u64, + peer_addr: String, active_streams: Arc>, CancellationToken)>>>, config_tx: mpsc::Sender, #[allow(dead_code)] // kept alive for Drop — cancels child tokens when edge is removed @@ -176,6 +178,7 @@ impl TunnelHub { edge_id: id.clone(), connected_at: info.connected_at, active_streams: streams.len(), + peer_addr: info.peer_addr.clone(), }); } @@ -218,9 +221,10 @@ impl TunnelHub { let event_tx = event_tx.clone(); let target = target_host.clone(); let edge_token = hub_token.child_token(); + let peer_addr = addr.ip().to_string(); tokio::spawn(async move { if let Err(e) = handle_edge_connection( - stream, acceptor, allowed, connected, event_tx, target, edge_token, + stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr, ).await { log::error!("Edge connection error: {}", e); } @@ -264,6 +268,9 @@ impl Drop for TunnelHub { } } +/// Maximum concurrent streams per edge connection. +const MAX_STREAMS_PER_EDGE: usize = 1024; + /// Handle a single edge connection: authenticate, then enter frame loop. 
async fn handle_edge_connection( stream: TcpStream, @@ -273,6 +280,7 @@ async fn handle_edge_connection( event_tx: mpsc::Sender, target_host: String, edge_token: CancellationToken, + peer_addr: String, ) -> Result<(), Box> { let tls_stream = acceptor.accept(stream).await?; let (read_half, mut write_half) = tokio::io::split(tls_stream); @@ -307,9 +315,10 @@ async fn handle_edge_connection( } }; - log::info!("Edge {} authenticated", edge_id); + log::info!("Edge {} authenticated from {}", edge_id, peer_addr); let _ = event_tx.try_send(HubEvent::EdgeConnected { edge_id: edge_id.clone(), + peer_addr: peer_addr.clone(), }); // Send handshake response with initial config before frame protocol begins @@ -338,6 +347,7 @@ async fn handle_edge_connection( edge_id.clone(), ConnectedEdgeInfo { connected_at: now, + peer_addr, active_streams: streams.clone(), config_tx, cancel_token: edge_token.clone(), @@ -345,11 +355,30 @@ async fn handle_edge_connection( ); } - // Shared writer for sending frames back to edge - let write_half = Arc::new(Mutex::new(write_half)); + // A5: Channel-based writer replaces Arc> + // All frame writes go through this channel → dedicated writer task serializes them + let (frame_writer_tx, mut frame_writer_rx) = mpsc::channel::>(4096); + let writer_token = edge_token.clone(); + let writer_handle = tokio::spawn(async move { + loop { + tokio::select! 
{ + data = frame_writer_rx.recv() => { + match data { + Some(frame_data) => { + if write_half.write_all(&frame_data).await.is_err() { + break; + } + } + None => break, + } + } + _ = writer_token.cancelled() => break, + } + } + }); // Spawn task to forward config updates as FRAME_CONFIG frames - let config_writer = write_half.clone(); + let config_writer_tx = frame_writer_tx.clone(); let config_edge_id = edge_id.clone(); let config_token = edge_token.clone(); let config_handle = tokio::spawn(async move { @@ -360,8 +389,7 @@ async fn handle_edge_connection( Some(update) => { if let Ok(payload) = serde_json::to_vec(&update) { let frame = encode_frame(0, FRAME_CONFIG, &payload); - let mut w = config_writer.lock().await; - if w.write_all(&frame).await.is_err() { + if config_writer_tx.send(frame).await.is_err() { log::error!("Failed to send config update to edge {}", config_edge_id); break; } @@ -376,6 +404,9 @@ async fn handle_edge_connection( } }); + // A4: Semaphore to limit concurrent streams per edge + let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE)); + // Frame reading loop let mut frame_reader = FrameReader::new(buf_reader); @@ -386,6 +417,18 @@ async fn handle_edge_connection( Ok(Some(frame)) => { match frame.frame_type { FRAME_OPEN => { + // A4: Check stream limit before processing + let permit = match stream_semaphore.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => { + log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}", + edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id); + let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]); + let _ = frame_writer_tx.try_send(close_frame); + continue; + } + }; + // Payload is PROXY v1 header line let proxy_header = String::from_utf8_lossy(&frame.payload).to_string(); @@ -396,7 +439,7 @@ async fn handle_edge_connection( let edge_id_clone = edge_id.clone(); let event_tx_clone = event_tx.clone(); let streams_clone = streams.clone(); - let writer_clone = 
write_half.clone(); + let writer_tx = frame_writer_tx.clone(); let target = target_host.clone(); let stream_token = edge_token.child_token(); @@ -414,9 +457,19 @@ async fn handle_edge_connection( // Spawn task: connect to SmartProxy, send PROXY header, pipe data tokio::spawn(async move { + let _permit = permit; // hold semaphore permit until stream completes + let result = async { - let mut upstream = - TcpStream::connect((target.as_str(), dest_port)).await?; + // A2: Connect to SmartProxy with timeout + let mut upstream = tokio::time::timeout( + std::time::Duration::from_secs(10), + TcpStream::connect((target.as_str(), dest_port)), + ) + .await + .map_err(|_| -> Box { + format!("connect to SmartProxy {}:{} timed out (10s)", target, dest_port).into() + })??; + upstream.write_all(proxy_header.as_bytes()).await?; let (mut up_read, mut up_write) = @@ -443,7 +496,7 @@ async fn handle_edge_connection( let _ = up_write.shutdown().await; }); - // Forward data from SmartProxy back to edge + // Forward data from SmartProxy back to edge via writer channel let mut buf = vec![0u8; 32768]; loop { tokio::select! 
{ @@ -453,8 +506,9 @@ async fn handle_edge_connection( Ok(n) => { let frame = encode_frame(stream_id, FRAME_DATA_BACK, &buf[..n]); - let mut w = writer_clone.lock().await; - if w.write_all(&frame).await.is_err() { + // A5: Use try_send to avoid blocking if writer channel is full + if writer_tx.try_send(frame).is_err() { + log::warn!("Stream {} writer channel full, closing", stream_id); break; } } @@ -468,8 +522,7 @@ async fn handle_edge_connection( // Send CLOSE_BACK to edge (only if not cancelled) if !stream_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]); - let mut w = writer_clone.lock().await; - let _ = w.write_all(&close_frame).await; + let _ = writer_tx.try_send(close_frame); } writer_for_edge_data.abort(); @@ -482,8 +535,7 @@ async fn handle_edge_connection( // Send CLOSE_BACK on error (only if not cancelled) if !stream_token.is_cancelled() { let close_frame = encode_frame(stream_id, FRAME_CLOSE_BACK, &[]); - let mut w = writer_clone.lock().await; - let _ = w.write_all(&close_frame).await; + let _ = writer_tx.try_send(close_frame); } } @@ -501,9 +553,12 @@ async fn handle_edge_connection( }); } FRAME_DATA => { + // A1: Non-blocking send to prevent head-of-line blocking let s = streams.lock().await; if let Some((tx, _)) = s.get(&frame.stream_id) { - let _ = tx.send(frame.payload).await; + if tx.try_send(frame.payload).is_err() { + log::warn!("Stream {} data channel full, dropping frame", frame.stream_id); + } } } FRAME_CLOSE => { @@ -541,6 +596,7 @@ async fn handle_edge_connection( // Cleanup: cancel edge token to propagate to all child tasks edge_token.cancel(); config_handle.abort(); + writer_handle.abort(); { let mut edges = connected.lock().await; edges.remove(&edge_id); @@ -757,10 +813,12 @@ mod tests { fn test_hub_event_edge_connected_serialize() { let event = HubEvent::EdgeConnected { edge_id: "edge-1".to_string(), + peer_addr: "203.0.113.5".to_string(), }; let json = serde_json::to_value(&event).unwrap(); 
assert_eq!(json["type"], "edgeConnected"); assert_eq!(json["edgeId"], "edge-1"); + assert_eq!(json["peerAddr"], "203.0.113.5"); } #[test] diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 9c9bc67..9b9bd34 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.1.0', + version: '4.2.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index b4d6a02..23e5963 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -33,6 +33,7 @@ type THubCommands = { edgeId: string; connectedAt: number; activeStreams: number; + peerAddr: string; }>; }; }; @@ -73,7 +74,7 @@ export class RemoteIngressHub extends EventEmitter { }); // Forward events from Rust binary - this.bridge.on('management:edgeConnected', (data: { edgeId: string }) => { + this.bridge.on('management:edgeConnected', (data: { edgeId: string; peerAddr: string }) => { this.emit('edgeConnected', data); }); this.bridge.on('management:edgeDisconnected', (data: { edgeId: string }) => {