From de8422966a9a53a708946b2ba116b5c0334c75ef Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 17 Mar 2026 00:58:08 +0000 Subject: [PATCH] feat(events): include disconnect reasons in edge and hub management events --- changelog.md | 8 +++++ rust/crates/remoteingress-bin/src/main.rs | 8 ++--- rust/crates/remoteingress-core/src/edge.rs | 37 +++++++++++++--------- rust/crates/remoteingress-core/src/hub.rs | 11 ++++++- ts/00_commitinfo_data.ts | 2 +- ts/classes.remoteingressedge.ts | 6 ++-- ts/classes.remoteingresshub.ts | 4 ++- 7 files changed, 52 insertions(+), 24 deletions(-) diff --git a/changelog.md b/changelog.md index 47cc104..44687fe 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-17 - 4.8.0 - feat(events) +include disconnect reasons in edge and hub management events + +- Add reason fields to tunnelDisconnected and edgeDisconnected events emitted from the Rust core and binary bridge +- Propagate specific disconnect causes such as EOF, liveness timeout, writer failure, handshake failure, and hub cancellation +- Update TypeScript edge and hub classes to log and forward disconnect reason data +- Extend serialization tests to cover the new reason fields + ## 2026-03-17 - 4.7.2 - fix(remoteingress-core) add tunnel write timeouts and scale initial stream windows by active stream count diff --git a/rust/crates/remoteingress-bin/src/main.rs b/rust/crates/remoteingress-bin/src/main.rs index 815c6d2..8b485d7 100644 --- a/rust/crates/remoteingress-bin/src/main.rs +++ b/rust/crates/remoteingress-bin/src/main.rs @@ -173,10 +173,10 @@ async fn handle_request( serde_json::json!({ "edgeId": edge_id, "peerAddr": peer_addr }), ); } - HubEvent::EdgeDisconnected { edge_id } => { + HubEvent::EdgeDisconnected { edge_id, reason } => { send_event( "edgeDisconnected", - serde_json::json!({ "edgeId": edge_id }), + serde_json::json!({ "edgeId": edge_id, "reason": reason }), ); } HubEvent::StreamOpened { @@ -295,8 +295,8 @@ async fn handle_request( EdgeEvent::TunnelConnected => { send_event("tunnelConnected", serde_json::json!({})); } - EdgeEvent::TunnelDisconnected => { - send_event("tunnelDisconnected", serde_json::json!({})); + EdgeEvent::TunnelDisconnected { reason } => { + send_event("tunnelDisconnected", serde_json::json!({ "reason": reason })); } EdgeEvent::PublicIpDiscovered { ip } => { send_event( diff --git a/rust/crates/remoteingress-core/src/edge.rs b/rust/crates/remoteingress-core/src/edge.rs index 5a78e8e..b634c79 100644 --- a/rust/crates/remoteingress-core/src/edge.rs +++ b/rust/crates/remoteingress-core/src/edge.rs @@ -64,7 +64,8 @@ struct ConfigUpdate { #[serde(tag = "type")] pub enum EdgeEvent { TunnelConnected, - TunnelDisconnected, + #[serde(rename_all = "camelCase")] + TunnelDisconnected { reason: String }, #[serde(rename_all = "camelCase")] PublicIpDiscovered { ip: String }, #[serde(rename_all = "camelCase")] @@ -236,10 +237,15 @@ async fn edge_main_loop( } *connected.write().await = false; + // Extract reason for disconnect event + let reason = match &result { + EdgeLoopResult::Reconnect(r) => r.clone(), + EdgeLoopResult::Shutdown => "shutdown".to_string(), + }; // Only emit disconnect event on actual disconnection, not on failed reconnects. // Failed reconnects never reach line 335 (handshake success), so was_connected is false. if was_connected { - let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected); + let _ = event_tx.try_send(EdgeEvent::TunnelDisconnected { reason: reason.clone() }); } active_streams.store(0, Ordering::Relaxed); // Reset stream ID counter for next connection cycle @@ -248,7 +254,7 @@ async fn edge_main_loop( match result { EdgeLoopResult::Shutdown => break, - EdgeLoopResult::Reconnect => { + EdgeLoopResult::Reconnect(_) => { log::info!("Reconnecting in {}ms...", backoff_ms); tokio::select! { _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} @@ -263,7 +269,7 @@ async fn edge_main_loop( enum EdgeLoopResult { Shutdown, - Reconnect, + Reconnect(String), // reason for disconnection } async fn connect_to_hub_and_run( @@ -295,7 +301,7 @@ async fn connect_to_hub_and_run( } Err(e) => { log::error!("Failed to connect to hub at {}: {}", addr, e); - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect(format!("tcp_connect_failed: {}", e)); } }; @@ -306,7 +312,7 @@ async fn connect_to_hub_and_run( Ok(s) => s, Err(e) => { log::error!("TLS handshake failed: {}", e); - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect(format!("tls_handshake_failed: {}", e)); } }; @@ -315,7 +321,7 @@ async fn connect_to_hub_and_run( // Send auth line let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret); if write_half.write_all(auth_line.as_bytes()).await.is_err() { - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect("auth_write_failed".to_string()); } // Read handshake response line from hub (JSON with initial config) @@ -324,12 +330,12 @@ async fn connect_to_hub_and_run( match buf_reader.read_line(&mut handshake_line).await { Ok(0) => { log::error!("Hub rejected connection (EOF before handshake)"); - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect("hub_rejected_eof".to_string()); } Ok(_) => {} Err(e) => { log::error!("Failed to read handshake response: {}", e); - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect(format!("handshake_read_failed: {}", e)); } } @@ -337,7 +343,7 @@ async fn connect_to_hub_and_run( Ok(h) => h, Err(e) => { log::error!("Invalid handshake response: {}", e); - return EdgeLoopResult::Reconnect; + return EdgeLoopResult::Reconnect(format!("handshake_invalid: {}", e)); } }; @@ -541,22 +547,22 @@ async fn connect_to_hub_and_run( } Ok(None) => { log::info!("Hub disconnected (EOF)"); - break EdgeLoopResult::Reconnect; + break EdgeLoopResult::Reconnect("hub_eof".to_string()); } Err(e) => { log::error!("Hub frame error: {}", e); - break EdgeLoopResult::Reconnect; + break EdgeLoopResult::Reconnect(format!("hub_frame_error: {}", e)); } } } _ = &mut liveness_deadline => { log::warn!("Hub liveness timeout (no frames for {}s), reconnecting", liveness_timeout_dur.as_secs()); - break EdgeLoopResult::Reconnect; + break EdgeLoopResult::Reconnect("liveness_timeout".to_string()); } _ = &mut writer_dead_rx => { log::error!("Tunnel writer died, reconnecting immediately"); - break EdgeLoopResult::Reconnect; + break EdgeLoopResult::Reconnect("writer_dead".to_string()); } _ = connection_token.cancelled() => { log::info!("Connection cancelled"); @@ -963,9 +969,10 @@ mod tests { #[test] fn test_edge_event_tunnel_disconnected() { - let event = EdgeEvent::TunnelDisconnected; + let event = EdgeEvent::TunnelDisconnected { reason: "hub_eof".to_string() }; let json = serde_json::to_value(&event).unwrap(); assert_eq!(json["type"], "tunnelDisconnected"); + assert_eq!(json["reason"], "hub_eof"); } #[test] diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index 75da97a..1900746 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -92,7 +92,7 @@ pub enum HubEvent { #[serde(rename_all = "camelCase")] EdgeConnected { edge_id: String, peer_addr: String }, #[serde(rename_all = "camelCase")] - EdgeDisconnected { edge_id: String }, + EdgeDisconnected { edge_id: String, reason: String }, #[serde(rename_all = "camelCase")] StreamOpened { edge_id: String, stream_id: u32 }, #[serde(rename_all = "camelCase")] @@ -473,6 +473,7 @@ async fn handle_edge_connection( // Frame reading loop let mut frame_reader = FrameReader::new(buf_reader); + let mut disconnect_reason = "unknown".to_string(); loop { tokio::select! { @@ -757,10 +758,12 @@ async fn handle_edge_connection( } Ok(None) => { log::info!("Edge {} disconnected (EOF)", edge_id); + disconnect_reason = "edge_eof".to_string(); break; } Err(e) => { log::error!("Edge {} frame error: {}", edge_id, e); + disconnect_reason = format!("edge_frame_error: {}", e); break; } } @@ -777,14 +780,17 @@ async fn handle_edge_connection( _ = &mut liveness_deadline => { log::warn!("Edge {} liveness timeout (no frames for {}s), disconnecting", edge_id, liveness_timeout_dur.as_secs()); + disconnect_reason = "liveness_timeout".to_string(); break; } _ = &mut writer_dead_rx => { log::error!("Tunnel writer to edge {} died, disconnecting immediately", edge_id); + disconnect_reason = "writer_dead".to_string(); break; } _ = edge_token.cancelled() => { log::info!("Edge {} cancelled by hub", edge_id); + disconnect_reason = "cancelled_by_hub".to_string(); break; } } @@ -800,6 +806,7 @@ async fn handle_edge_connection( } let _ = event_tx.try_send(HubEvent::EdgeDisconnected { edge_id: edge_id.clone(), + reason: disconnect_reason, }); Ok(()) @@ -1022,10 +1029,12 @@ mod tests { fn test_hub_event_edge_disconnected_serialize() { let event = HubEvent::EdgeDisconnected { edge_id: "edge-2".to_string(), + reason: "liveness_timeout".to_string(), }; let json = serde_json::to_value(&event).unwrap(); assert_eq!(json["type"], "edgeDisconnected"); assert_eq!(json["edgeId"], "edge-2"); + assert_eq!(json["reason"], "liveness_timeout"); } #[test] diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 61048b9..5e43f13 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.7.2', + version: '4.8.0', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' } diff --git a/ts/classes.remoteingressedge.ts b/ts/classes.remoteingressedge.ts index 863e6a8..53023ca 100644 --- a/ts/classes.remoteingressedge.ts +++ b/ts/classes.remoteingressedge.ts @@ -83,8 +83,10 @@ export class RemoteIngressEdge extends EventEmitter { this.bridge.on('management:tunnelConnected', () => { this.emit('tunnelConnected'); }); - this.bridge.on('management:tunnelDisconnected', () => { - this.emit('tunnelDisconnected'); + this.bridge.on('management:tunnelDisconnected', (data: { reason?: string }) => { + const reason = data?.reason ?? 'unknown'; + console.log(`[RemoteIngressEdge] Tunnel disconnected: ${reason}`); + this.emit('tunnelDisconnected', data); }); this.bridge.on('management:publicIpDiscovered', (data: { ip: string }) => { this.emit('publicIpDiscovered', data); diff --git a/ts/classes.remoteingresshub.ts b/ts/classes.remoteingresshub.ts index d1a79af..1ee9378 100644 --- a/ts/classes.remoteingresshub.ts +++ b/ts/classes.remoteingresshub.ts @@ -93,7 +93,9 @@ export class RemoteIngressHub extends EventEmitter { this.bridge.on('management:edgeConnected', (data: { edgeId: string; peerAddr: string }) => { this.emit('edgeConnected', data); }); - this.bridge.on('management:edgeDisconnected', (data: { edgeId: string }) => { + this.bridge.on('management:edgeDisconnected', (data: { edgeId: string; reason?: string }) => { + const reason = data?.reason ?? 'unknown'; + console.log(`[RemoteIngressHub] Edge ${data.edgeId} disconnected: ${reason}`); this.emit('edgeDisconnected', data); }); this.bridge.on('management:streamOpened', (data: { edgeId: string; streamId: number }) => {