Compare commits

...

8 Commits

Author SHA1 Message Date
c2c9dd195d v4.14.3
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 07:49:44 +00:00
fb6e9c54ad fix(docs): refresh project metadata and README to reflect current ingress tunnel capabilities 2026-03-26 07:49:44 +00:00
ac22617849 v4.14.2
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 07:10:15 +00:00
e5a91f298c fix(hub-core): improve stream shutdown handling and connection cleanup in hub and edge 2026-03-26 07:10:15 +00:00
5e93710c42 v4.14.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 19:49:05 +00:00
331b5c8d3f fix(remoteingress edge/hub crash recovery): prevent duplicate crash recovery listeners and reset saved runtime state on shutdown 2026-03-21 19:49:05 +00:00
bf3418d0ed v4.14.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 00:51:37 +00:00
6d5e6f60f8 feat(quic): add QUIC stability test coverage and bridge logging for hub and edge 2026-03-20 00:51:37 +00:00
14 changed files with 2015 additions and 1274 deletions

View File

@@ -11,26 +11,26 @@
"githost": "code.foss.global",
"gitscope": "serve.zone",
"gitrepo": "remoteingress",
"description": "Provides a service for creating private tunnels and reaching private clusters from the outside, facilitating secure remote access as part of the @serve.zone stack.",
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"npmPackagename": "@serve.zone/remoteingress",
"license": "MIT",
"projectDomain": "serve.zone",
"keywords": [
"remote access",
"private tunnels",
"network security",
"TLS encryption",
"connector",
"ingress tunnel",
"network edge",
"PROXY protocol",
"multiplexed tunnel",
"TCP proxy",
"TLS tunnel",
"QUIC transport",
"UDP tunneling",
"serve.zone stack",
"private clusters access",
"public access management",
"TypeScript application",
"node.js package",
"secure communications",
"TLS/SSL certificates",
"development tools",
"software development",
"private network integration"
"TypeScript",
"Rust",
"SmartProxy",
"DcRouter",
"flow control"
]
},
"release": {

View File

@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {

View File

@@ -1,5 +1,35 @@
# Changelog
## 2026-03-26 - 4.14.3 - fix(docs)
refresh project metadata and README to reflect current ingress tunnel capabilities
- update package metadata description and keywords to better describe edge ingress, TLS/QUIC transport, and SmartProxy integration
- revise README terminology, API docs, and feature list to document crash recovery, bindAddress support, and current event names
- improve README formatting and examples for architecture, wire protocol, QoS, and token usage
## 2026-03-26 - 4.14.2 - fix(hub-core)
improve stream shutdown handling and connection cleanup in hub and edge
- Cancel edge upload loops immediately when the hub closes a stream instead of waiting for the window stall timeout.
- Reduce stalled stream timeouts from 120s to 55s to detect broken connections faster.
- Allow hub writer tasks to shut down gracefully before aborting to avoid unnecessary TCP resets.
- Enable TCP keepalive on hub upstream connections to detect silent SmartProxy failures.
- Remove leaked QUIC UDP session entries when setup fails or sessions end.
- Rename npmextra.json to .smartconfig.json and update package packaging references.
## 2026-03-21 - 4.14.1 - fix(remoteingress edge/hub crash recovery)
prevent duplicate crash recovery listeners and reset saved runtime state on shutdown
- Removes existing exit listeners before re-registering crash recovery handlers for edge and hub processes.
- Clears saved edge and hub configuration on stop to avoid stale restart state.
- Resets orphaned edge status intervals and restarts periodic status logging after successful crash recovery.
## 2026-03-20 - 4.14.0 - feat(quic)
add QUIC stability test coverage and bridge logging for hub and edge
- adds a long-running QUIC stability test with periodic echo probes and disconnect detection
- enables prefixed bridge logging for RemoteIngressHub and RemoteIngressEdge to improve runtime diagnostics
## 2026-03-20 - 4.13.2 - fix(remoteingress-core)
preserve reconnected edge entries during disconnect cleanup

21
license.md Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Task Venture Capital GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.13.2",
"version": "4.14.3",
"private": false,
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js",
@@ -14,17 +14,17 @@
"buildDocs": "(tsdoc)"
},
"devDependencies": {
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tsrust": "^1.3.2",
"@git.zone/tstest": "^3.6.0",
"@push.rocks/tapbundle": "^6.0.3",
"@types/node": "^25.3.0"
"@types/node": "^25.5.0"
},
"dependencies": {
"@push.rocks/qenv": "^6.1.3",
"@push.rocks/smartrust": "^1.2.1"
"@push.rocks/smartrust": "^1.3.2"
},
"repository": {
"type": "git",
@@ -47,7 +47,7 @@
"dist_rust/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"keywords": [

2725
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

130
readme.md
View File

@@ -12,23 +12,24 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
pnpm install @serve.zone/remoteingress
```
## 🏗️ Architecture
## Architecture
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
┌─────────────────────┐ TLS or QUIC Tunnel ┌─────────────────────┐
│ Network Edge │ ◄══════════════════════════► │ Private Cluster │
│ TCP+TLS: frame mux │
RemoteIngressEdge │ QUIC: native streams │ RemoteIngressHub
│ UDP: QUIC datagrams │
Accepts TCP & UDP │ │ Forwards to
on hub-assigned │ │ SmartProxy on
ports │ │ local ports
└─────────────────────┘ └─────────────────────┘
│ TCP + UDP from end users
Internet DcRouter / SmartProxy
TLS or QUIC Tunnel
┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐
Network Edge │ TCP+TLS: frame mux │ Private Cluster
│ QUIC: native streams
RemoteIngressEdge │ UDP: QUIC datagrams │ RemoteIngressHub
Accepts TCP & UDP Forwards to
on hub-assigned SmartProxy on
│ ports │ local ports │
└─────────────────────┘ └─────────────────────┘
TCP + UDP from end users
Internet DcRouter / SmartProxy
```
| Component | Role |
@@ -37,24 +38,25 @@ pnpm install @serve.zone/remoteingress
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
### Key Features
### Key Features
- 🔒 **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- 🌐 **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- 📋 **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- 🔀 **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- 🔑 **Shared-secret authentication** — edges must present valid credentials to connect
- 🎫 **Connection tokens** — encode all connection details into a single opaque base64url string
- 📡 **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- 🔄 **Auto-reconnect** with exponential backoff if the tunnel drops
- 🎛️ **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime
- 📣 **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- 🎚️ **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- 📊 **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- 🕒 **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
- **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- **Shared-secret authentication** — edges must present valid credentials to connect
- **Connection tokens** — encode all connection details into a single opaque base64url string
- **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- **Auto-reconnect** with exponential backoff if the tunnel drops
- **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime
- **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
- **Crash recovery** — automatic restart with exponential backoff if the Rust binary crashes unexpectedly
## 🚀 Usage
## Usage
Both classes are imported from the package and communicate with the Rust binary under the hood.
@@ -160,7 +162,7 @@ await edge.stop();
| `'quic'` | QUIC with native stream multiplexing. Eliminates head-of-line blocking. Uses QUIC datagrams for UDP traffic. |
| `'quicWithFallback'` | Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
### 🎫 Connection Tokens
### Connection Tokens
Encode all connection details into a single opaque string for easy distribution:
@@ -183,30 +185,30 @@ const data = decodeConnectionToken(token);
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
## 📖 API Reference
## API Reference
### `RemoteIngressHub`
| Method / Property | Description |
|-------------------|-------------|
| `start(config?)` | Start the hub. Config: `{ tunnelPort?: number, targetHost?: string }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `start(config?)` | Start the hub. Config: `{ tunnelPort?, targetHost?, tls?: { certPem?, keyPem? } }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `stop()` | Graceful shutdown. |
| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs? }`. Port changes are pushed to connected edges in real time. |
| `getStatus()` | Returns `{ running, tunnelPort, connectedEdges: [...] }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`, `crashRecovered`, `crashRecoveryFailed`
### `RemoteIngressEdge`
| Method / Property | Description |
|-------------------|-------------|
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, transportMode? }`. |
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, bindAddress?, transportMode? }`. |
| `stop()` | Graceful shutdown. |
| `getStatus()` | Returns `{ running, connected, publicIp, activeStreams, listenPorts }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`, `crashRecovered`, `crashRecoveryFailed`
### Token Utilities
@@ -221,6 +223,10 @@ Tokens are base64url-encoded — safe for environment variables, CLI arguments,
interface IHubConfig {
tunnelPort?: number; // default: 8443
targetHost?: string; // default: '127.0.0.1'
tls?: {
certPem?: string; // PEM-encoded TLS certificate
keyPem?: string; // PEM-encoded TLS private key
};
}
interface IEdgeConfig {
@@ -240,7 +246,7 @@ interface IConnectionTokenData {
}
```
## 🔌 Wire Protocol
## Wire Protocol
### TCP+TLS Transport (Frame Protocol)
@@ -252,19 +258,19 @@ The tunnel uses a custom binary frame protocol over a single TLS connection:
| Frame Type | Value | Direction | Purpose |
|------------|-------|-----------|---------|
| `OPEN` | `0x01` | Edge Hub | Open TCP stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge Hub | Client data (upload) |
| `CLOSE` | `0x03` | Edge Hub | Client closed connection |
| `DATA_BACK` | `0x04` | Hub Edge | Response data (download) |
| `CLOSE_BACK` | `0x05` | Hub Edge | Upstream closed connection |
| `CONFIG` | `0x06` | Hub Edge | Runtime config update (JSON payload) |
| `PING` | `0x07` | Hub Edge | Heartbeat probe (every 15s) |
| `PONG` | `0x08` | Edge Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge Hub | Flow control: edge consumed N bytes |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub Edge | Flow control: hub consumed N bytes |
| `UDP_OPEN` | `0x0B` | Edge Hub | Open UDP session; payload is PROXY v2 header |
| `UDP_DATA` | `0x0C` | Edge Hub | UDP datagram (upload) |
| `UDP_DATA_BACK` | `0x0D` | Hub Edge | UDP datagram (download) |
| `OPEN` | `0x01` | Edge -> Hub | Open TCP stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge -> Hub | Client data (upload) |
| `CLOSE` | `0x03` | Edge -> Hub | Client closed connection |
| `DATA_BACK` | `0x04` | Hub -> Edge | Response data (download) |
| `CLOSE_BACK` | `0x05` | Hub -> Edge | Upstream closed connection |
| `CONFIG` | `0x06` | Hub -> Edge | Runtime config update (JSON payload) |
| `PING` | `0x07` | Hub -> Edge | Heartbeat probe (every 15s) |
| `PONG` | `0x08` | Edge -> Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge -> Hub | Flow control: edge consumed N bytes |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub -> Edge | Flow control: hub consumed N bytes |
| `UDP_OPEN` | `0x0B` | Edge -> Hub | Open UDP session; payload is PROXY v2 header |
| `UDP_DATA` | `0x0C` | Edge -> Hub | UDP datagram (upload) |
| `UDP_DATA_BACK` | `0x0D` | Hub -> Edge | UDP datagram (download) |
| `UDP_CLOSE` | `0x0E` | Either | Close UDP session |
### QUIC Transport
@@ -284,15 +290,15 @@ When using QUIC, the frame protocol is replaced by native QUIC primitives:
4. Edge starts TCP and UDP listeners on the assigned ports
5. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
## 🎚️ QoS & Flow Control
## QoS & Flow Control
### Priority Tiers (TCP+TLS Transport)
| Tier | Frames | Behavior |
|------|--------|----------|
| 🔴 **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| 🟢 **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
| **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
### Sustained Stream Classification
@@ -308,13 +314,13 @@ Each TCP stream has a send window from a shared **200 MB budget**:
| Active Streams | Window per Stream |
|---|---|
| 150 | 4 MB (maximum) |
| 51200 | Scales down (4 MB 1 MB) |
| 1-50 | 4 MB (maximum) |
| 51-200 | Scales down (4 MB -> 1 MB) |
| 200+ | 1 MB (floor) |
UDP traffic uses no flow control — datagrams are fire-and-forget, matching UDP semantics.
## 💡 Example Scenarios
## Example Scenarios
### 1. Expose a Private Cluster to the Internet
@@ -358,6 +364,8 @@ await edge.start({
Generate connection tokens on the hub side and distribute them to edge operators:
```typescript
import { encodeConnectionToken, RemoteIngressEdge } from '@serve.zone/remoteingress';
const token = encodeConnectionToken({
hubHost: 'hub.prod.example.com',
hubPort: 8443,
@@ -372,21 +380,19 @@ await edge.start({ token });
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
### Company Information
Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany
Registered at District court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

View File

@@ -36,6 +36,9 @@ struct EdgeStreamState {
send_window: Arc<AtomicU32>,
/// Notifier to wake the client reader when the window opens.
window_notify: Arc<Notify>,
/// Per-stream cancellation token — cancelled on FRAME_CLOSE_BACK to promptly
/// terminate the upload loop instead of waiting for the window stall timeout.
cancel_token: CancellationToken,
}
/// Edge configuration (hub-host + credentials only; ports come from hub).
@@ -399,7 +402,11 @@ async fn handle_edge_frame(
}
FRAME_CLOSE_BACK => {
let mut writers = client_writers.lock().await;
writers.remove(&frame.stream_id);
if let Some(state) = writers.remove(&frame.stream_id) {
// Cancel the stream's token so the upload loop exits promptly
// instead of waiting for the window stall timeout.
state.cancel_token.cancel();
}
}
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
@@ -1012,6 +1019,7 @@ async fn handle_client_connection(
back_tx,
send_window: Arc::clone(&send_window),
window_notify: Arc::clone(&window_notify),
cancel_token: client_token.clone(),
});
}
@@ -1093,8 +1101,8 @@ async fn handle_client_connection(
tokio::select! {
_ = notified => continue,
_ = client_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} upload stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} upload stalled (window empty for 55s)", stream_id);
break;
}
}

View File

@@ -475,6 +475,12 @@ async fn handle_hub_frame(
})??;
upstream.set_nodelay(true)?;
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
upstream.write_all(proxy_header.as_bytes()).await?;
let (mut up_read, mut up_write) =
@@ -485,7 +491,7 @@ async fn handle_hub_frame(
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
tokio::select! {
@@ -569,8 +575,8 @@ async fn handle_hub_frame(
tokio::select! {
_ = notified => continue,
_ = stream_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
}
}
@@ -633,7 +639,11 @@ async fn handle_hub_frame(
}
}
writer_for_edge_data.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_for_edge_data).await.is_err() {
writer_for_edge_data.abort();
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
.await;
@@ -1379,6 +1389,7 @@ async fn handle_edge_connection_quic(
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
let cleanup_sessions = sessions.clone();
{
let mut s = sessions.lock().await;
@@ -1390,17 +1401,20 @@ async fn handle_edge_connection_quic(
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
@@ -1443,6 +1457,8 @@ async fn handle_edge_connection_quic(
}
}
recv_handle.abort();
// Clean up session entry to prevent memory leak
cleanup_sessions.lock().await.remove(&session_id);
});
continue;
@@ -1590,6 +1606,12 @@ async fn handle_quic_stream(
};
let _ = upstream.set_nodelay(true);
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
// Send PROXY header to SmartProxy
if let Err(e) = upstream.write_all(proxy_header.as_bytes()).await {
log::error!("QUIC stream {} failed to write PROXY header to upstream: {}", stream_id, e);
@@ -1600,7 +1622,7 @@ async fn handle_quic_stream(
// Task: QUIC -> upstream (edge data to SmartProxy)
let writer_token = stream_token.clone();
let writer_task = tokio::spawn(async move {
let mut writer_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
@@ -1651,7 +1673,11 @@ async fn handle_quic_stream(
// Gracefully close the QUIC send stream
let _ = quic_send.finish();
writer_task.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_task).await.is_err() {
writer_task.abort();
}
}
#[cfg(test)]

View File

@@ -0,0 +1,228 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers (same patterns as test.quic.node.ts)
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) socket.write(remainder);
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// QUIC Long-Running Stability Test — 2 minutes
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
let disconnectCount = 0;
tap.test('QUIC stability setup: start echo server and QUIC tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
// Track disconnects — any disconnect during the test is a failure signal
edge.on('tunnelDisconnected', () => {
disconnectCount++;
console.log(`[STABILITY] Unexpected tunnel disconnect #${disconnectCount}`);
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC stability: tunnel stays alive for 30s with periodic echo probes', async () => {
const testDurationMs = 30_000; // 30 seconds
const probeIntervalMs = 5_000; // probe every 5 seconds
const startTime = Date.now();
let probeCount = 0;
let failedProbes = 0;
while (Date.now() - startTime < testDurationMs) {
probeCount++;
const elapsed = Math.round((Date.now() - startTime) / 1000);
// Verify edge still reports connected
const status = await edge.getStatus();
if (!status.connected) {
throw new Error(`Tunnel disconnected at ${elapsed}s (probe #${probeCount})`);
}
// Send a 4KB echo probe through the tunnel
const data = crypto.randomBytes(4096);
const hash = sha256(data);
try {
const received = await sendAndReceive(edgePort, data, 10000);
if (received.length !== 4096 || sha256(received) !== hash) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: data mismatch`);
} else {
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: OK`);
}
} catch (err) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: FAILED — ${err}`);
}
// Wait for next probe interval
const remaining = testDurationMs - (Date.now() - startTime);
if (remaining > 0) {
await new Promise((resolve) => setTimeout(resolve, Math.min(probeIntervalMs, remaining)));
}
}
console.log(`[STABILITY] Completed: ${probeCount} probes, ${failedProbes} failures, ${disconnectCount} disconnects`);
expect(failedProbes).toEqual(0);
expect(disconnectCount).toEqual(0);
// Final status check
const finalStatus = await edge.getStatus();
expect(finalStatus.connected).toBeTrue();
});
tap.test('QUIC stability teardown', async () => {
await edge.stop();
await hub.stop();
await forceCloseServer(echoServer);
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.13.2',
version: '4.14.3',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
}

View File

@@ -79,6 +79,15 @@ export class RemoteIngressEdge extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressEdge] ${message}`);
} else {
console.log(`[RemoteIngressEdge] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -130,7 +139,8 @@ export class RemoteIngressEdge extends EventEmitter {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
@@ -180,6 +190,7 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
}
/**
@@ -211,6 +222,12 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = false;
// Clear orphaned status interval from previous run
if (this.statusInterval) {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
if (this.restartAttempts >= MAX_RESTART_ATTEMPTS) {
console.error('[RemoteIngressEdge] Max restart attempts reached, giving up');
this.emit('crashRecoveryFailed');
@@ -228,6 +245,7 @@ export class RemoteIngressEdge extends EventEmitter {
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
@@ -242,6 +260,21 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = true;
this.restartAttempts = 0;
this.restartBackoffMs = 1000;
// Restart periodic status logging
this.statusInterval = setInterval(async () => {
try {
const status = await this.getStatus();
console.log(
`[RemoteIngressEdge] Status: connected=${status.connected}, ` +
`streams=${status.activeStreams}, ports=[${status.listenPorts.join(',')}], ` +
`publicIp=${status.publicIp ?? 'unknown'}`
);
} catch {
// Bridge may be shutting down
}
}, 60_000);
console.log('[RemoteIngressEdge] Successfully recovered from crash');
this.emit('crashRecovered');
} catch (err) {

View File

@@ -87,6 +87,15 @@ export class RemoteIngressHub extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressHub] ${message}`);
} else {
console.log(`[RemoteIngressHub] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -118,7 +127,8 @@ export class RemoteIngressHub extends EventEmitter {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startHub', {
@@ -149,6 +159,8 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
this.savedEdges = [];
}
/**
@@ -205,6 +217,7 @@ export class RemoteIngressHub extends EventEmitter {
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
const config = this.savedConfig;

View File

@@ -6,7 +6,8 @@
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true
"verbatimModuleSyntax": true,
"types": ["node"]
},
"exclude": [
"dist_*/**/*.d.ts"