From 3df20df2a1237f0dd853a5ef954d1a7f911c4e7b Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 26 Feb 2026 12:37:39 +0000 Subject: [PATCH] fix(hub): cancel per-stream tokens on stream close and avoid duplicate StreamClosed events; bump @types/node devDependency to ^25.3.0 --- changelog.md | 8 +++ package.json | 2 +- pnpm-lock.yaml | 66 +++++++++++------------ rust/crates/remoteingress-core/src/hub.rs | 32 ++++++----- ts/00_commitinfo_data.ts | 2 +- 5 files changed, 63 insertions(+), 47 deletions(-) diff --git a/changelog.md b/changelog.md index 79b1d82..911abf5 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-02-26 - 4.0.1 - fix(hub) +cancel per-stream tokens on stream close and avoid duplicate StreamClosed events; bump @types/node devDependency to ^25.3.0 + +- Add CancellationToken to per-stream entries so each stream can be cancelled independently. +- Ensure StreamClosed event is only emitted when a stream was actually present (guards against duplicate events). +- Cancel the stream-specific token on FRAME_CLOSE to stop associated tasks and free resources. +- DevDependency bump: @types/node updated from ^25.2.3 to ^25.3.0. + ## 2026-02-19 - 4.0.0 - BREAKING CHANGE(remoteingress-core) add cancellation tokens and cooperative shutdown; switch event channels to bounded mpsc and improve cleanup diff --git a/package.json b/package.json index 6accd83..9dd305d 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "@git.zone/tsrust": "^1.3.0", "@git.zone/tstest": "^3.1.8", "@push.rocks/tapbundle": "^6.0.3", - "@types/node": "^25.2.3" + "@types/node": "^25.3.0" }, "dependencies": { "@push.rocks/qenv": "^6.1.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5a3cddb..eb463bb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -34,8 +34,8 @@ importers: specifier: ^6.0.3 version: 6.0.3(socks@2.8.7) '@types/node': - specifier: ^25.2.3 - version: 25.2.3 + specifier: ^25.3.0 + version: 25.3.0 packages: @@ -1501,8 +1501,8 @@ packages: '@types/node@22.19.11': resolution: {integrity: sha512-BH7YwL6rA93ReqeQS1c4bsPpcfOmJasG+Fkr6Y59q83f9M1WcBRHR2vM+P9eOisYRcN3ujQoiZY8uk5W+1WL8w==} - '@types/node@25.2.3': - resolution: {integrity: sha512-m0jEgYlYz+mDJZ2+F4v8D1AyQb+QzsNqRuI7xg1VQX/KlKS0qT9r1Mo16yo5F/MtifXFgaofIFsdFMox2SxIbQ==} + '@types/node@25.3.0': + resolution: {integrity: sha512-4K3bqJpXpqfg2XKGK9bpDTc6xO/xoUP/RBWS7AtRMug6zZFaRekiLzjVtAoZMquxoAbzBvy5nxQ7veS5eYzf8A==} '@types/parse5@6.0.3': resolution: {integrity: sha512-SuT16Q1K51EAVPz1K29DJ/sXjhSQ0zjvsypYJ6tlwVsRV9jwW5Adq2ch8Dq8kDBCkYnELS7N7VNCSB5nC56t/g==} @@ -3986,8 +3986,8 @@ packages: undici-types@6.21.0: resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} - undici-types@7.16.0: - resolution: {integrity: sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==} + undici-types@7.18.2: + resolution: {integrity: sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==} unified@11.0.5: resolution: {integrity: sha512-xKvGhPWw3k84Qjh8bI3ZeJjqnyadK+GEFtazSfZv/rKeTkTjOJho6mFqh2SM96iIcZokxiOpg78GazTSg8+KHA==} @@ -5178,7 +5178,7 @@ snapshots: '@jest/schemas': 29.6.3 '@types/istanbul-lib-coverage': 2.0.6 '@types/istanbul-reports': 3.0.4 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/yargs': 17.0.35 chalk: 4.1.2 @@ -6736,14 +6736,14 @@ snapshots: '@types/accepts@1.3.7': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/babel__code-frame@7.27.0': {} '@types/body-parser@1.19.6': dependencies: '@types/connect': 3.4.38 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/buffer-json@2.0.3': {} @@ -6760,17 +6760,17 @@ snapshots: '@types/clean-css@4.2.11': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 source-map: 0.6.1 '@types/co-body@6.1.3': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/qs': 6.14.0 '@types/connect@3.4.38': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/content-disposition@0.5.9': {} @@ -6781,11 +6781,11 @@ snapshots: '@types/connect': 3.4.38 '@types/express': 5.0.6 '@types/keygrip': 1.0.6 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/cors@2.8.19': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/debounce@1.2.4': {} @@ -6797,7 +6797,7 @@ snapshots: '@types/express-serve-static-core@5.1.1': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/qs': 6.14.0 '@types/range-parser': 1.2.7 '@types/send': 1.2.1 @@ -6811,7 +6811,7 @@ snapshots: '@types/fs-extra@11.0.4': dependencies: '@types/jsonfile': 6.1.4 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/hast@3.0.4': dependencies: @@ -6845,7 +6845,7 @@ snapshots: '@types/jsonfile@6.1.4': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/keygrip@1.0.6': {} @@ -6862,7 +6862,7 @@ snapshots: '@types/http-errors': 2.0.5 '@types/keygrip': 1.0.6 '@types/koa-compose': 3.2.9 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/mdast@4.0.4': dependencies: @@ -6876,19 +6876,19 @@ snapshots: '@types/mute-stream@0.0.4': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/node-forge@1.3.14': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/node@22.19.11': dependencies: undici-types: 6.21.0 - '@types/node@25.2.3': + '@types/node@25.3.0': dependencies: - undici-types: 7.16.0 + undici-types: 7.18.2 '@types/parse5@6.0.3': {} @@ -6904,18 +6904,18 @@ snapshots: '@types/s3rver@3.7.4': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/semver@7.7.1': {} '@types/send@1.2.1': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/serve-static@2.2.0': dependencies: '@types/http-errors': 2.0.5 - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/sinon-chai@3.2.12': dependencies: @@ -6934,11 +6934,11 @@ snapshots: '@types/tar-stream@3.1.4': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/through2@2.0.41': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/triple-beam@1.3.5': {} @@ -6966,11 +6966,11 @@ snapshots: '@types/ws@7.4.7': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/ws@8.18.1': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 '@types/yargs-parser@21.0.3': {} @@ -6980,7 +6980,7 @@ snapshots: '@types/yauzl@2.10.3': dependencies: - '@types/node': 25.2.3 + '@types/node': 25.3.0 optional: true '@ungap/structured-clone@1.3.0': {} @@ -7585,7 +7585,7 @@ snapshots: engine.io@6.6.4: dependencies: '@types/cors': 2.8.19 - '@types/node': 25.2.3 + '@types/node': 25.3.0 accepts: 1.3.8 base64id: 2.0.0 cookie: 0.7.2 @@ -8299,7 +8299,7 @@ snapshots: jest-util@29.7.0: dependencies: '@jest/types': 29.6.3 - '@types/node': 25.2.3 + '@types/node': 25.3.0 chalk: 4.1.2 ci-info: 3.9.0 graceful-fs: 4.2.11 @@ -9807,7 +9807,7 @@ snapshots: undici-types@6.21.0: {} - undici-types@7.16.0: {} + undici-types@7.18.2: {} unified@11.0.5: dependencies: diff --git a/rust/crates/remoteingress-core/src/hub.rs b/rust/crates/remoteingress-core/src/hub.rs index cb18d09..ce09426 100644 --- a/rust/crates/remoteingress-core/src/hub.rs +++ b/rust/crates/remoteingress-core/src/hub.rs @@ -105,7 +105,7 @@ pub struct TunnelHub { struct ConnectedEdgeInfo { connected_at: u64, - active_streams: Arc>>>>, + active_streams: Arc>, CancellationToken)>>>, config_tx: mpsc::Sender, #[allow(dead_code)] // kept alive for Drop — cancels child tokens when edge is removed cancel_token: CancellationToken, @@ -322,7 +322,7 @@ async fn handle_edge_connection( write_half.write_all(handshake_json.as_bytes()).await?; // Track this edge - let streams: Arc>>>> = + let streams: Arc>, CancellationToken)>>> = Arc::new(Mutex::new(HashMap::new())); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -409,7 +409,7 @@ async fn handle_edge_connection( let (data_tx, mut data_rx) = mpsc::channel::>(256); { let mut s = streams.lock().await; - s.insert(stream_id, data_tx); + s.insert(stream_id, (data_tx, stream_token.clone())); } // Spawn task: connect to SmartProxy, send PROXY header, pipe data @@ -487,26 +487,34 @@ async fn handle_edge_connection( } } - // Clean up stream - { + // Clean up stream (guard against duplicate if FRAME_CLOSE already removed it) + let was_present = { let mut s = streams_clone.lock().await; - s.remove(&stream_id); + s.remove(&stream_id).is_some() + }; + if was_present { + let _ = event_tx_clone.try_send(HubEvent::StreamClosed { + edge_id: edge_id_clone, + stream_id, + }); } - let _ = event_tx_clone.try_send(HubEvent::StreamClosed { - edge_id: edge_id_clone, - stream_id, - }); }); } FRAME_DATA => { let s = streams.lock().await; - if let Some(tx) = s.get(&frame.stream_id) { + if let Some((tx, _)) = s.get(&frame.stream_id) { let _ = tx.send(frame.payload).await; } } FRAME_CLOSE => { let mut s = streams.lock().await; - s.remove(&frame.stream_id); + if let Some((_, token)) = s.remove(&frame.stream_id) { + token.cancel(); + let _ = event_tx.try_send(HubEvent::StreamClosed { + edge_id: edge_id.clone(), + stream_id: frame.stream_id, + }); + } } _ => { log::warn!("Unexpected frame type {} from edge", frame.frame_type); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index c041556..88ccd95 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/remoteingress', - version: '4.0.0', + version: '4.0.1', description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.' }