From d869589663d91548072d6b3daf3294f3dac0625b Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Wed, 18 Feb 2026 18:20:53 +0000 Subject: [PATCH] feat(remoteingress (edge/hub/protocol)): add dynamic port configuration: handshake, FRAME_CONFIG frames, and hot-reloadable listeners --- changelog.md | 10 + rust/crates/remoteingress-bin/src/main.rs | 12 + rust/crates/remoteingress-core/src/edge.rs | 234 +++++++++++++----- rust/crates/remoteingress-core/src/hub.rs | 96 ++++++- rust/crates/remoteingress-protocol/src/lib.rs | 1 + ts/00_commitinfo_data.ts | 2 +- ts/classes.remoteingressedge.ts | 27 ++ ts/classes.remoteingresshub.ts | 4 +- 8 files changed, 314 insertions(+), 72 deletions(-) diff --git a/changelog.md b/changelog.md index dec0000..b349f73 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-02-18 - 3.2.0 - feat(remoteingress (edge/hub/protocol)) +add dynamic port configuration: handshake, FRAME_CONFIG frames, and hot-reloadable listeners + +- Introduce a JSON handshake from hub -> edge with initial listen ports and stun interval so edges can configure listeners at connect time. +- Add FRAME_CONFIG (0x06) to the protocol and implement runtime config updates pushed from hub to connected edges. +- Edge now applies initial ports and supports hot-reloading: spawn/abort listeners when ports change, and emit PortsAssigned / PortsUpdated events. +- Hub now stores allowed edge metadata (listen_ports, stun_interval_secs), sends handshake responses on auth, and forwards config updates to connected edges. +- TypeScript bridge/client updated to emit new port events and periodically log status; updateAllowedEdges API accepts listenPorts and stunIntervalSecs. +- Stun interval handling moved to use handshake-provided/stored value instead of config.listen_ports being static. + ## 2026-02-18 - 3.1.1 - fix(readme) update README: add issue reporting/security section, document connection tokens and token utilities, clarify architecture/API and improve examples/formatting diff --git a/rust/crates/remoteingress-bin/src/main.rs b/rust/crates/remoteingress-bin/src/main.rs index e2327b4..ebf73c3 100644 --- a/rust/crates/remoteingress-bin/src/main.rs +++ b/rust/crates/remoteingress-bin/src/main.rs @@ -301,6 +301,18 @@ async fn handle_request( serde_json::json!({ "ip": ip }), ); } + EdgeEvent::PortsAssigned { listen_ports } => { + send_event( + "portsAssigned", + serde_json::json!({ "listenPorts": listen_ports }), + ); + } + EdgeEvent::PortsUpdated { listen_ports } => { + send_event( + "portsUpdated", + serde_json::json!({ "listenPorts": listen_ports }), + ); + } } } }); diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 1cf21b0..fae83b0 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -1,15 +1,16 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::task::JoinHandle; use tokio_rustls::TlsConnector; use serde::{Deserialize, Serialize}; use remoteingress_protocol::*; -/// Edge configuration. +/// Edge configuration (hub-host + credentials only; ports come from hub). #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct EdgeConfig { @@ -17,8 +18,26 @@ pub struct EdgeConfig { pub hub_port: u16, pub edge_id: String, pub secret: String, - pub listen_ports: Vec, - pub stun_interval_secs: Option, +} + +/// Handshake config received from hub after authentication. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct HandshakeConfig { + listen_ports: Vec, + #[serde(default = "default_stun_interval")] + stun_interval_secs: u64, +} + +fn default_stun_interval() -> u64 { + 300 +} + +/// Runtime config update received from hub via FRAME_CONFIG. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ConfigUpdate { + listen_ports: Vec, } /// Events emitted by the edge. @@ -30,6 +49,10 @@ pub enum EdgeEvent { TunnelDisconnected, #[serde(rename_all = "camelCase")] PublicIpDiscovered { ip: String }, + #[serde(rename_all = "camelCase")] + PortsAssigned { listen_ports: Vec }, + #[serde(rename_all = "camelCase")] + PortsUpdated { listen_ports: Vec }, } /// Edge status response. @@ -54,6 +77,7 @@ pub struct TunnelEdge { public_ip: Arc>>, active_streams: Arc, next_stream_id: Arc, + listen_ports: Arc>>, } impl TunnelEdge { @@ -69,6 +93,7 @@ impl TunnelEdge { public_ip: Arc::new(RwLock::new(None)), active_streams: Arc::new(AtomicU32::new(0)), next_stream_id: Arc::new(AtomicU32::new(1)), + listen_ports: Arc::new(RwLock::new(Vec::new())), } } @@ -84,7 +109,7 @@ impl TunnelEdge { connected: *self.connected.read().await, public_ip: self.public_ip.read().await.clone(), active_streams: self.active_streams.load(Ordering::Relaxed) as usize, - listen_ports: self.config.read().await.listen_ports.clone(), + listen_ports: self.listen_ports.read().await.clone(), } } @@ -100,6 +125,7 @@ impl TunnelEdge { let active_streams = self.active_streams.clone(); let next_stream_id = self.next_stream_id.clone(); let event_tx = self.event_tx.clone(); + let listen_ports = self.listen_ports.clone(); tokio::spawn(async move { edge_main_loop( @@ -109,6 +135,7 @@ impl TunnelEdge { active_streams, next_stream_id, event_tx, + listen_ports, shutdown_rx, ) .await; @@ -124,6 +151,7 @@ impl TunnelEdge { } *self.running.write().await = false; *self.connected.write().await = false; + self.listen_ports.write().await.clear(); } } @@ -134,6 +162,7 @@ async fn edge_main_loop( active_streams: Arc, next_stream_id: Arc, event_tx: mpsc::UnboundedSender, + listen_ports: Arc>>, mut shutdown_rx: mpsc::Receiver<()>, ) { let mut backoff_ms: u64 = 1000; @@ -148,6 +177,7 @@ async fn edge_main_loop( &active_streams, &next_stream_id, &event_tx, + &listen_ports, &mut shutdown_rx, ) .await; @@ -155,6 +185,7 @@ async fn edge_main_loop( *connected.write().await = false; let _ = event_tx.send(EdgeEvent::TunnelDisconnected); active_streams.store(0, Ordering::Relaxed); + listen_ports.write().await.clear(); match result { EdgeLoopResult::Shutdown => break, @@ -182,6 +213,7 @@ async fn connect_to_hub_and_run( active_streams: &Arc, next_stream_id: &Arc, event_tx: &mpsc::UnboundedSender, + listen_ports: &Arc>>, shutdown_rx: &mut mpsc::Receiver<()>, ) -> EdgeLoopResult { // Build TLS connector that skips cert verification (auth is via secret) @@ -220,12 +252,47 @@ async fn connect_to_hub_and_run( return EdgeLoopResult::Reconnect; } + // Read handshake response line from hub (JSON with initial config) + let mut buf_reader = BufReader::new(read_half); + let mut handshake_line = String::new(); + match buf_reader.read_line(&mut handshake_line).await { + Ok(0) => { + log::error!("Hub rejected connection (EOF before handshake)"); + return EdgeLoopResult::Reconnect; + } + Ok(_) => {} + Err(e) => { + log::error!("Failed to read handshake response: {}", e); + return EdgeLoopResult::Reconnect; + } + } + + let handshake: HandshakeConfig = match serde_json::from_str(handshake_line.trim()) { + Ok(h) => h, + Err(e) => { + log::error!("Invalid handshake response: {}", e); + return EdgeLoopResult::Reconnect; + } + }; + + log::info!( + "Handshake from hub: ports {:?}, stun_interval {}s", + handshake.listen_ports, + handshake.stun_interval_secs + ); + *connected.write().await = true; let _ = event_tx.send(EdgeEvent::TunnelConnected); log::info!("Connected to hub at {}", addr); + // Store initial ports and emit event + *listen_ports.write().await = handshake.listen_ports.clone(); + let _ = event_tx.send(EdgeEvent::PortsAssigned { + listen_ports: handshake.listen_ports.clone(), + }); + // Start STUN discovery - let stun_interval = config.stun_interval_secs.unwrap_or(300); + let stun_interval = handshake.stun_interval_secs; let public_ip_clone = public_ip.clone(); let event_tx_clone = event_tx.clone(); let stun_handle = tokio::spawn(async move { @@ -249,14 +316,112 @@ async fn connect_to_hub_and_run( // Shared tunnel writer let tunnel_writer = Arc::new(Mutex::new(write_half)); - // Start TCP listeners for each port - let mut listener_handles = Vec::new(); - for &port in &config.listen_ports { + // Start TCP listeners for initial ports (hot-reloadable) + let mut port_listeners: HashMap> = HashMap::new(); + apply_port_config( + &handshake.listen_ports, + &mut port_listeners, + &tunnel_writer, + &client_writers, + active_streams, + next_stream_id, + &config.edge_id, + ); + + // Read frames from hub + let mut frame_reader = FrameReader::new(buf_reader); + let result = loop { + tokio::select! { + frame_result = frame_reader.next_frame() => { + match frame_result { + Ok(Some(frame)) => { + match frame.frame_type { + FRAME_DATA_BACK => { + let writers = client_writers.lock().await; + if let Some(tx) = writers.get(&frame.stream_id) { + let _ = tx.send(frame.payload).await; + } + } + FRAME_CLOSE_BACK => { + let mut writers = client_writers.lock().await; + writers.remove(&frame.stream_id); + } + FRAME_CONFIG => { + if let Ok(update) = serde_json::from_slice::(&frame.payload) { + log::info!("Config update from hub: ports {:?}", update.listen_ports); + *listen_ports.write().await = update.listen_ports.clone(); + let _ = event_tx.send(EdgeEvent::PortsUpdated { + listen_ports: update.listen_ports.clone(), + }); + apply_port_config( + &update.listen_ports, + &mut port_listeners, + &tunnel_writer, + &client_writers, + active_streams, + next_stream_id, + &config.edge_id, + ); + } + } + _ => { + log::warn!("Unexpected frame type {} from hub", frame.frame_type); + } + } + } + Ok(None) => { + log::info!("Hub disconnected (EOF)"); + break EdgeLoopResult::Reconnect; + } + Err(e) => { + log::error!("Hub frame error: {}", e); + break EdgeLoopResult::Reconnect; + } + } + } + _ = shutdown_rx.recv() => { + break EdgeLoopResult::Shutdown; + } + } + }; + + // Cleanup + stun_handle.abort(); + for (_, h) in port_listeners.drain() { + h.abort(); + } + + result +} + +/// Apply a new port configuration: spawn listeners for added ports, abort removed ports. +fn apply_port_config( + new_ports: &[u16], + port_listeners: &mut HashMap>, + tunnel_writer: &Arc>>>, + client_writers: &Arc>>>>, + active_streams: &Arc, + next_stream_id: &Arc, + edge_id: &str, +) { + let new_set: std::collections::HashSet = new_ports.iter().copied().collect(); + let old_set: std::collections::HashSet = port_listeners.keys().copied().collect(); + + // Remove ports no longer needed + for &port in old_set.difference(&new_set) { + if let Some(handle) = port_listeners.remove(&port) { + log::info!("Stopping listener on port {}", port); + handle.abort(); + } + } + + // Add new ports + for &port in new_set.difference(&old_set) { let tunnel_writer = tunnel_writer.clone(); let client_writers = client_writers.clone(); let active_streams = active_streams.clone(); let next_stream_id = next_stream_id.clone(); - let edge_id = config.edge_id.clone(); + let edge_id = edge_id.to_string(); let handle = tokio::spawn(async move { let listener = match TcpListener::bind(("0.0.0.0", port)).await { @@ -299,55 +464,8 @@ async fn connect_to_hub_and_run( } } }); - listener_handles.push(handle); + port_listeners.insert(port, handle); } - - // Read frames from hub - let mut frame_reader = FrameReader::new(read_half); - let result = loop { - tokio::select! { - frame_result = frame_reader.next_frame() => { - match frame_result { - Ok(Some(frame)) => { - match frame.frame_type { - FRAME_DATA_BACK => { - let writers = client_writers.lock().await; - if let Some(tx) = writers.get(&frame.stream_id) { - let _ = tx.send(frame.payload).await; - } - } - FRAME_CLOSE_BACK => { - let mut writers = client_writers.lock().await; - writers.remove(&frame.stream_id); - } - _ => { - log::warn!("Unexpected frame type {} from hub", frame.frame_type); - } - } - } - Ok(None) => { - log::info!("Hub disconnected (EOF)"); - break EdgeLoopResult::Reconnect; - } - Err(e) => { - log::error!("Hub frame error: {}", e); - break EdgeLoopResult::Reconnect; - } - } - } - _ = shutdown_rx.recv() => { - break EdgeLoopResult::Shutdown; - } - } - }; - - // Cleanup - stun_handle.abort(); - for h in listener_handles { - h.abort(); - } - - result } async fn handle_client_connection( diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index c154b5d..bb375a2 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -37,6 +37,24 @@ impl Default for HubConfig { pub struct AllowedEdge { pub id: String, pub secret: String, + #[serde(default)] + pub listen_ports: Vec, + pub stun_interval_secs: Option, +} + +/// Handshake response sent to edge after authentication. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct HandshakeResponse { + listen_ports: Vec, + stun_interval_secs: u64, +} + +/// Configuration update pushed to a connected edge at runtime. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct EdgeConfigUpdate { + pub listen_ports: Vec, } /// Runtime status of a connected edge. @@ -75,7 +93,7 @@ pub struct HubStatus { /// The tunnel hub that accepts edge connections and demuxes streams to SmartProxy. pub struct TunnelHub { config: RwLock, - allowed_edges: Arc>>, // id -> secret + allowed_edges: Arc>>, connected_edges: Arc>>, event_tx: mpsc::UnboundedSender, event_rx: Mutex>>, @@ -86,6 +104,7 @@ pub struct TunnelHub { struct ConnectedEdgeInfo { connected_at: u64, active_streams: Arc>>>>, + config_tx: mpsc::Sender, } impl TunnelHub { @@ -108,12 +127,35 @@ impl TunnelHub { } /// Update the list of allowed edges. + /// For any currently-connected edge whose ports changed, push a config update. pub async fn update_allowed_edges(&self, edges: Vec) { let mut map = self.allowed_edges.write().await; - map.clear(); - for edge in edges { - map.insert(edge.id, edge.secret); + + // Build new map + let mut new_map = HashMap::new(); + for edge in &edges { + new_map.insert(edge.id.clone(), edge.clone()); } + + // Push config updates to connected edges whose ports changed + let connected = self.connected_edges.lock().await; + for edge in &edges { + if let Some(info) = connected.get(&edge.id) { + // Check if ports changed compared to old config + let ports_changed = match map.get(&edge.id) { + Some(old) => old.listen_ports != edge.listen_ports, + None => true, // newly allowed edge that's already connected + }; + if ports_changed { + let update = EdgeConfigUpdate { + listen_ports: edge.listen_ports.clone(), + }; + let _ = info.config_tx.try_send(update); + } + } + } + + *map = new_map; } /// Get the current hub status. @@ -208,13 +250,13 @@ impl TunnelHub { async fn handle_edge_connection( stream: TcpStream, acceptor: TlsAcceptor, - allowed: Arc>>, + allowed: Arc>>, connected: Arc>>, event_tx: mpsc::UnboundedSender, target_host: String, ) -> Result<(), Box> { let tls_stream = acceptor.accept(stream).await?; - let (read_half, write_half) = tokio::io::split(tls_stream); + let (read_half, mut write_half) = tokio::io::split(tls_stream); let mut buf_reader = BufReader::new(read_half); // Read auth line: "EDGE \n" @@ -230,26 +272,36 @@ async fn handle_edge_connection( let edge_id = parts[1].to_string(); let secret = parts[2]; - // Verify credentials - { + // Verify credentials and extract edge config + let (listen_ports, stun_interval_secs) = { let edges = allowed.read().await; match edges.get(&edge_id) { - Some(expected) => { - if !constant_time_eq(secret.as_bytes(), expected.as_bytes()) { + Some(edge) => { + if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) { return Err(format!("invalid secret for edge {}", edge_id).into()); } + (edge.listen_ports.clone(), edge.stun_interval_secs.unwrap_or(300)) } None => { return Err(format!("unknown edge {}", edge_id).into()); } } - } + }; log::info!("Edge {} authenticated", edge_id); let _ = event_tx.send(HubEvent::EdgeConnected { edge_id: edge_id.clone(), }); + // Send handshake response with initial config before frame protocol begins + let handshake = HandshakeResponse { + listen_ports: listen_ports.clone(), + stun_interval_secs, + }; + let mut handshake_json = serde_json::to_string(&handshake)?; + handshake_json.push('\n'); + write_half.write_all(handshake_json.as_bytes()).await?; + // Track this edge let streams: Arc>>>> = Arc::new(Mutex::new(HashMap::new())); @@ -258,6 +310,9 @@ async fn handle_edge_connection( .unwrap_or_default() .as_secs(); + // Create config update channel + let (config_tx, mut config_rx) = mpsc::channel::(16); + { let mut edges = connected.lock().await; edges.insert( @@ -265,6 +320,7 @@ async fn handle_edge_connection( ConnectedEdgeInfo { connected_at: now, active_streams: streams.clone(), + config_tx, }, ); } @@ -272,6 +328,23 @@ async fn handle_edge_connection( // Shared writer for sending frames back to edge let write_half = Arc::new(Mutex::new(write_half)); + // Spawn task to forward config updates as FRAME_CONFIG frames + let config_writer = write_half.clone(); + let config_edge_id = edge_id.clone(); + let config_handle = tokio::spawn(async move { + while let Some(update) = config_rx.recv().await { + if let Ok(payload) = serde_json::to_vec(&update) { + let frame = encode_frame(0, FRAME_CONFIG, &payload); + let mut w = config_writer.lock().await; + if w.write_all(&frame).await.is_err() { + log::error!("Failed to send config update to edge {}", config_edge_id); + break; + } + log::info!("Sent config update to edge {}: ports {:?}", config_edge_id, update.listen_ports); + } + } + }); + // Frame reading loop let mut frame_reader = FrameReader::new(buf_reader); @@ -398,6 +471,7 @@ async fn handle_edge_connection( } // Cleanup + config_handle.abort(); { let mut edges = connected.lock().await; edges.remove(&edge_id); diff --git a/rust/crates/remoteingress-protocol/src/lib.rs b/rust/crates/remoteingress-protocol/src/lib.rs index 3d99a67..c0bad16 100644 --- a/rust/crates/remoteingress-protocol/src/lib.rs +++ b/rust/crates/remoteingress-protocol/src/lib.rs @@ -6,6 +6,7 @@ pub const FRAME_DATA: u8 = 0x02; pub const FRAME_CLOSE: u8 = 0x03; pub const FRAME_DATA_BACK: u8 = 0x04; pub const FRAME_CLOSE_BACK: u8 = 0x05; +pub const FRAME_CONFIG: u8 = 0x06; // Hub -> Edge: configuration update // Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes pub const FRAME_HEADER_SIZE: usize = 9; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 58b7dde..f6fa0ea 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '3.1.1', + version: '3.2.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingressedge.ts b/ts/classes.remoteingressedge.ts index 94498c3..9c4c0f6 100644 --- a/ts/classes.remoteingressedge.ts +++ b/ts/classes.remoteingressedge.ts @@ -43,6 +43,7 @@ export interface IEdgeConfig { export class RemoteIngressEdge extends EventEmitter { private bridge: InstanceType>; private started = false; + private statusInterval: ReturnType | undefined; constructor() { super(); @@ -79,6 +80,14 @@ export class RemoteIngressEdge extends EventEmitter { this.bridge.on('management:publicIpDiscovered', (data: { ip: string }) => { this.emit('publicIpDiscovered', data); }); + this.bridge.on('management:portsAssigned', (data: { listenPorts: number[] }) => { + console.log(`[RemoteIngressEdge] Ports assigned by hub: ${data.listenPorts.join(', ')}`); + this.emit('portsAssigned', data); + }); + this.bridge.on('management:portsUpdated', (data: { listenPorts: number[] }) => { + console.log(`[RemoteIngressEdge] Ports updated by hub: ${data.listenPorts.join(', ')}`); + this.emit('portsUpdated', data); + }); } /** @@ -113,12 +122,30 @@ export class RemoteIngressEdge extends EventEmitter { }); this.started = true; + + // Start periodic status logging + this.statusInterval = setInterval(async () => { + try { + const status = await this.getStatus(); + console.log( + `[RemoteIngressEdge] Status: connected=${status.connected}, ` + + `streams=${status.activeStreams}, ports=[${status.listenPorts.join(',')}], ` + + `publicIp=${status.publicIp ?? 'unknown'}` + ); + } catch { + // Bridge may be shutting down + } + }, 60_000); } /** * Stop the edge and kill the Rust process. */ public async stop(): Promise { + if (this.statusInterval) { + clearInterval(this.statusInterval); + this.statusInterval = undefined; + } if (this.started) { try { await this.bridge.sendCommand('stopEdge', {} as Record); diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index 686565d..b4d6a02 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -20,7 +20,7 @@ type THubCommands = { }; updateAllowedEdges: { params: { - edges: Array<{ id: string; secret: string }>; + edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>; }; result: { updated: boolean }; }; @@ -122,7 +122,7 @@ export class RemoteIngressHub extends EventEmitter { /** * Update the list of allowed edges that can connect to this hub. */ - public async updateAllowedEdges(edges: Array<{ id: string; secret: string }>): Promise { + public async updateAllowedEdges(edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>): Promise { await this.bridge.sendCommand('updateAllowedEdges', { edges }); }