Compare commits

...

24 Commits

Author SHA1 Message Date
jkunz 27d4a5d3c1 v4.17.1 2026-04-26 15:08:37 +00:00
jkunz 627603532d fix(remoteingressedge): reset nftables state on startup and restart before reapplying hub firewall config 2026-04-26 15:08:37 +00:00
jkunz dd0cd479d5 v4.17.0 2026-04-26 12:09:58 +00:00
jkunz e709e40404 feat(core): add performance profiles, transport observability, and edge stream budget controls 2026-04-26 12:09:58 +00:00
jkunz 5304bbb486 v4.15.3 2026-03-27 11:34:31 +00:00
jkunz ac993dd5a3 fix(core): harden UDP session handling, QUIC control message validation, and bridge process cleanup 2026-03-27 11:34:31 +00:00
jkunz 0b2a83ddb6 v4.15.2 2026-03-26 17:06:28 +00:00
jkunz 3c5ea6bdc5 fix(readme): adjust tunnel diagram alignment in the README 2026-03-26 17:06:28 +00:00
jkunz 3dea43400b v4.15.1 2026-03-26 17:05:38 +00:00
jkunz 8fa3d414dd fix(readme): clarify unified runtime configuration and firewall update behavior 2026-03-26 17:05:38 +00:00
jkunz 1a62c52d24 v4.15.0 2026-03-26 16:39:53 +00:00
jkunz e9a08bdd0f feat(edge,hub): add hub-controlled nftables firewall configuration for remote ingress edges 2026-03-26 16:39:53 +00:00
jkunz c2c9dd195d v4.14.3 2026-03-26 07:49:44 +00:00
jkunz fb6e9c54ad fix(docs): refresh project metadata and README to reflect current ingress tunnel capabilities 2026-03-26 07:49:44 +00:00
jkunz ac22617849 v4.14.2 2026-03-26 07:10:15 +00:00
jkunz e5a91f298c fix(hub-core): improve stream shutdown handling and connection cleanup in hub and edge 2026-03-26 07:10:15 +00:00
jkunz 5e93710c42 v4.14.1 2026-03-21 19:49:05 +00:00
jkunz 331b5c8d3f fix(remoteingress edge/hub crash recovery): prevent duplicate crash recovery listeners and reset saved runtime state on shutdown 2026-03-21 19:49:05 +00:00
jkunz bf3418d0ed v4.14.0 2026-03-20 00:51:37 +00:00
jkunz 6d5e6f60f8 feat(quic): add QUIC stability test coverage and bridge logging for hub and edge 2026-03-20 00:51:37 +00:00
jkunz de8922148e v4.13.2 2026-03-20 00:11:34 +00:00
jkunz e84eecf82c fix(remoteingress-core): preserve reconnected edge entries during disconnect cleanup 2026-03-20 00:11:34 +00:00
jkunz c7641853cf v4.13.1 2026-03-19 23:28:38 +00:00
jkunz 6e2025db3e fix(remoteingress-core): default edge transport mode to QUIC with fallback 2026-03-19 23:28:38 +00:00
24 changed files with 3179 additions and 1428 deletions
+14 -14
View File
@@ -11,26 +11,26 @@
"githost": "code.foss.global",
"gitscope": "serve.zone",
"gitrepo": "remoteingress",
"description": "Provides a service for creating private tunnels and reaching private clusters from the outside, facilitating secure remote access as part of the @serve.zone stack.",
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"npmPackagename": "@serve.zone/remoteingress",
"license": "MIT",
"projectDomain": "serve.zone",
"keywords": [
"remote access",
"private tunnels",
"network security",
"TLS encryption",
"connector",
"ingress tunnel",
"network edge",
"PROXY protocol",
"multiplexed tunnel",
"TCP proxy",
"TLS tunnel",
"QUIC transport",
"UDP tunneling",
"serve.zone stack",
"private clusters access",
"public access management",
"TypeScript application",
"node.js package",
"secure communications",
"TLS/SSL certificates",
"development tools",
"software development",
"private network integration"
"TypeScript",
"Rust",
"SmartProxy",
"DcRouter",
"flow control"
]
},
"release": {
+1 -1
View File
@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {
+95 -1
View File
@@ -1,5 +1,99 @@
# Changelog
## 2026-04-26 - 4.17.1 - fix(remoteingressedge)
reset nftables state on startup and restart before reapplying hub firewall config
- upgrade @push.rocks/smartnftables to ^1.2.0 to use forced cleanup and IP set blocking
- queue firewall updates until nftables is initialized and apply pending config afterward
- replace per-IP blocking with blockIPSet for the hub blocklist
- force nftables cleanup during startup, restart, firewall replacement, and shutdown to remove stale kernel rules
## 2026-04-26 - 4.17.0 - feat(core)
add performance profiles, transport observability, and edge stream budget controls
- introduce configurable performance profiles and effective per-edge limits for stream concurrency, flow-control windows, and QUIC datagram buffers
- expose hub-side edge status for transport mode, fallback usage, flow-control, queue depths, traffic counters, and UDP session metrics
- enforce edge-side stream admission limits before spawning client tunnel tasks and make TCP/TLS window sizing honor edge memory budgets under high concurrency
- increase QUIC datagram receive buffer configurability and improve hub-side QUIC UDP session tracking and idle pruning
- update hub APIs and documentation to support performance configuration and clarify quicWithFallback as the default edge transport
## 2026-04-26 - 4.16.0 - feat(performance)
add remote ingress performance controls and runtime observability
- add performance profiles and configurable stream/window budgets for hub and edge connections
- expose per-edge transport, flow-control, queue, traffic, and UDP status from hub status
- enforce edge-side stream admission before spawning client tunnel tasks
- make TCP/TLS flow control honor an edge-level memory budget under high concurrency
- increase QUIC datagram receive buffers and prune idle hub-side QUIC UDP sessions
## 2026-03-27 - 4.15.3 - fix(core)
harden UDP session handling, QUIC control message validation, and bridge process cleanup
- cap UDP session creation and drop excess datagrams with warnings to prevent unbounded session growth
- periodically prune closed datagram sessions on the hub and reject oversized QUIC control messages to avoid resource exhaustion
- clean up spawned edge and hub bridge processes on startup failure, remove listeners on stop, and avoid restarting after shutdown during backoff
## 2026-03-26 - 4.15.2 - fix(readme)
adjust tunnel diagram alignment in the README
- Improves formatting consistency in the Hub/Edge topology diagram.
## 2026-03-26 - 4.15.1 - fix(readme)
clarify unified runtime configuration and firewall update behavior
- Updates the architecture and feature descriptions to reflect that ports, firewall rules, and rate limits are pushed together in a single config update
- Clarifies that firewall configuration is delivered via FRAME_CONFIG on handshake and subsequent updates, with atomic full-rule replacement at the edge
- Simplifies and reorganizes README wording around edge and hub responsibilities without changing implementation behavior
## 2026-03-26 - 4.15.0 - feat(edge,hub)
add hub-controlled nftables firewall configuration for remote ingress edges
- add firewallConfig support to allowed edge definitions, handshake responses, and runtime config updates
- emit firewallConfigUpdated events from the Rust bridge and edge runtime when firewall settings change
- initialize SmartNftables on edges, apply blocked IPs, rate limits, and custom rules, and clean up nftables rules on stop
- document centralized firewall management, root requirements, and new edge events in the README
## 2026-03-26 - 4.14.3 - fix(docs)
refresh project metadata and README to reflect current ingress tunnel capabilities
- update package metadata description and keywords to better describe edge ingress, TLS/QUIC transport, and SmartProxy integration
- revise README terminology, API docs, and feature list to document crash recovery, bindAddress support, and current event names
- improve README formatting and examples for architecture, wire protocol, QoS, and token usage
## 2026-03-26 - 4.14.2 - fix(hub-core)
improve stream shutdown handling and connection cleanup in hub and edge
- Cancel edge upload loops immediately when the hub closes a stream instead of waiting for the window stall timeout.
- Reduce stalled stream timeouts from 120s to 55s to detect broken connections faster.
- Allow hub writer tasks to shut down gracefully before aborting to avoid unnecessary TCP resets.
- Enable TCP keepalive on hub upstream connections to detect silent SmartProxy failures.
- Remove leaked QUIC UDP session entries when setup fails or sessions end.
- Rename npmextra.json to .smartconfig.json and update package packaging references.
## 2026-03-21 - 4.14.1 - fix(remoteingress edge/hub crash recovery)
prevent duplicate crash recovery listeners and reset saved runtime state on shutdown
- Removes existing exit listeners before re-registering crash recovery handlers for edge and hub processes.
- Clears saved edge and hub configuration on stop to avoid stale restart state.
- Resets orphaned edge status intervals and restarts periodic status logging after successful crash recovery.
## 2026-03-20 - 4.14.0 - feat(quic)
add QUIC stability test coverage and bridge logging for hub and edge
- adds a long-running QUIC stability test with periodic echo probes and disconnect detection
- enables prefixed bridge logging for RemoteIngressHub and RemoteIngressEdge to improve runtime diagnostics
## 2026-03-20 - 4.13.2 - fix(remoteingress-core)
preserve reconnected edge entries during disconnect cleanup
- Guard edge removal so disconnect handlers only delete entries whose cancel token is already cancelled
- Prevents stale TCP and QUIC disconnect paths from removing a newer connection after an edge reconnects
## 2026-03-19 - 4.13.1 - fix(remoteingress-core)
default edge transport mode to QUIC with fallback
- Changes the default transport mode in edge connections from TCP/TLS to QUIC with fallback when no transport mode is explicitly configured.
## 2026-03-19 - 4.13.0 - feat(docs)
document TCP and UDP tunneling over TLS and QUIC
@@ -448,4 +542,4 @@ Core updates and fixes.
## 2024-03-24 - 1.0.1 - core
Core updates and fixes.
- fix(core): update
- fix(core): update
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Task Venture Capital GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+10 -9
View File
@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.13.0",
"version": "4.17.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js",
@@ -14,17 +14,18 @@
"buildDocs": "(tsdoc)"
},
"devDependencies": {
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tsrust": "^1.3.2",
"@git.zone/tstest": "^3.6.0",
"@push.rocks/tapbundle": "^6.0.3",
"@types/node": "^25.3.0"
"@types/node": "^25.5.0"
},
"dependencies": {
"@push.rocks/qenv": "^6.1.3",
"@push.rocks/smartrust": "^1.2.1"
"@push.rocks/smartnftables": "^1.2.0",
"@push.rocks/smartrust": "^1.3.2"
},
"repository": {
"type": "git",
@@ -47,7 +48,7 @@
"dist_rust/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"keywords": [
+1561 -1175
View File
File diff suppressed because it is too large
+168 -56
View File
@@ -1,6 +1,6 @@
# @serve.zone/remoteingress
Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol.
Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol. Includes **hub-controlled nftables firewall** for IP blocking, rate limiting, and custom firewall rules applied directly at the edge.
## Issue Reporting and Security
@@ -12,19 +12,20 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
pnpm install @serve.zone/remoteingress
```
## 🏗️ Architecture
## Architecture
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
┌─────────────────────┐ TLS or QUIC Tunnel ┌─────────────────────┐
│ Network Edge │ ◄══════════════════════════► │ Private Cluster │
│ TCP+TLS: frame mux │
RemoteIngressEdge │ QUIC: native streams │ RemoteIngressHub
│ UDP: QUIC datagrams
Accepts TCP & UDP │ │ Forwards to
on hub-assigned │ │ SmartProxy on
ports │ │ local ports
TLS or QUIC Tunnel
┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐
Network Edge │ TCP+TLS: frame mux │ Private Cluster
│ QUIC: native streams │
RemoteIngressEdge │ UDP: QUIC datagrams │ RemoteIngressHub
│ │
• TCP/UDP listeners│ ◄─── FRAME_CONFIG pushes ─── │ • Port assignments
• nftables firewall│ ports + firewall rules • Firewall config
│ • Rate limiting │ at any time │ • Rate limit rules │
└─────────────────────┘ └─────────────────────┘
▲ │
│ TCP + UDP from end users ▼
@@ -33,28 +34,30 @@ pnpm install @serve.zone/remoteingress
| Component | Role |
|-----------|------|
| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Listens on TCP and UDP ports assigned by the hub, accepts connections/datagrams, and tunnels them to the hub. Ports are hot-reloadable at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. |
| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Runs as root. Listens on hub-assigned TCP/UDP ports, tunnels traffic to the hub, and applies hub-pushed nftables rules (IP blocking, rate limiting). All config is hot-reloadable at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. Pushes all edge config (ports, firewall) via a single API. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
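For orientation, the PROXY protocol v1 header that precedes each forwarded TCP connection is a single plain-text line in the standard HAProxy format; the addresses below are example values, not output from this repository:

```typescript
// Standard PROXY protocol v1 line, prepended before the tunneled TCP payload:
//   PROXY <TCP4|TCP6> <client ip> <destination ip> <client port> <destination port>\r\n
// Example values only:
const exampleProxyV1Header = 'PROXY TCP4 203.0.113.7 10.0.0.5 51234 443\r\n';
```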
### Key Features
### Key Features
- 🔒 **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- 🌐 **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- 📋 **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- 🔀 **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- 🔑 **Shared-secret authentication** — edges must present valid credentials to connect
- 🎫 **Connection tokens** — encode all connection details into a single opaque base64url string
- 📡 **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- 🔄 **Auto-reconnect** with exponential backoff if the tunnel drops
- 🎛️ **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime
- 📣 **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- 🎚️ **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- 📊 **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- 🕒 **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
- **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- **Hub-controlled firewall** — push nftables rules (IP blocking, rate limiting, custom firewall rules) to edges as part of the same config update that assigns ports — powered by `@push.rocks/smartnftables`
- **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- **Shared-secret authentication** — edges must present valid credentials to connect
- **Connection tokens** — encode all connection details into a single opaque base64url string
- **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- **Auto-reconnect** with exponential backoff if the tunnel drops
- **Dynamic runtime configuration** — the hub pushes ports, firewall rules, and rate limits to edges at any time via a single `updateAllowedEdges()` call
- **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
- **Crash recovery** — automatic restart with exponential backoff if the Rust binary crashes unexpectedly
## 🚀 Usage
## Usage
Both classes are imported from the package and communicate with the Rust binary under the hood.
@@ -77,7 +80,7 @@ await hub.start({
targetHost: '127.0.0.1', // SmartProxy host to forward traffic to
});
// Register allowed edges with TCP and UDP listen ports
// Register allowed edges with TCP and UDP listen ports + firewall config
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
@@ -85,6 +88,15 @@ await hub.updateAllowedEdges([
listenPorts: [80, 443], // TCP ports the edge should listen on
listenPortsUdp: [53, 51820], // UDP ports (e.g., DNS, WireGuard)
stunIntervalSecs: 300,
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8'],
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '100/second', perSourceIP: true },
],
rules: [
{ id: 'allow-ssh', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 22, protocol: 'tcp' },
],
},
},
{
id: 'edge-fra-02',
@@ -93,13 +105,19 @@ await hub.updateAllowedEdges([
},
]);
// Dynamically update ports — changes are pushed instantly to connected edges
// Dynamically update ports and firewall — changes are pushed instantly to connected edges
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443, 8443], // added TCP port 8443
listenPortsUdp: [53], // removed WireGuard UDP port
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8', '203.0.113.50'], // added new blocked IP
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '200/second', perSourceIP: true },
],
},
},
]);
@@ -112,7 +130,7 @@ await hub.stop();
### Setting Up the Edge (Network Edge Side)
The edge can connect via **TCP+TLS** (default) or **QUIC** transport.
The edge connects via **QUIC with TCP+TLS fallback** by default. Edges run as **root** so they can bind to privileged ports and apply nftables firewall rules.
#### Option A: Connection Token (Recommended)
@@ -125,6 +143,7 @@ edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`TCP ports: ${listenPorts}`));
edge.on('firewallConfigUpdated', () => console.log('Firewall rules applied'));
await edge.start({
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...',
@@ -143,7 +162,7 @@ await edge.start({
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
transportMode: 'quic', // 'tcpTls' (default) | 'quic' | 'quicWithFallback'
transportMode: 'quic', // 'tcpTls' | 'quic' | 'quicWithFallback' (default)
});
const edgeStatus = await edge.getStatus();
@@ -156,11 +175,11 @@ await edge.stop();
| Mode | Description |
|------|-------------|
| `'tcpTls'` | **Default.** Single TLS connection with frame-based multiplexing. Universal compatibility. |
| `'tcpTls'` | Single TLS connection with frame-based multiplexing. Universal compatibility. |
| `'quic'` | QUIC with native stream multiplexing. Eliminates head-of-line blocking. Uses QUIC datagrams for UDP traffic. |
| `'quicWithFallback'` | Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
| `'quicWithFallback'` | **Default.** Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
### 🎫 Connection Tokens
### Connection Tokens
Encode all connection details into a single opaque string for easy distribution:
@@ -183,30 +202,92 @@ const data = decodeConnectionToken(token);
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
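A minimal encode/decode round-trip sketch: `hubHost` and `hubPort` appear in the provisioning example later in this README, while `edgeId` and `secret` as token fields are assumptions based on the manual edge configuration; `IConnectionTokenData` below is the authoritative shape.

```typescript
import { encodeConnectionToken, decodeConnectionToken } from '@serve.zone/remoteingress';

// edgeId/secret as token fields are assumed from the manual edge config.
const token = encodeConnectionToken({
  hubHost: 'hub.example.com',
  hubPort: 8443,
  edgeId: 'edge-nyc-01',
  secret: 'supersecrettoken1',
});

const data = decodeConnectionToken(token);
console.log(data.hubHost); // 'hub.example.com'
```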
## 📖 API Reference
## 🔥 Firewall Config
The `firewallConfig` field in `updateAllowedEdges()` works exactly like `listenPorts` — it travels in the same `FRAME_CONFIG` frame, is delivered on initial handshake and on every subsequent update, and is applied atomically at the edge using `@push.rocks/smartnftables`. Each update fully replaces the previous ruleset.
Since edges run as root, the rules are applied directly to the Linux kernel via nftables. If the edge isn't root or nftables is unavailable, it logs a warning and continues — the tunnel works fine, just without kernel-level firewall rules.
### Config Structure
```typescript
interface IFirewallConfig {
blockedIps?: string[]; // IPs or CIDRs to block (e.g., '1.2.3.4', '10.0.0.0/8')
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
interface IFirewallRateLimit {
id: string; // unique identifier for this rate limit
port: number; // port to rate-limit
protocol?: 'tcp' | 'udp'; // default: both
rate: string; // e.g., '100/second', '1000/minute'
burst?: number; // burst allowance
perSourceIP?: boolean; // per-client rate limiting (recommended)
}
interface IFirewallRule {
id: string; // unique identifier for this rule
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string; // source IP or CIDR
destPort?: number; // destination port
protocol?: 'tcp' | 'udp';
comment?: string;
}
```
### Example: Rate Limiting + IP Blocking
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
// Block known bad actors
blockedIps: ['198.51.100.0/24', '203.0.113.50'],
// Rate limit HTTP traffic per source IP
rateLimits: [
{ id: 'http', port: 80, protocol: 'tcp', rate: '100/second', burst: 50, perSourceIP: true },
{ id: 'https', port: 443, protocol: 'tcp', rate: '200/second', burst: 100, perSourceIP: true },
],
// Allow monitoring from trusted subnet
rules: [
{ id: 'monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 9090, protocol: 'tcp', comment: 'Prometheus scraping' },
],
},
},
]);
```
## API Reference
### `RemoteIngressHub`
| Method / Property | Description |
|-------------------|-------------|
| `start(config?)` | Start the hub. Config: `{ tunnelPort?: number, targetHost?: string }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `start(config?)` | Start the hub. Config: `{ tunnelPort?, targetHost?, tls?: { certPem?, keyPem? } }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `stop()` | Graceful shutdown. |
| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs? }`. Port changes are pushed to connected edges in real time. |
| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs?, firewallConfig? }`. Port and firewall changes are pushed to connected edges in real time. |
| `getStatus()` | Returns `{ running, tunnelPort, connectedEdges: [...] }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`, `crashRecovered`, `crashRecoveryFailed`
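A monitoring sketch using these events, assuming a `hub` instance as created in the Usage section; the event names come from the list above, but the payload shapes are assumptions:

```typescript
// Hub extends EventEmitter; payload shapes here are assumptions.
hub.on('edgeConnected', (info) => console.log('edge connected:', info));
hub.on('edgeDisconnected', (info) => console.log('edge disconnected:', info));
hub.on('streamOpened', (info) => console.log('stream opened:', info));
hub.on('crashRecovered', () => console.log('Rust core restarted after a crash'));
hub.on('crashRecoveryFailed', (err) => console.error('crash recovery failed:', err));
```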
### `RemoteIngressEdge`
| Method / Property | Description |
|-------------------|-------------|
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, transportMode? }`. |
| `stop()` | Graceful shutdown. |
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, bindAddress?, transportMode? }`. |
| `stop()` | Graceful shutdown. Cleans up all nftables rules. |
| `getStatus()` | Returns `{ running, connected, publicIp, activeStreams, listenPorts }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`, `firewallConfigUpdated`, `crashRecovered`, `crashRecoveryFailed`
### Token Utilities
@@ -221,6 +302,10 @@ Tokens are base64url-encoded — safe for environment variables, CLI arguments,
interface IHubConfig {
tunnelPort?: number; // default: 8443
targetHost?: string; // default: '127.0.0.1'
tls?: {
certPem?: string; // PEM-encoded TLS certificate
keyPem?: string; // PEM-encoded TLS private key
};
}
interface IEdgeConfig {
@@ -240,7 +325,7 @@ interface IConnectionTokenData {
}
```
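A sketch of starting the hub with the new `tls` field, mirroring the earlier hub example; reading the PEM material from disk and the file paths are assumptions:

```typescript
import { readFileSync } from 'node:fs';
import { RemoteIngressHub } from '@serve.zone/remoteingress';

const hub = new RemoteIngressHub();
await hub.start({
  tunnelPort: 8443,
  targetHost: '127.0.0.1',
  tls: {
    certPem: readFileSync('/etc/remoteingress/tls.crt', 'utf8'), // example path
    keyPem: readFileSync('/etc/remoteingress/tls.key', 'utf8'),  // example path
  },
});
```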
## 🔌 Wire Protocol
## Wire Protocol
### TCP+TLS Transport (Frame Protocol)
@@ -257,7 +342,7 @@ The tunnel uses a custom binary frame protocol over a single TLS connection:
| `CLOSE` | `0x03` | Edge → Hub | Client closed connection |
| `DATA_BACK` | `0x04` | Hub → Edge | Response data (download) |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream closed connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime config update (JSON payload) |
| `CONFIG` | `0x06` | Hub → Edge | Runtime config update (JSON: ports + firewall config) |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (every 15s) |
| `PONG` | `0x08` | Edge → Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Flow control: edge consumed N bytes |
@@ -280,19 +365,20 @@ When using QUIC, the frame protocol is replaced by native QUIC primitives:
1. Edge opens a TLS or QUIC connection to the hub
2. Edge sends: `EDGE <edgeId> <secret>\n`
3. Hub verifies credentials (constant-time comparison) and responds with JSON:
`{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300}\n`
`{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300,"firewallConfig":{...}}\n`
4. Edge starts TCP and UDP listeners on the assigned ports
5. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
5. Edge applies firewall config via nftables (if present and running as root)
6. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
## 🎚️ QoS & Flow Control
## QoS & Flow Control
### Priority Tiers (TCP+TLS Transport)
| Tier | Frames | Behavior |
|------|--------|----------|
| 🔴 **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| 🟢 **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
| **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
### Sustained Stream Classification
@@ -314,17 +400,17 @@ Each TCP stream has a send window from a shared **200 MB budget**:
UDP traffic uses no flow control — datagrams are fire-and-forget, matching UDP semantics.
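An illustrative TypeScript sketch of the adaptive-window idea (not the actual Rust implementation): only the shared 200 MB budget comes from this section, and the per-stream bounds are assumptions.

```typescript
// Each stream's send window is its share of the shared budget, clamped to
// assumed per-stream bounds: fewer streams mean larger windows, more streams
// mean smaller windows, keeping total in-flight data within the budget.
function computeAdaptiveWindow(
  activeStreams: number,
  totalBudgetBytes = 200 * 1024 * 1024, // shared budget stated above
  minWindowBytes = 64 * 1024,           // assumed lower bound
  maxWindowBytes = 8 * 1024 * 1024,     // assumed upper bound
): number {
  const share = Math.floor(totalBudgetBytes / Math.max(activeStreams, 1));
  return Math.min(maxWindowBytes, Math.max(minWindowBytes, share));
}

computeAdaptiveWindow(10);   // few streams: windows near the maximum
computeAdaptiveWindow(5000); // many streams: windows shrink toward the minimum
```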
## 💡 Example Scenarios
## Example Scenarios
### 1. Expose a Private Cluster to the Internet
### 1. 🌐 Expose a Private Cluster to the Internet
Deploy an Edge on a public VPS, point DNS to its IP. The Edge tunnels all TCP and UDP traffic to the Hub running inside your private cluster. No public ports needed on the cluster.
### 2. Multi-Region Edge Ingress
### 2. 🗺️ Multi-Region Edge Ingress
Run Edges in NYC, Frankfurt, and Tokyo — all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. PROXY protocol ensures the Hub sees real client IPs regardless of which Edge they entered through.
### 3. UDP Forwarding (DNS, Gaming, VoIP)
### 3. 📡 UDP Forwarding (DNS, Gaming, VoIP)
Configure UDP listen ports alongside TCP ports. DNS queries, game server traffic, or VoIP packets are tunneled through the same edge/hub connection and forwarded to SmartProxy with a PROXY v2 binary header preserving the client's real IP.
@@ -339,7 +425,7 @@ await hub.updateAllowedEdges([
]);
```
### 4. QUIC Transport for Low-Latency
### 4. 🚀 QUIC Transport for Low-Latency
Use QUIC transport to eliminate head-of-line blocking — a lost packet on one stream doesn't stall others. QUIC also enables 0-RTT reconnection and connection migration.
@@ -353,11 +439,13 @@ await edge.start({
});
```
### 5. Token-Based Edge Provisioning
### 5. 🔑 Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators:
```typescript
import { encodeConnectionToken, RemoteIngressEdge } from '@serve.zone/remoteingress';
const token = encodeConnectionToken({
hubHost: 'hub.prod.example.com',
hubPort: 8443,
@@ -370,6 +458,30 @@ const edge = new RemoteIngressEdge();
await edge.start({ token });
```
### 6. 🛡️ Centralized Firewall Management
Push firewall rules from the hub to all your edge nodes. Block bad actors, rate-limit abusive traffic, and whitelist trusted subnets — all from a single control plane:
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
blockedIps: ['198.51.100.0/24'],
rateLimits: [
{ id: 'https', port: 443, protocol: 'tcp', rate: '500/second', perSourceIP: true, burst: 100 },
],
rules: [
{ id: 'allow-monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/8', destPort: 9090, protocol: 'tcp' },
],
},
},
]);
// Firewall rules are applied at the edge via nftables within seconds
```
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
@@ -316,6 +316,12 @@ async fn handle_request(
serde_json::json!({ "listenPorts": listen_ports }),
);
}
EdgeEvent::FirewallConfigUpdated { firewall_config } => {
send_event(
"firewallConfigUpdated",
serde_json::json!({ "firewallConfig": firewall_config }),
);
}
}
}
});
+143 -41
View File
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use bytes::Bytes;
use remoteingress_protocol::*;
use crate::performance::EffectivePerformanceConfig;
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
use crate::udp_session::{UdpSessionKey, UdpSessionManager};
@@ -36,6 +37,9 @@ struct EdgeStreamState {
send_window: Arc<AtomicU32>,
/// Notifier to wake the client reader when the window opens.
window_notify: Arc<Notify>,
/// Per-stream cancellation token — cancelled on FRAME_CLOSE_BACK to promptly
/// terminate the upload loop instead of waiting for the window stall timeout.
cancel_token: CancellationToken,
}
/// Edge configuration (hub-host + credentials only; ports come from hub).
@@ -50,7 +54,7 @@ pub struct EdgeConfig {
/// Useful for testing on localhost where edge and upstream share the same machine.
#[serde(default)]
pub bind_address: Option<String>,
/// Transport mode for the tunnel connection (defaults to TcpTls).
/// Transport mode for the tunnel connection (defaults to QuicWithFallback).
#[serde(default)]
pub transport_mode: Option<TransportMode>,
}
@@ -64,12 +68,55 @@ struct HandshakeConfig {
listen_ports_udp: Vec<u16>,
#[serde(default = "default_stun_interval")]
stun_interval_secs: u64,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
#[serde(default)]
performance: EffectivePerformanceConfig,
}
fn default_stun_interval() -> u64 {
300
}
fn try_reserve_stream(active_streams: &AtomicU32, max_streams: usize) -> bool {
let max_streams = max_streams.min(u32::MAX as usize) as u32;
loop {
let current = active_streams.load(Ordering::Relaxed);
if current >= max_streams {
return false;
}
if active_streams
.compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return true;
}
}
}
fn release_stream(active_streams: &AtomicU32) {
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 {
break;
}
if active_streams
.compare_exchange_weak(current, current - 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}
fn transport_mode_wire_name(mode: TransportMode) -> &'static str {
match mode {
TransportMode::TcpTls => "tcpTls",
TransportMode::Quic => "quic",
TransportMode::QuicWithFallback => "quicWithFallback",
}
}
/// Runtime config update received from hub via FRAME_CONFIG.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -77,6 +124,10 @@ struct ConfigUpdate {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
#[serde(default)]
performance: EffectivePerformanceConfig,
}
/// Events emitted by the edge.
@@ -93,6 +144,8 @@ pub enum EdgeEvent {
PortsAssigned { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
PortsUpdated { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
FirewallConfigUpdated { firewall_config: serde_json::Value },
}
/// Edge status response.
@@ -220,7 +273,7 @@ async fn edge_main_loop(
let mut backoff_ms: u64 = 1000;
let max_backoff_ms: u64 = 30000;
let transport_mode = config.transport_mode.unwrap_or(TransportMode::TcpTls);
let transport_mode = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
// Build TLS config ONCE outside the reconnect loop — preserves session
// cache across reconnections for TLS session resumption (saves 1 RTT).
@@ -369,6 +422,7 @@ async fn handle_edge_frame(
bind_address: &str,
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
performance: &mut EffectivePerformanceConfig,
) -> EdgeFrameAction {
match frame.frame_type {
FRAME_DATA_BACK => {
@@ -389,8 +443,8 @@ async fn handle_edge_frame(
let writers = client_writers.lock().await;
if let Some(state) = writers.get(&frame.stream_id) {
let prev = state.send_window.fetch_add(increment, Ordering::Release);
if prev + increment > MAX_WINDOW_SIZE {
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
if prev + increment > performance.max_stream_window_bytes {
state.send_window.store(performance.max_stream_window_bytes, Ordering::Release);
}
state.window_notify.notify_one();
}
@@ -399,11 +453,16 @@ async fn handle_edge_frame(
}
FRAME_CLOSE_BACK => {
let mut writers = client_writers.lock().await;
writers.remove(&frame.stream_id);
if let Some(state) = writers.remove(&frame.stream_id) {
// Cancel the stream's token so the upload loop exits promptly
// instead of waiting for the window stall timeout.
state.cancel_token.cancel();
}
}
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
log::info!("Config update from hub: ports {:?}, udp {:?}", update.listen_ports, update.listen_ports_udp);
*performance = update.performance.clone();
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
@@ -420,6 +479,7 @@ async fn handle_edge_frame(
edge_id,
connection_token,
bind_address,
performance,
);
apply_udp_port_config(
&update.listen_ports_udp,
@@ -432,6 +492,11 @@ async fn handle_edge_frame(
connection_token,
bind_address,
);
if let Some(fw_config) = update.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
}
}
FRAME_PING => {
@@ -506,7 +571,13 @@ async fn connect_to_hub_and_run(
};
// Send auth line (we own the whole stream — no split)
let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret);
let requested_transport = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
let auth_line = format!(
"EDGE {} {} {}\n",
config.edge_id,
config.secret,
transport_mode_wire_name(requested_transport),
);
if tls_stream.write_all(auth_line.as_bytes()).await.is_err() {
return EdgeLoopResult::Reconnect("auth_write_failed".to_string());
}
@@ -545,6 +616,7 @@ async fn connect_to_hub_and_run(
return EdgeLoopResult::Reconnect(format!("handshake_invalid: {}", e));
}
};
let mut performance = handshake.performance.clone();
log::info!(
"Handshake from hub: ports {:?}, stun_interval {}s",
@@ -562,6 +634,13 @@ async fn connect_to_hub_and_run(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -615,6 +694,7 @@ async fn connect_to_hub_and_run(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
// UDP session manager + listeners
@@ -675,7 +755,7 @@ async fn connect_to_hub_and_run(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
connection_token, bind_address, &udp_sessions, &udp_sockets, &mut performance,
).await {
break 'io_loop EdgeLoopResult::Reconnect(reason);
}
@@ -694,7 +774,7 @@ async fn connect_to_hub_and_run(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
connection_token, bind_address, &udp_sessions, &udp_sockets, &mut performance,
).await {
break EdgeLoopResult::Reconnect(reason);
}
@@ -760,6 +840,7 @@ fn apply_port_config(
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
performance: &EffectivePerformanceConfig,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
@@ -782,6 +863,7 @@ fn apply_port_config(
let next_stream_id = next_stream_id.clone();
let edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let performance = performance.clone();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
@@ -817,8 +899,12 @@ fn apply_port_config(
let edge_id = edge_id.clone();
let client_token = port_token.child_token();
active_streams.fetch_add(1, Ordering::Relaxed);
if !try_reserve_stream(&active_streams, performance.max_streams_per_edge) {
log::warn!("Rejecting client on port {}: max streams ({}) reached", port, performance.max_streams_per_edge);
continue;
}
let performance = performance.clone();
tokio::spawn(async move {
handle_client_connection(
client_stream,
@@ -832,20 +918,10 @@ fn apply_port_config(
client_writers,
client_token,
Arc::clone(&active_streams),
performance,
)
.await;
// Saturating decrement: prevent underflow when
// edge_main_loop's store(0) races with task cleanup.
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
release_stream(&active_streams);
});
}
Err(e) => {
@@ -929,7 +1005,10 @@ fn apply_udp_port_config(
} else {
// New session — allocate stream_id and send UDP_OPEN
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
if sessions.insert(key, sid).is_none() {
log::warn!("UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -977,6 +1056,7 @@ async fn handle_client_connection(
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
client_token: CancellationToken,
active_streams: Arc<AtomicU32>,
performance: EffectivePerformanceConfig,
) {
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -1000,9 +1080,12 @@ async fn handle_client_connection(
// streams due to channel overflow — backpressure slows streams, never kills them.
let (back_tx, mut back_rx) = mpsc::unbounded_channel::<Bytes>();
// Adaptive initial window: scale with current stream count to keep total in-flight
// data within the 200MB budget. Prevents burst flooding when many streams open.
let initial_window = remoteingress_protocol::compute_window_for_stream_count(
// data within the configured edge budget. Prevents burst flooding when many streams open.
let initial_window = remoteingress_protocol::compute_window_for_limits(
active_streams.load(Ordering::Relaxed),
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let send_window = Arc::new(AtomicU32::new(initial_window));
let window_notify = Arc::new(Notify::new());
@@ -1012,6 +1095,7 @@ async fn handle_client_connection(
back_tx,
send_window: Arc::clone(&send_window),
window_notify: Arc::clone(&window_notify),
cancel_token: client_token.clone(),
});
}
@@ -1038,8 +1122,11 @@ async fn handle_client_connection(
// effective window shrinks to match current demand (fewer streams
// = larger window, more streams = smaller window per stream).
consumed_since_update += len;
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
let adaptive_window = remoteingress_protocol::compute_window_for_limits(
active_streams_h2c.load(Ordering::Relaxed),
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
@@ -1093,8 +1180,8 @@ async fn handle_client_connection(
tokio::select! {
_ = notified => continue,
_ = client_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} upload stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} upload stalled (window empty for 55s)", stream_id);
break;
}
}
@@ -1253,7 +1340,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
};
// Auth handshake on control stream (same protocol as TCP+TLS)
let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret);
let requested_transport = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
let auth_line = format!(
"EDGE {} {} {}\n",
config.edge_id,
config.secret,
transport_mode_wire_name(requested_transport),
);
if let Err(e) = ctrl_send.write_all(auth_line.as_bytes()).await {
return EdgeLoopResult::Reconnect(format!("quic_auth_write_failed: {}", e));
}
@@ -1285,6 +1378,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
return EdgeLoopResult::Reconnect(format!("quic_handshake_invalid: {}", e));
}
};
let mut performance = handshake.performance.clone();
log::info!(
"QUIC handshake from hub: ports {:?}, stun_interval {}s",
@@ -1301,6 +1395,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -1341,6 +1442,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
// UDP listeners for QUIC transport — uses QUIC datagrams for low-latency forwarding.
@@ -1382,6 +1484,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
quic_transport::CTRL_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&payload) {
log::info!("QUIC config update from hub: ports {:?}", update.listen_ports);
performance = update.performance.clone();
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
@@ -1395,6 +1498,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
apply_udp_port_config_quic(
&update.listen_ports_udp,
@@ -1501,6 +1605,7 @@ fn apply_port_config_quic(
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
performance: &EffectivePerformanceConfig,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
@@ -1521,6 +1626,7 @@ fn apply_port_config_quic(
let _edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let performance = performance.clone();
let handle = tokio::spawn(async move {
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
@@ -1549,7 +1655,10 @@ fn apply_port_config_quic(
let active_streams = active_streams.clone();
let client_token = port_token.child_token();
active_streams.fetch_add(1, Ordering::Relaxed);
if !try_reserve_stream(&active_streams, performance.max_streams_per_edge) {
log::warn!("Rejecting QUIC client on port {}: max streams ({}) reached", port, performance.max_streams_per_edge);
continue;
}
tokio::spawn(async move {
handle_client_connection_quic(
@@ -1560,17 +1669,7 @@ fn apply_port_config_quic(
quic_conn,
client_token,
).await;
// Saturating decrement
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
release_stream(&active_streams);
});
}
Err(e) => {
@@ -1648,7 +1747,10 @@ fn apply_udp_port_config_quic(
} else {
// New session — send PROXY v2 header via control-style datagram
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
if sessions.insert(key, sid).is_none() {
log::warn!("QUIC UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
+321 -48
View File
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
@@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use bytes::Bytes;
use remoteingress_protocol::*;
use crate::performance::{EffectivePerformanceConfig, PerformanceConfig};
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
type HubTlsStream = tokio_rustls::server::TlsStream<TcpStream>;
@@ -56,6 +58,8 @@ pub struct HubConfig {
pub tls_cert_pem: Option<String>,
#[serde(default)]
pub tls_key_pem: Option<String>,
#[serde(default)]
pub performance: Option<PerformanceConfig>,
}
impl Default for HubConfig {
@@ -65,6 +69,7 @@ impl Default for HubConfig {
target_host: Some("127.0.0.1".to_string()),
tls_cert_pem: None,
tls_key_pem: None,
performance: None,
}
}
}
@@ -80,6 +85,10 @@ pub struct AllowedEdge {
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
pub stun_interval_secs: Option<u64>,
#[serde(default)]
pub firewall_config: Option<serde_json::Value>,
#[serde(default)]
pub performance: Option<PerformanceConfig>,
}
/// Handshake response sent to edge after authentication.
@@ -90,6 +99,9 @@ struct HandshakeResponse {
#[serde(default)]
listen_ports_udp: Vec<u16>,
stun_interval_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
firewall_config: Option<serde_json::Value>,
performance: EffectivePerformanceConfig,
}
/// Configuration update pushed to a connected edge at runtime.
@@ -99,6 +111,46 @@ pub struct EdgeConfigUpdate {
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub firewall_config: Option<serde_json::Value>,
pub performance: EffectivePerformanceConfig,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowControlStatus {
pub applies: bool,
pub current_window_bytes: u32,
pub min_window_bytes: u32,
pub max_window_bytes: u32,
pub total_window_budget_bytes: u64,
pub estimated_in_flight_bytes: u64,
pub stalled_streams: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueStatus {
pub ctrl_queue_depth: u64,
pub data_queue_depth: u64,
pub sustained_queue_depth: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TrafficStatus {
pub bytes_in: u64,
pub bytes_out: u64,
pub streams_opened_total: u64,
pub streams_closed_total: u64,
pub rejected_streams: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UdpStatus {
pub active_sessions: u64,
pub dropped_datagrams: u64,
}
/// Runtime status of a connected edge.
@@ -109,6 +161,13 @@ pub struct ConnectedEdgeStatus {
pub connected_at: u64,
pub active_streams: usize,
pub peer_addr: String,
pub transport_mode: TransportMode,
pub fallback_used: bool,
pub performance: EffectivePerformanceConfig,
pub flow_control: FlowControlStatus,
pub queues: QueueStatus,
pub traffic: TrafficStatus,
pub udp: UdpStatus,
}
/// Events emitted by the hub.
@@ -151,11 +210,30 @@ struct ConnectedEdgeInfo {
connected_at: u64,
peer_addr: String,
edge_stream_count: Arc<AtomicU32>,
transport_mode: TransportMode,
fallback_used: bool,
performance: EffectivePerformanceConfig,
metrics: Arc<EdgeRuntimeMetrics>,
config_tx: mpsc::Sender<EdgeConfigUpdate>,
/// Used to cancel the old connection when an edge reconnects.
cancel_token: CancellationToken,
}
#[derive(Default)]
struct EdgeRuntimeMetrics {
streams_opened_total: AtomicU64,
streams_closed_total: AtomicU64,
rejected_streams: AtomicU64,
bytes_in: AtomicU64,
bytes_out: AtomicU64,
stalled_streams: AtomicU64,
dropped_datagrams: AtomicU64,
active_udp_sessions: AtomicU64,
ctrl_queue_depth: AtomicU64,
data_queue_depth: AtomicU64,
sustained_queue_depth: AtomicU64,
}
impl TunnelHub {
pub fn new(config: HubConfig) -> Self {
let (event_tx, event_rx) = mpsc::channel(1024);
@@ -179,6 +257,7 @@ impl TunnelHub {
/// Update the list of allowed edges.
/// For any currently-connected edge whose ports changed, push a config update.
pub async fn update_allowed_edges(&self, edges: Vec<AllowedEdge>) {
let global_performance = self.config.read().await.performance.clone();
let mut map = self.allowed_edges.write().await;
// Build new map
@@ -192,14 +271,22 @@ impl TunnelHub {
for edge in &edges {
if let Some(info) = connected.get(&edge.id) {
// Check if ports changed compared to old config
let ports_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
let config_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports
|| old.listen_ports_udp != edge.listen_ports_udp
|| old.firewall_config != edge.firewall_config
|| old.performance != edge.performance,
None => true, // newly allowed edge that's already connected
};
if ports_changed {
if config_changed {
let update = EdgeConfigUpdate {
listen_ports: edge.listen_ports.clone(),
listen_ports_udp: edge.listen_ports_udp.clone(),
firewall_config: edge.firewall_config.clone(),
performance: PerformanceConfig::merge(
global_performance.as_ref(),
edge.performance.as_ref(),
).effective(),
};
let _ = info.config_tx.try_send(update);
}
@@ -217,11 +304,50 @@ impl TunnelHub {
let mut connected = Vec::new();
for (id, info) in edges.iter() {
let active_streams = info.edge_stream_count.load(Ordering::Relaxed);
let flow_window = if info.transport_mode == TransportMode::TcpTls {
compute_window_for_limits(
active_streams,
info.performance.total_window_budget_bytes,
info.performance.min_stream_window_bytes,
info.performance.max_stream_window_bytes,
)
} else {
0
};
connected.push(ConnectedEdgeStatus {
edge_id: id.clone(),
connected_at: info.connected_at,
active_streams: info.edge_stream_count.load(Ordering::Relaxed) as usize,
active_streams: active_streams as usize,
peer_addr: info.peer_addr.clone(),
transport_mode: info.transport_mode,
fallback_used: info.fallback_used,
performance: info.performance.clone(),
flow_control: FlowControlStatus {
applies: info.transport_mode == TransportMode::TcpTls,
current_window_bytes: flow_window,
min_window_bytes: info.performance.min_stream_window_bytes,
max_window_bytes: info.performance.max_stream_window_bytes,
total_window_budget_bytes: info.performance.total_window_budget_bytes,
estimated_in_flight_bytes: flow_window as u64 * active_streams as u64,
stalled_streams: info.metrics.stalled_streams.load(Ordering::Relaxed),
},
queues: QueueStatus {
ctrl_queue_depth: info.metrics.ctrl_queue_depth.load(Ordering::Relaxed),
data_queue_depth: info.metrics.data_queue_depth.load(Ordering::Relaxed),
sustained_queue_depth: info.metrics.sustained_queue_depth.load(Ordering::Relaxed),
},
traffic: TrafficStatus {
bytes_in: info.metrics.bytes_in.load(Ordering::Relaxed),
bytes_out: info.metrics.bytes_out.load(Ordering::Relaxed),
streams_opened_total: info.metrics.streams_opened_total.load(Ordering::Relaxed),
streams_closed_total: info.metrics.streams_closed_total.load(Ordering::Relaxed),
rejected_streams: info.metrics.rejected_streams.load(Ordering::Relaxed),
},
udp: UdpStatus {
active_sessions: info.metrics.active_udp_sessions.load(Ordering::Relaxed),
dropped_datagrams: info.metrics.dropped_datagrams.load(Ordering::Relaxed),
},
});
}
@@ -240,9 +366,14 @@ impl TunnelHub {
let listener = TcpListener::bind(("0.0.0.0", config.tunnel_port)).await?;
log::info!("Hub listening on TCP port {}", config.tunnel_port);
let effective_performance = config.performance.clone().unwrap_or_default().effective();
// Start QUIC endpoint on the same port (UDP)
let quic_endpoint = match quic_transport::build_quic_server_config(tls_config) {
let quic_endpoint = match quic_transport::build_quic_server_config_with_limits(
tls_config,
effective_performance.max_streams_per_edge.min(u32::MAX as usize) as u32,
effective_performance.quic_datagram_receive_buffer_bytes,
) {
Ok(quic_server_config) => {
let bind_addr: std::net::SocketAddr = ([0, 0, 0, 0], config.tunnel_port).into();
match quinn::Endpoint::server(quic_server_config, bind_addr) {
@@ -271,6 +402,7 @@ impl TunnelHub {
let event_tx = self.event_tx.clone();
let target_host = config.target_host.unwrap_or_else(|| "127.0.0.1".to_string());
let hub_token = self.cancel_token.clone();
let hub_performance = config.performance.clone();
tokio::spawn(async move {
// Spawn QUIC acceptor as a separate task
@@ -280,6 +412,7 @@ impl TunnelHub {
let event_tx_q = event_tx.clone();
let target_q = target_host.clone();
let hub_token_q = hub_token.clone();
let performance_q = hub_performance.clone();
Some(tokio::spawn(async move {
loop {
tokio::select! {
@@ -292,6 +425,7 @@ impl TunnelHub {
let target = target_q.clone();
let edge_token = hub_token_q.child_token();
let peer_addr = incoming.remote_address().ip().to_string();
let performance = performance_q.clone();
tokio::spawn(async move {
// Accept the QUIC connection
let quic_conn = match incoming.await {
@@ -301,8 +435,8 @@ impl TunnelHub {
return;
}
};
if let Err(e) = handle_edge_connection_quic(
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr,
if let Err(e) = handle_edge_connection_quic(
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
).await {
log::error!("QUIC edge connection error: {}", e);
}
@@ -336,9 +470,10 @@ impl TunnelHub {
let target = target_host.clone();
let edge_token = hub_token.child_token();
let peer_addr = addr.ip().to_string();
let performance = hub_performance.clone();
tokio::spawn(async move {
if let Err(e) = handle_edge_connection(
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr,
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
).await {
log::error!("Edge connection error: {}", e);
}
@@ -381,15 +516,21 @@ impl TunnelHub {
}
}
fn parse_requested_transport_mode(value: Option<&str>) -> Option<TransportMode> {
match value {
Some("tcpTls") => Some(TransportMode::TcpTls),
Some("quic") => Some(TransportMode::Quic),
Some("quicWithFallback") => Some(TransportMode::QuicWithFallback),
_ => None,
}
}
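// Hypothetical helper sketch: the auth-line shape implied by parse_requested_transport_mode
// above and the split_whitespace auth parsing in handle_edge_connection. The line is
// "EDGE <edge_id> <secret>" with an optional fourth token naming the requested transport.
#[allow(dead_code)]
fn format_auth_line_sketch(edge_id: &str, secret: &str, transport: Option<&str>) -> String {
    match transport {
        // e.g. "EDGE edge-1 s3cret quicWithFallback\n"
        Some(mode) => format!("EDGE {} {} {}\n", edge_id, secret, mode),
        // Three tokens only; parse_requested_transport_mode then returns None.
        None => format!("EDGE {} {}\n", edge_id, secret),
    }
}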
impl Drop for TunnelHub {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
/// Maximum concurrent streams per edge connection.
const MAX_STREAMS_PER_EDGE: usize = 1024;
/// Process a single frame received from the edge side of the tunnel.
/// Handles FRAME_OPEN, FRAME_DATA, FRAME_WINDOW_UPDATE, FRAME_CLOSE, and FRAME_PONG.
async fn handle_hub_frame(
@@ -407,6 +548,8 @@ async fn handle_hub_frame(
target_host: &str,
edge_token: &CancellationToken,
cleanup_tx: &mpsc::Sender<u32>,
performance: &EffectivePerformanceConfig,
metrics: &Arc<EdgeRuntimeMetrics>,
) -> FrameAction {
match frame.frame_type {
FRAME_OPEN => {
@@ -414,8 +557,9 @@ async fn handle_hub_frame(
let permit = match stream_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
edge_id, performance.max_streams_per_edge, frame.stream_id);
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
tunnel_io.queue_ctrl(close_frame);
return FrameAction::Continue;
@@ -435,6 +579,8 @@ async fn handle_hub_frame(
let sustained_writer_tx = sustained_tx.clone(); // sustained: DATA_BACK from elephant flows
let target = target_host.to_string();
let stream_token = edge_token.child_token();
let active_after_open = edge_stream_count.fetch_add(1, Ordering::Relaxed) + 1;
metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
let _ = event_tx.try_send(HubEvent::StreamOpened {
edge_id: edge_id.to_string(),
@@ -444,9 +590,12 @@ async fn handle_hub_frame(
// Create channel for data from edge to this stream
let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::<Bytes>();
// Adaptive initial window: scale with current stream count
// to keep total in-flight data within the 200MB budget.
let initial_window = compute_window_for_stream_count(
edge_stream_count.load(Ordering::Relaxed),
// to keep total in-flight data within the configured edge budget.
let initial_window = compute_window_for_limits(
active_after_open,
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let send_window = Arc::new(AtomicU32::new(initial_window));
let window_notify = Arc::new(Notify::new());
@@ -459,9 +608,10 @@ async fn handle_hub_frame(
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
let stream_counter = Arc::clone(edge_stream_count);
let stream_metrics = metrics.clone();
let stream_performance = performance.clone();
tokio::spawn(async move {
let _permit = permit; // hold semaphore permit until stream completes
stream_counter.fetch_add(1, Ordering::Relaxed);
let result = async {
// A2: Connect to SmartProxy with timeout
@@ -475,6 +625,12 @@ async fn handle_hub_frame(
})??;
upstream.set_nodelay(true)?;
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
upstream.write_all(proxy_header.as_bytes()).await?;
let (mut up_read, mut up_write) =
@@ -485,7 +641,7 @@ async fn handle_hub_frame(
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
tokio::select! {
@@ -513,8 +669,11 @@ async fn handle_hub_frame(
// Track consumption for adaptive flow control.
// Increment capped to adaptive window to limit per-stream in-flight data.
consumed_since_update += len;
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
let adaptive_window = remoteingress_protocol::compute_window_for_limits(
stream_counter_w.load(Ordering::Relaxed),
stream_performance.total_window_budget_bytes,
stream_performance.min_stream_window_bytes,
stream_performance.max_stream_window_bytes,
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
@@ -569,9 +728,10 @@ async fn handle_hub_frame(
tokio::select! {
_ = notified => continue,
_ = stream_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
break;
_ = tokio::time::sleep(Duration::from_secs(55)) => {
stream_metrics.stalled_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
}
}
}
@@ -598,6 +758,7 @@ async fn handle_hub_frame(
let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]);
// Sustained classification: >2.5 MB/s for >10 seconds
dl_bytes_sent += n as u64;
stream_metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
if !is_sustained {
let elapsed = dl_start.elapsed().as_secs();
if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS
@@ -633,7 +794,11 @@ async fn handle_hub_frame(
}
}
writer_for_edge_data.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_for_edge_data).await.is_err() {
writer_for_edge_data.abort();
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
.await;
@@ -658,6 +823,7 @@ async fn handle_hub_frame(
_ = cleanup.send(stream_id) => {}
_ = stream_token.cancelled() => {}
}
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
stream_counter.fetch_sub(1, Ordering::Relaxed);
});
}
@@ -666,6 +832,7 @@ async fn handle_hub_frame(
// limits bytes-in-flight, so the channel won't grow unbounded. send() only
// fails if the receiver is dropped (stream handler already exited).
if let Some(state) = streams.get(&frame.stream_id) {
metrics.bytes_in.fetch_add(frame.payload.len() as u64, Ordering::Relaxed);
if state.data_tx.send(frame.payload).is_err() {
// Receiver dropped — stream handler already exited, clean up
streams.remove(&frame.stream_id);
@@ -678,8 +845,8 @@ async fn handle_hub_frame(
if increment > 0 {
if let Some(state) = streams.get(&frame.stream_id) {
let prev = state.send_window.fetch_add(increment, Ordering::Release);
if prev + increment > MAX_WINDOW_SIZE {
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
if prev + increment > performance.max_stream_window_bytes {
state.send_window.store(performance.max_stream_window_bytes, Ordering::Release);
}
state.window_notify.notify_one();
}
@@ -714,6 +881,7 @@ async fn handle_hub_frame(
data_tx: udp_tx,
cancel_token: session_token.clone(),
});
metrics.active_udp_sessions.fetch_add(1, Ordering::Relaxed);
// Spawn upstream UDP forwarder
tokio::spawn(async move {
@@ -748,6 +916,7 @@ async fn handle_hub_frame(
Ok(len) => {
let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]);
if data_writer_tx.try_send(frame).is_err() {
// Return datagrams may be dropped under pressure.
break;
}
}
@@ -781,18 +950,22 @@ async fn handle_hub_frame(
}
recv_handle.abort();
// active_udp_sessions is decremented by the FRAME_UDP_CLOSE path or connection cleanup.
log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str);
});
}
FRAME_UDP_DATA => {
// Forward datagram to upstream
if let Some(state) = udp_sessions.get(&frame.stream_id) {
let _ = state.data_tx.try_send(frame.payload);
if state.data_tx.try_send(frame.payload).is_err() {
metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
}
}
FRAME_UDP_CLOSE => {
if let Some(state) = udp_sessions.remove(&frame.stream_id) {
state.cancel_token.cancel();
metrics.active_udp_sessions.fetch_sub(1, Ordering::Relaxed);
}
}
_ => {
@@ -812,6 +985,7 @@ async fn handle_edge_connection(
target_host: String,
edge_token: CancellationToken,
peer_addr: String,
hub_performance: Option<PerformanceConfig>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
stream.set_nodelay(true)?;
@@ -842,29 +1016,38 @@ async fn handle_edge_connection(
.map_err(|_| "auth line not valid UTF-8")?;
let auth_line = auth_line.trim();
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
if parts.len() != 3 || parts[0] != "EDGE" {
let parts: Vec<&str> = auth_line.split_whitespace().collect();
if parts.len() < 3 || parts[0] != "EDGE" {
return Err("invalid auth line".into());
}
let edge_id = parts[1].to_string();
let secret = parts[2];
let requested_transport = parse_requested_transport_mode(parts.get(3).copied());
let fallback_used = requested_transport == Some(TransportMode::QuicWithFallback);
// Verify credentials and extract edge config
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(
edge.listen_ports.clone(),
edge.listen_ports_udp.clone(),
edge.stun_interval_secs.unwrap_or(300),
edge.firewall_config.clone(),
edge.performance.clone(),
)
}
None => {
return Err(format!("unknown edge {}", edge_id).into());
}
}
};
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
log::info!("Edge {} authenticated from {}", edge_id, peer_addr);
let _ = event_tx.try_send(HubEvent::EdgeConnected {
@@ -877,6 +1060,8 @@ async fn handle_edge_connection(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
performance: performance.clone(),
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -888,6 +1073,7 @@ async fn handle_edge_connection(
let mut udp_sessions: HashMap<u32, HubUdpSessionState> = HashMap::new();
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
let metrics = Arc::new(EdgeRuntimeMetrics::default());
// Cleanup channel: spawned stream tasks send stream_id here when done
let (cleanup_tx, mut cleanup_rx) = mpsc::channel::<u32>(256);
let now = std::time::SystemTime::now()
@@ -913,6 +1099,10 @@ async fn handle_edge_connection(
connected_at: now,
peer_addr,
edge_stream_count: edge_stream_count.clone(),
transport_mode: TransportMode::TcpTls,
fallback_used,
performance: performance.clone(),
metrics: metrics.clone(),
config_tx,
cancel_token: edge_token.clone(),
},
@@ -953,7 +1143,7 @@ async fn handle_edge_connection(
});
// A4: Semaphore to limit concurrent streams per edge
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
// Heartbeat: periodic PING and liveness timeout
let ping_interval_dur = Duration::from_secs(15);
@@ -999,7 +1189,7 @@ async fn handle_edge_connection(
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
&cleanup_tx, &performance, &metrics,
).await {
disconnect_reason = reason;
break 'hub_loop;
@@ -1012,6 +1202,10 @@ async fn handle_edge_connection(
if ping_ticker.poll_tick(cx).is_ready() {
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[]));
}
let depths = tunnel_io.queue_depths();
metrics.ctrl_queue_depth.store(depths.ctrl as u64, Ordering::Relaxed);
metrics.data_queue_depth.store(depths.data as u64, Ordering::Relaxed);
metrics.sustained_queue_depth.store(depths.sustained as u64, Ordering::Relaxed);
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut sustained_rx, &mut liveness_deadline, &edge_token)
}).await;
@@ -1023,7 +1217,7 @@ async fn handle_edge_connection(
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
&cleanup_tx, &performance, &metrics,
).await {
disconnect_reason = reason;
break;
@@ -1073,7 +1267,11 @@ async fn handle_edge_connection(
).await;
{
let mut edges = connected.lock().await;
edges.remove(&edge_id);
// Only remove if the entry is still ours (not replaced by a reconnection).
// A replaced entry has a fresh non-cancelled token from the new handler.
if edges.get(&edge_id).map_or(false, |e| e.cancel_token.is_cancelled()) {
edges.remove(&edge_id);
}
}
let _ = event_tx.try_send(HubEvent::EdgeDisconnected {
edge_id: edge_id.clone(),
@@ -1177,6 +1375,7 @@ async fn handle_edge_connection_quic(
target_host: String,
edge_token: CancellationToken,
peer_addr: String,
hub_performance: Option<PerformanceConfig>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
log::info!("QUIC edge connection from {}", peer_addr);
@@ -1205,8 +1404,8 @@ async fn handle_edge_connection_quic(
.map_err(|_| "QUIC auth line not valid UTF-8")?;
let auth_line = auth_line.trim();
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
if parts.len() != 3 || parts[0] != "EDGE" {
let parts: Vec<&str> = auth_line.split_whitespace().collect();
if parts.len() < 3 || parts[0] != "EDGE" {
return Err("invalid QUIC auth line".into());
}
@@ -1214,18 +1413,25 @@ async fn handle_edge_connection_quic(
let secret = parts[2];
// Verify credentials
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(
edge.listen_ports.clone(),
edge.listen_ports_udp.clone(),
edge.stun_interval_secs.unwrap_or(300),
edge.firewall_config.clone(),
edge.performance.clone(),
)
}
None => return Err(format!("unknown edge {}", edge_id).into()),
}
};
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
log::info!("QUIC edge {} authenticated from {}", edge_id, peer_addr);
let _ = event_tx.try_send(HubEvent::EdgeConnected {
@@ -1238,6 +1444,8 @@ async fn handle_edge_connection_quic(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
performance: performance.clone(),
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -1246,6 +1454,7 @@ async fn handle_edge_connection_quic(
// Track this edge
let edge_stream_count = Arc::new(AtomicU32::new(0));
let metrics = Arc::new(EdgeRuntimeMetrics::default());
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
@@ -1265,13 +1474,17 @@ async fn handle_edge_connection_quic(
connected_at: now,
peer_addr,
edge_stream_count: edge_stream_count.clone(),
transport_mode: TransportMode::Quic,
fallback_used: false,
performance: performance.clone(),
metrics: metrics.clone(),
config_tx,
cancel_token: edge_token.clone(),
},
);
}
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
// Spawn task to accept data streams (tunneled client connections)
let data_stream_conn = quic_conn.clone();
@@ -1280,6 +1493,7 @@ async fn handle_edge_connection_quic(
let data_event_tx = event_tx.clone();
let data_semaphore = stream_semaphore.clone();
let data_stream_count = edge_stream_count.clone();
let data_metrics = metrics.clone();
let data_token = edge_token.clone();
let data_handle = tokio::spawn(async move {
let mut stream_id_counter: u32 = 0;
@@ -1292,6 +1506,7 @@ async fn handle_edge_connection_quic(
let permit = match data_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
data_metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("QUIC edge {} exceeded max streams, rejecting", data_edge_id);
// Drop the streams to reject
drop(quic_send);
@@ -1306,21 +1521,24 @@ async fn handle_edge_connection_quic(
let edge_id = data_edge_id.clone();
let event_tx = data_event_tx.clone();
let stream_count = data_stream_count.clone();
let stream_metrics = data_metrics.clone();
let stream_token = data_token.child_token();
let _ = event_tx.try_send(HubEvent::StreamOpened {
edge_id: edge_id.clone(),
stream_id,
});
stream_metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
stream_count.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
let _permit = permit;
handle_quic_stream(
quic_send, quic_recv, stream_id,
&target, &edge_id, stream_token,
&target, &edge_id, stream_token, stream_metrics.clone(),
).await;
stream_count.fetch_sub(1, Ordering::Relaxed);
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
let _ = event_tx.try_send(HubEvent::StreamClosed {
edge_id,
stream_id,
@@ -1339,7 +1557,7 @@ async fn handle_edge_connection_quic(
});
// UDP sessions for QUIC datagram transport
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
let quic_udp_sessions: Arc<Mutex<HashMap<u32, (mpsc::Sender<Bytes>, Instant)>>> =
Arc::new(Mutex::new(HashMap::new()));
// Spawn QUIC datagram receiver task
@@ -1348,9 +1566,21 @@ async fn handle_edge_connection_quic(
let dgram_target = target_host.clone();
let dgram_edge_id = edge_id.clone();
let dgram_token = edge_token.clone();
let dgram_metrics = metrics.clone();
let dgram_handle = tokio::spawn(async move {
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
cleanup_interval.tick().await; // consume initial tick
loop {
tokio::select! {
// Periodic sweep: prune sessions whose task has exited (receiver dropped)
_ = cleanup_interval.tick() => {
let now = Instant::now();
let mut s = dgram_sessions.lock().await;
s.retain(|_id, (tx, last_activity)| {
!tx.is_closed() && now.duration_since(*last_activity) < Duration::from_secs(60)
});
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
}
datagram = dgram_conn.read_datagram() => {
match datagram {
Ok(data) => {
@@ -1375,10 +1605,13 @@ async fn handle_edge_connection_quic(
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
let cleanup_sessions = sessions.clone();
let session_metrics = dgram_metrics.clone();
{
let mut s = sessions.lock().await;
s.insert(session_id, tx);
s.insert(session_id, (tx, Instant::now()));
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
}
tokio::spawn(async move {
@@ -1386,17 +1619,26 @@ async fn handle_edge_connection_quic(
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
@@ -1439,15 +1681,24 @@ async fn handle_edge_connection_quic(
}
}
recv_handle.abort();
// Clean up session entry to prevent memory leak
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
});
continue;
}
// Regular data datagram — forward to upstream
let sessions = dgram_sessions.lock().await;
if let Some(tx) = sessions.get(&session_id) {
let _ = tx.try_send(Bytes::copy_from_slice(payload));
let mut sessions = dgram_sessions.lock().await;
if let Some((tx, last_activity)) = sessions.get_mut(&session_id) {
*last_activity = Instant::now();
if tx.try_send(Bytes::copy_from_slice(payload)).is_err() {
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
} else {
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
}
Err(e) => {
@@ -1534,7 +1785,11 @@ async fn handle_edge_connection_quic(
{
let mut edges = connected.lock().await;
edges.remove(&edge_id);
// Only remove if the entry is still ours (not replaced by a reconnection).
// A replaced entry has a fresh non-cancelled token from the new handler.
if edges.get(&edge_id).map_or(false, |e| e.cancel_token.is_cancelled()) {
edges.remove(&edge_id);
}
}
let _ = event_tx.try_send(HubEvent::EdgeDisconnected {
edge_id,
@@ -1553,6 +1808,7 @@ async fn handle_quic_stream(
target_host: &str,
_edge_id: &str,
stream_token: CancellationToken,
metrics: Arc<EdgeRuntimeMetrics>,
) {
// Read PROXY header from the beginning of the stream
let proxy_header = match quic_transport::read_proxy_header(&mut quic_recv).await {
@@ -1582,6 +1838,12 @@ async fn handle_quic_stream(
};
let _ = upstream.set_nodelay(true);
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
// Send PROXY header to SmartProxy
if let Err(e) = upstream.write_all(proxy_header.as_bytes()).await {
log::error!("QUIC stream {} failed to write PROXY header to upstream: {}", stream_id, e);
@@ -1592,13 +1854,15 @@ async fn handle_quic_stream(
// Task: QUIC -> upstream (edge data to SmartProxy)
let writer_token = stream_token.clone();
let writer_task = tokio::spawn(async move {
let writer_metrics = metrics.clone();
let mut writer_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
read_result = quic_recv.read(&mut buf) => {
match read_result {
Ok(Some(n)) => {
writer_metrics.bytes_in.fetch_add(n as u64, Ordering::Relaxed);
let write_result = tokio::select! {
r = tokio::time::timeout(
Duration::from_secs(60),
@@ -1630,6 +1894,7 @@ async fn handle_quic_stream(
match read_result {
Ok(0) => break,
Ok(n) => {
metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
if quic_send.write_all(&buf[..n]).await.is_err() {
break;
}
@@ -1643,7 +1908,11 @@ async fn handle_quic_stream(
// Gracefully close the QUIC send stream
let _ = quic_send.finish();
writer_task.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_task).await.is_err() {
writer_task.abort();
}
}
#[cfg(test)]
@@ -1753,6 +2022,8 @@ mod tests {
listen_ports: vec![443, 8080],
listen_ports_udp: vec![],
stun_interval_secs: 300,
firewall_config: None,
performance: EffectivePerformanceConfig::default(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([443, 8080]));
@@ -1767,6 +2038,8 @@ mod tests {
let update = EdgeConfigUpdate {
listen_ports: vec![80, 443],
listen_ports_udp: vec![53],
firewall_config: None,
performance: EffectivePerformanceConfig::default(),
};
let json = serde_json::to_value(&update).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
@@ -3,5 +3,6 @@ pub mod edge;
pub mod stun;
pub mod transport;
pub mod udp_session;
pub mod performance;
pub use remoteingress_protocol as protocol;
@@ -0,0 +1,130 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum PerformanceProfile {
Balanced,
Throughput,
HighConcurrency,
}
impl Default for PerformanceProfile {
fn default() -> Self {
PerformanceProfile::Balanced
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PerformanceConfig {
#[serde(default)]
pub profile: Option<PerformanceProfile>,
#[serde(default)]
pub max_streams_per_edge: Option<usize>,
#[serde(default)]
pub total_window_budget_bytes: Option<u64>,
#[serde(default)]
pub min_stream_window_bytes: Option<u32>,
#[serde(default)]
pub max_stream_window_bytes: Option<u32>,
#[serde(default)]
pub sustained_stream_window_bytes: Option<u32>,
#[serde(default)]
pub quic_datagram_receive_buffer_bytes: Option<usize>,
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EffectivePerformanceConfig {
pub profile: PerformanceProfile,
pub max_streams_per_edge: usize,
pub total_window_budget_bytes: u64,
pub min_stream_window_bytes: u32,
pub max_stream_window_bytes: u32,
pub sustained_stream_window_bytes: u32,
pub quic_datagram_receive_buffer_bytes: usize,
}
impl Default for EffectivePerformanceConfig {
fn default() -> Self {
PerformanceConfig::default().effective()
}
}
impl PerformanceConfig {
pub fn effective(&self) -> EffectivePerformanceConfig {
let profile = self.profile.unwrap_or_default();
let defaults = profile_defaults(profile);
EffectivePerformanceConfig {
profile,
max_streams_per_edge: self.max_streams_per_edge.unwrap_or(defaults.max_streams_per_edge),
total_window_budget_bytes: self.total_window_budget_bytes.unwrap_or(defaults.total_window_budget_bytes),
min_stream_window_bytes: self.min_stream_window_bytes.unwrap_or(defaults.min_stream_window_bytes),
max_stream_window_bytes: self.max_stream_window_bytes.unwrap_or(defaults.max_stream_window_bytes),
sustained_stream_window_bytes: self.sustained_stream_window_bytes.unwrap_or(defaults.sustained_stream_window_bytes),
quic_datagram_receive_buffer_bytes: self
.quic_datagram_receive_buffer_bytes
.unwrap_or(defaults.quic_datagram_receive_buffer_bytes),
}
}
pub fn merge(global: Option<&PerformanceConfig>, edge: Option<&PerformanceConfig>) -> PerformanceConfig {
let mut merged = global.cloned().unwrap_or_default();
if let Some(edge) = edge {
if edge.profile.is_some() {
merged.profile = edge.profile;
}
if edge.max_streams_per_edge.is_some() {
merged.max_streams_per_edge = edge.max_streams_per_edge;
}
if edge.total_window_budget_bytes.is_some() {
merged.total_window_budget_bytes = edge.total_window_budget_bytes;
}
if edge.min_stream_window_bytes.is_some() {
merged.min_stream_window_bytes = edge.min_stream_window_bytes;
}
if edge.max_stream_window_bytes.is_some() {
merged.max_stream_window_bytes = edge.max_stream_window_bytes;
}
if edge.sustained_stream_window_bytes.is_some() {
merged.sustained_stream_window_bytes = edge.sustained_stream_window_bytes;
}
if edge.quic_datagram_receive_buffer_bytes.is_some() {
merged.quic_datagram_receive_buffer_bytes = edge.quic_datagram_receive_buffer_bytes;
}
}
merged
}
}
fn profile_defaults(profile: PerformanceProfile) -> EffectivePerformanceConfig {
match profile {
PerformanceProfile::Balanced => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 1024,
total_window_budget_bytes: 256 * 1024 * 1024,
min_stream_window_bytes: 128 * 1024,
max_stream_window_bytes: 8 * 1024 * 1024,
sustained_stream_window_bytes: 512 * 1024,
quic_datagram_receive_buffer_bytes: 4 * 1024 * 1024,
},
PerformanceProfile::Throughput => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 512,
total_window_budget_bytes: 512 * 1024 * 1024,
min_stream_window_bytes: 256 * 1024,
max_stream_window_bytes: 32 * 1024 * 1024,
sustained_stream_window_bytes: 2 * 1024 * 1024,
quic_datagram_receive_buffer_bytes: 8 * 1024 * 1024,
},
PerformanceProfile::HighConcurrency => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 4096,
total_window_budget_bytes: 256 * 1024 * 1024,
min_stream_window_bytes: 64 * 1024,
max_stream_window_bytes: 4 * 1024 * 1024,
sustained_stream_window_bytes: 256 * 1024,
quic_datagram_receive_buffer_bytes: 16 * 1024 * 1024,
},
}
}
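// Hypothetical usage sketch: how merge() and effective() combine a hub-wide config
// with an edge-level override. The values below are illustrative, not recommendations.
#[cfg(test)]
mod merge_sketch {
    use super::*;

    #[test]
    fn edge_override_wins_over_global_profile() {
        let global = PerformanceConfig {
            profile: Some(PerformanceProfile::Throughput),
            ..Default::default()
        };
        let edge = PerformanceConfig {
            max_streams_per_edge: Some(256),
            ..Default::default()
        };
        let effective = PerformanceConfig::merge(Some(&global), Some(&edge)).effective();
        // The edge override wins for the field it sets...
        assert_eq!(effective.max_streams_per_edge, 256);
        // ...while unset fields fall back to the Throughput profile defaults above.
        assert_eq!(effective.profile, PerformanceProfile::Throughput);
        assert_eq!(effective.total_window_budget_bytes, 512 * 1024 * 1024);
    }
}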
@@ -4,9 +4,9 @@ use serde::{Deserialize, Serialize};
/// Transport mode for the tunnel connection between edge and hub.
///
/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo (default).
/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo.
/// - `Quic`: QUIC with native stream multiplexing (one QUIC stream per tunneled connection).
/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked.
/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked. This is the edge runtime default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum TransportMode {
@@ -1,5 +1,7 @@
use std::sync::Arc;
const DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 4 * 1024 * 1024;
/// QUIC control stream message types (reuses frame type constants for consistency).
pub const CTRL_CONFIG: u8 = 0x06;
pub const CTRL_PING: u8 = 0x07;
@@ -11,6 +13,13 @@ pub const CTRL_HEADER_SIZE: usize = 5;
/// Build a quinn ClientConfig that skips server certificate verification
/// (auth is via shared secret, same as the TCP+TLS path).
pub fn build_quic_client_config() -> quinn::ClientConfig {
build_quic_client_config_with_limits(1024, DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE)
}
pub fn build_quic_client_config_with_limits(
max_concurrent_bidi_streams: u32,
datagram_receive_buffer_size: usize,
) -> quinn::ClientConfig {
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(NoCertVerifier))
@@ -28,11 +37,9 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
// Match MAX_STREAMS_PER_EDGE (1024) from hub.rs.
// Default is 100 which is too low for high-concurrency tunneling.
transport.max_concurrent_bidi_streams(1024u32.into());
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.into());
// Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling.
transport.datagram_receive_buffer_size(Some(65536));
transport.datagram_receive_buffer_size(Some(datagram_receive_buffer_size));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));
client_config.transport_config(Arc::new(transport));
@@ -42,6 +49,18 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
/// Build a quinn ServerConfig from the same TLS server config used for TCP+TLS.
pub fn build_quic_server_config(
tls_server_config: rustls::ServerConfig,
) -> Result<quinn::ServerConfig, Box<dyn std::error::Error + Send + Sync>> {
build_quic_server_config_with_limits(
tls_server_config,
1024,
DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE,
)
}
pub fn build_quic_server_config_with_limits(
tls_server_config: rustls::ServerConfig,
max_concurrent_bidi_streams: u32,
datagram_receive_buffer_size: usize,
) -> Result<quinn::ServerConfig, Box<dyn std::error::Error + Send + Sync>> {
let quic_config = quinn::crypto::rustls::QuicServerConfig::try_from(tls_server_config)?;
@@ -50,8 +69,8 @@ pub fn build_quic_server_config(
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
transport.max_concurrent_bidi_streams(1024u32.into());
transport.datagram_receive_buffer_size(Some(65536));
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.into());
transport.datagram_receive_buffer_size(Some(datagram_receive_buffer_size));
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));
server_config.transport_config(Arc::new(transport));
@@ -76,6 +95,11 @@ pub async fn write_ctrl_message(
Ok(())
}
/// Maximum size for a QUIC control message payload (64 KB).
/// Control messages (CONFIG, PING, PONG) are small; this guards against
/// a malicious peer sending a crafted length field to trigger OOM.
const MAX_CTRL_MESSAGE_SIZE: usize = 65536;
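// Sketch of the control-message framing this guard protects (hypothetical encoder;
// write_ctrl_message above is the real sender): a 1-byte message type followed by a
// 4-byte big-endian payload length, i.e. CTRL_HEADER_SIZE bytes of header.
#[allow(dead_code)]
fn encode_ctrl_message_sketch(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    debug_assert!(payload.len() <= MAX_CTRL_MESSAGE_SIZE);
    let mut out = Vec::with_capacity(CTRL_HEADER_SIZE + payload.len());
    out.push(msg_type); // e.g. CTRL_CONFIG, CTRL_PING, CTRL_PONG
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}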
/// Read a control message from a QUIC recv stream.
/// Returns (msg_type, payload). Returns None on EOF.
pub async fn read_ctrl_message(
@@ -93,6 +117,12 @@ pub async fn read_ctrl_message(
}
let msg_type = header[0];
let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
if len > MAX_CTRL_MESSAGE_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("control message too large: {} bytes (max {})", len, MAX_CTRL_MESSAGE_SIZE),
));
}
let mut payload = vec![0u8; len];
if len > 0 {
recv.read_exact(&mut payload).await.map_err(|e| {
@@ -17,7 +17,7 @@ pub struct UdpSession {
pub last_activity: Instant,
}
/// Manages UDP sessions with idle timeout expiry.
/// Manages UDP sessions with idle timeout expiry and a maximum session count.
pub struct UdpSessionManager {
/// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>,
@@ -25,14 +25,21 @@ pub struct UdpSessionManager {
by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration.
idle_timeout: std::time::Duration,
/// Maximum number of concurrent sessions (prevents unbounded growth from floods).
max_sessions: usize,
}
impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self {
Self::with_max_sessions(idle_timeout, 65536)
}
pub fn with_max_sessions(idle_timeout: std::time::Duration, max_sessions: usize) -> Self {
Self {
sessions: HashMap::new(),
by_stream_id: HashMap::new(),
idle_timeout,
max_sessions,
}
}
@@ -57,8 +64,12 @@ impl UdpSessionManager {
Some(session)
}
/// Insert a new session. Returns a mutable reference to it.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession {
/// Insert a new session. Returns `None` if the session limit has been reached.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> Option<&mut UdpSession> {
// Allow re-insertion of existing keys (update), but reject truly new sessions at capacity
if !self.sessions.contains_key(&key) && self.sessions.len() >= self.max_sessions {
return None;
}
let session = UdpSession {
stream_id,
client_addr: key.client_addr,
@@ -66,7 +77,7 @@ impl UdpSessionManager {
last_activity: Instant::now(),
};
self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session)
Some(self.sessions.entry(key).or_insert(session))
}
/// Remove a session by stream_id.
@@ -118,7 +129,7 @@ mod tests {
fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some());
@@ -129,7 +140,7 @@ mod tests {
fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42);
assert!(mgr.insert(key, 42).is_some());
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None);
@@ -139,7 +150,7 @@ mod tests {
fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some());
@@ -159,8 +170,8 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Nothing expired yet
assert!(mgr.expire_idle().is_empty());
@@ -178,7 +189,7 @@ mod tests {
async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
// Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -200,11 +211,35 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
}
#[test]
fn test_max_sessions_limit() {
let mut mgr = UdpSessionManager::with_max_sessions(Duration::from_secs(60), 2);
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
let key3 = UdpSessionKey { client_addr: addr(5002), dest_port: 53 };
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Third insert should be rejected (at capacity)
assert!(mgr.insert(key3, 3).is_none());
assert_eq!(mgr.len(), 2);
// Re-inserting an existing key should succeed (update, not new)
assert!(mgr.insert(key1, 1).is_some());
assert_eq!(mgr.len(), 2);
// After removing one, a new insert should succeed
mgr.remove_by_stream_id(1);
assert_eq!(mgr.len(), 1);
assert!(mgr.insert(key3, 3).is_some());
assert_eq!(mgr.len(), 2);
}
}
@@ -32,12 +32,18 @@ pub const FRAME_HEADER_SIZE: usize = 9;
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
// Per-stream flow control constants
/// Initial (and maximum) per-stream window size (4 MB).
pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024;
/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window).
/// Default maximum per-stream window size (8 MB).
pub const INITIAL_STREAM_WINDOW: u32 = 8 * 1024 * 1024;
/// Minimum safe window size used when strict budget pressure requires going below the configured floor.
pub const ABSOLUTE_MIN_STREAM_WINDOW: u32 = 16 * 1024;
/// Default total TCP/TLS flow-control budget per edge connection (256 MB).
pub const DEFAULT_TOTAL_WINDOW_BUDGET: u64 = 256 * 1024 * 1024;
/// Default preferred minimum stream window (128 KB). The total budget still wins above this.
pub const DEFAULT_MIN_STREAM_WINDOW: u32 = 128 * 1024;
/// Send WINDOW_UPDATE after consuming this many bytes when no dynamic window is available.
pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
/// Maximum window size to prevent overflow.
pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024;
pub const MAX_WINDOW_SIZE: u32 = 32 * 1024 * 1024;
// Sustained stream classification constants
/// Throughput threshold for sustained classification (2.5 MB/s = 20 Mbit/s).
@@ -55,11 +61,37 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> B
}
/// Compute the target per-stream window size based on the number of active streams.
/// Total memory budget is ~200MB shared across all streams. Up to 50 streams get the
/// full 4MB window; above that the window scales down to a 1MB floor at 200+ streams.
/// The total budget is authoritative: the configured minimum is a preference, not
/// permission to exceed the edge-level memory budget under very high concurrency.
pub fn compute_window_for_stream_count(active: u32) -> u32 {
let per_stream = (200 * 1024 * 1024u64) / (active.max(1) as u64);
per_stream.clamp(1 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
compute_window_for_limits(
active,
DEFAULT_TOTAL_WINDOW_BUDGET,
DEFAULT_MIN_STREAM_WINDOW,
INITIAL_STREAM_WINDOW,
)
}
pub fn compute_window_for_limits(
active: u32,
total_budget_bytes: u64,
min_window_bytes: u32,
max_window_bytes: u32,
) -> u32 {
let active = active.max(1) as u64;
let max_window = max_window_bytes.max(ABSOLUTE_MIN_STREAM_WINDOW);
let preferred_min = min_window_bytes
.max(ABSOLUTE_MIN_STREAM_WINDOW)
.min(max_window);
let per_stream_budget = total_budget_bytes
.max(ABSOLUTE_MIN_STREAM_WINDOW as u64)
/ active;
let bounded = per_stream_budget.min(max_window as u64);
if bounded >= preferred_min as u64 {
bounded as u32
} else {
bounded.max(ABSOLUTE_MIN_STREAM_WINDOW as u64) as u32
}
}
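// Worked example of the budget math above, using the default constants
// (256 MB budget, 128 KB preferred minimum, 8 MB per-stream maximum).
#[cfg(test)]
mod window_budget_sketch {
    use super::*;

    #[test]
    fn budget_division_examples() {
        let budget = DEFAULT_TOTAL_WINDOW_BUDGET;
        let min = DEFAULT_MIN_STREAM_WINDOW;
        let max = INITIAL_STREAM_WINDOW;
        // 32 streams: 256 MB / 32 = 8 MB, exactly the per-stream maximum.
        assert_eq!(compute_window_for_limits(32, budget, min, max), 8 * 1024 * 1024);
        // 1024 streams: 256 MB / 1024 = 256 KB, still above the 128 KB preference.
        assert_eq!(compute_window_for_limits(1024, budget, min, max), 256 * 1024);
        // 4096 streams: 256 MB / 4096 = 64 KB, below the preference but above the
        // 16 KB absolute floor, so the budget wins.
        assert_eq!(compute_window_for_limits(4096, budget, min, max), 64 * 1024);
    }
}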
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
@@ -307,6 +339,13 @@ pub struct TunnelIo<S> {
write: WriteState,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct TunnelQueueDepths {
pub ctrl: usize,
pub data: usize,
pub sustained: usize,
}
impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
pub fn new(stream: S, initial_data: Vec<u8>) -> Self {
let read_pos = initial_data.len();
@@ -346,6 +385,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
self.write.sustained_queue.push_back(frame);
}
pub fn queue_depths(&self) -> TunnelQueueDepths {
TunnelQueueDepths {
ctrl: self.write.ctrl_queue.len(),
data: self.write.data_queue.len(),
sustained: self.write.sustained_queue.len(),
}
}
/// Try to parse a complete frame from the read buffer.
/// Uses a parse_pos cursor to avoid drain() on every frame.
pub fn try_parse_frame(&mut self) -> Option<Result<Frame, std::io::Error>> {
@@ -910,7 +957,7 @@ mod tests {
#[test]
fn test_adaptive_window_zero_streams() {
// 0 streams treated as 1: 200MB/1 -> clamped to 4MB max
// 0 streams treated as 1: budget/1 -> clamped to max
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
}
@@ -920,47 +967,44 @@ mod tests {
}
#[test]
fn test_adaptive_window_50_streams_full() {
// 200MB/50 = 4MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW);
fn test_adaptive_window_32_streams_full() {
// 256MB/32 = 8MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(32), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_51_streams_starts_scaling() {
// 200MB/51 < 4MB — first value below max
let w = compute_window_for_stream_count(51);
fn test_adaptive_window_33_streams_starts_scaling() {
// 256MB/33 < 8MB — first value below max
let w = compute_window_for_stream_count(33);
assert!(w < INITIAL_STREAM_WINDOW);
assert_eq!(w, (200 * 1024 * 1024u64 / 51) as u32);
assert_eq!(w, (DEFAULT_TOTAL_WINDOW_BUDGET / 33) as u32);
}
#[test]
fn test_adaptive_window_100_streams() {
// 200MB/100 = 2MB
assert_eq!(compute_window_for_stream_count(100), 2 * 1024 * 1024);
assert_eq!(compute_window_for_stream_count(100), (DEFAULT_TOTAL_WINDOW_BUDGET / 100) as u32);
}
#[test]
fn test_adaptive_window_200_streams_at_floor() {
// 200MB/200 = 1MB = exactly the floor
assert_eq!(compute_window_for_stream_count(200), 1 * 1024 * 1024);
fn test_adaptive_window_200_streams_uses_budget() {
assert_eq!(compute_window_for_stream_count(200), (DEFAULT_TOTAL_WINDOW_BUDGET / 200) as u32);
}
#[test]
fn test_adaptive_window_500_streams_clamped() {
// 200MB/500 = 0.4MB -> clamped up to 1MB floor
assert_eq!(compute_window_for_stream_count(500), 1 * 1024 * 1024);
fn test_adaptive_window_500_streams_stays_under_budget() {
assert_eq!(compute_window_for_stream_count(500), (DEFAULT_TOTAL_WINDOW_BUDGET / 500) as u32);
}
#[test]
fn test_adaptive_window_max_u32() {
// Extreme: u32::MAX streams -> tiny value -> clamped to 1MB
assert_eq!(compute_window_for_stream_count(u32::MAX), 1 * 1024 * 1024);
// Extreme: u32::MAX streams -> tiny value -> clamped to absolute minimum.
assert_eq!(compute_window_for_stream_count(u32::MAX), ABSOLUTE_MIN_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_monotonically_decreasing() {
let mut prev = compute_window_for_stream_count(1);
for n in [2, 10, 50, 51, 100, 200, 500, 1000] {
for n in [2, 10, 32, 33, 100, 200, 500, 1000] {
let w = compute_window_for_stream_count(n);
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
prev = w;
@@ -969,11 +1013,12 @@ mod tests {
#[test]
fn test_adaptive_window_total_budget_bounded() {
// active x per_stream_window should never exceed 200MB (+ clamp overhead for high N)
for n in [1, 10, 50, 100, 200] {
// active x per_stream_window should never exceed the configured budget while the
// budget can still provide at least the absolute minimum per stream.
for n in [1, 10, 32, 33, 100, 200, 500, 1000] {
let w = compute_window_for_stream_count(n);
let total = w as u64 * n as u64;
assert!(total <= 200 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
assert!(total <= DEFAULT_TOTAL_WINDOW_BUDGET, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
}
}
@@ -322,6 +322,11 @@ tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () =>
tunnel = await startTunnel(edgePort, hubPort);
expect(tunnel.hub.running).toBeTrue();
const hubStatus = await tunnel.hub.getStatus();
expect(hubStatus.connectedEdges.length).toBeGreaterThanOrEqual(1);
const edgeStatus = hubStatus.connectedEdges[0];
expect(['quic', 'tcpTls'].includes(edgeStatus.transportMode)).toEqual(true);
expect(edgeStatus.performance.maxStreamsPerEdge).toBeGreaterThanOrEqual(1024);
});
tap.test('TCP/TLS: single TCP stream — 32MB transfer exceeding initial 4MB window', async () => {
@@ -0,0 +1,228 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers (same patterns as test.quic.node.ts)
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) socket.write(remainder);
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// QUIC Long-Running Stability Test — 30 seconds of periodic echo probes
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
let disconnectCount = 0;
tap.test('QUIC stability setup: start echo server and QUIC tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
// Track disconnects — any disconnect during the test is a failure signal
edge.on('tunnelDisconnected', () => {
disconnectCount++;
console.log(`[STABILITY] Unexpected tunnel disconnect #${disconnectCount}`);
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC stability: tunnel stays alive for 30s with periodic echo probes', async () => {
const testDurationMs = 30_000; // 30 seconds
const probeIntervalMs = 5_000; // probe every 5 seconds
const startTime = Date.now();
let probeCount = 0;
let failedProbes = 0;
while (Date.now() - startTime < testDurationMs) {
probeCount++;
const elapsed = Math.round((Date.now() - startTime) / 1000);
// Verify edge still reports connected
const status = await edge.getStatus();
if (!status.connected) {
throw new Error(`Tunnel disconnected at ${elapsed}s (probe #${probeCount})`);
}
// Send a 4KB echo probe through the tunnel
const data = crypto.randomBytes(4096);
const hash = sha256(data);
try {
const received = await sendAndReceive(edgePort, data, 10000);
if (received.length !== 4096 || sha256(received) !== hash) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: data mismatch`);
} else {
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: OK`);
}
} catch (err) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: FAILED — ${err}`);
}
// Wait for next probe interval
const remaining = testDurationMs - (Date.now() - startTime);
if (remaining > 0) {
await new Promise((resolve) => setTimeout(resolve, Math.min(probeIntervalMs, remaining)));
}
}
console.log(`[STABILITY] Completed: ${probeCount} probes, ${failedProbes} failures, ${disconnectCount} disconnects`);
expect(failedProbes).toEqual(0);
expect(disconnectCount).toEqual(0);
// Final status check
const finalStatus = await edge.getStatus();
expect(finalStatus.connected).toBeTrue();
});
tap.test('QUIC stability teardown', async () => {
await edge.stop();
await hub.stop();
await forceCloseServer(echoServer);
});
export default tap.start();
@@ -185,6 +185,14 @@ tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => {
expect(tunnel.hub.running).toBeTrue();
const status = await tunnel.edge.getStatus();
expect(status.connected).toBeTrue();
const hubStatus = await tunnel.hub.getStatus();
expect(hubStatus.connectedEdges.length).toBeGreaterThanOrEqual(1);
const edgeStatus = hubStatus.connectedEdges[0];
expect(edgeStatus.transportMode).toEqual('quic');
expect(edgeStatus.fallbackUsed).toEqual(false);
expect(edgeStatus.performance.profile).toEqual('balanced');
expect(edgeStatus.flowControl.applies).toEqual(false);
expect(edgeStatus.traffic.streamsOpenedTotal).toEqual(0);
});
tap.test('QUIC: single TCP stream echo — 1KB', async () => {
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.13.0',
version: '4.17.1',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
}
+167 -9
@@ -1,6 +1,7 @@
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
import { decodeConnectionToken } from './classes.token.js';
import type { IFirewallConfig } from './classes.remoteingresshub.js';
// Command map for the edge side of remoteingress-bin
type TEdgeCommands = {
@@ -55,6 +56,8 @@ export class RemoteIngressEdge extends EventEmitter {
private restartBackoffMs = 1000;
private restartAttempts = 0;
private statusInterval: ReturnType<typeof setInterval> | undefined;
private nft: InstanceType<typeof plugins.smartnftables.SmartNftables> | null = null;
private pendingFirewallConfig: IFirewallConfig | null = null;
constructor() {
super();
@@ -79,6 +82,15 @@ export class RemoteIngressEdge extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressEdge] ${message}`);
} else {
console.log(`[RemoteIngressEdge] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -101,6 +113,96 @@ export class RemoteIngressEdge extends EventEmitter {
console.log(`[RemoteIngressEdge] Ports updated by hub: ${data.listenPorts.join(', ')}`);
this.emit('portsUpdated', data);
});
this.bridge.on('management:firewallConfigUpdated', (data: { firewallConfig: IFirewallConfig }) => {
console.log(`[RemoteIngressEdge] Firewall config updated from hub`);
void this.applyFirewallConfig(data.firewallConfig).catch((err) => {
console.error(`[RemoteIngressEdge] Failed to apply firewall config: ${err}`);
});
this.emit('firewallConfigUpdated', data);
});
}
/**
* Initialize the nftables manager. Fails gracefully if not running as root.
*/
private async initNft(options: { reset?: boolean } = {}): Promise<void> {
try {
this.nft = new plugins.smartnftables.SmartNftables({
tableName: 'remoteingress',
dryRun: false,
});
if (options.reset) {
await (this.nft as any).cleanup({ force: true });
}
await this.nft.initialize();
console.log('[RemoteIngressEdge] SmartNftables initialized');
if (this.pendingFirewallConfig) {
const pending = this.pendingFirewallConfig;
this.pendingFirewallConfig = null;
await this.applyFirewallConfig(pending);
}
} catch (err) {
console.warn(`[RemoteIngressEdge] Failed to initialize nftables (not root?): ${err}`);
this.nft = null;
}
}
/**
* Apply firewall configuration received from the hub.
* Performs a full replacement: cleans up existing rules, then applies the new config.
*/
private async applyFirewallConfig(config: IFirewallConfig): Promise<void> {
if (!this.nft) {
this.pendingFirewallConfig = config;
return;
}
try {
// Full cleanup and reinitialize to replace all rules atomically
await (this.nft as any).cleanup({ force: true });
await this.nft.initialize();
// Apply blocked IPs
if (config.blockedIps && config.blockedIps.length > 0) {
await (this.nft.firewall as any).blockIPSet('hub-blocklist', {
setName: 'blocked_ipv4',
ips: config.blockedIps,
comment: 'RemoteIngress hub blocklist',
});
console.log(`[RemoteIngressEdge] Blocked ${config.blockedIps.length} IPs`);
}
// Apply rate limits
if (config.rateLimits && config.rateLimits.length > 0) {
for (const rl of config.rateLimits) {
await this.nft.rateLimit.addRateLimit(rl.id, {
port: rl.port,
protocol: rl.protocol,
rate: rl.rate,
burst: rl.burst,
perSourceIP: rl.perSourceIP,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rateLimits.length} rate limits`);
}
// Apply firewall rules
if (config.rules && config.rules.length > 0) {
for (const rule of config.rules) {
await this.nft.firewall.addRule(rule.id, {
direction: rule.direction,
action: rule.action,
sourceIP: rule.sourceIP,
destPort: rule.destPort,
protocol: rule.protocol,
comment: rule.comment,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rules.length} firewall rules`);
}
} catch (err) {
console.error(`[RemoteIngressEdge] Failed to apply firewall config: ${err}`);
}
}
/**
@@ -125,22 +227,34 @@ export class RemoteIngressEdge extends EventEmitter {
this.savedConfig = edgeConfig;
this.stopping = false;
// Clear any stale nftables state left by a prior process before the edge
// can accept hub config or bind public listener ports.
await this.initNft({ reset: true });
const spawned = await this.bridge.spawn();
if (!spawned) {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
hubHost: edgeConfig.hubHost,
hubPort: edgeConfig.hubPort ?? 8443,
edgeId: edgeConfig.edgeId,
secret: edgeConfig.secret,
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
});
try {
await this.bridge.sendCommand('startEdge', {
hubHost: edgeConfig.hubHost,
hubPort: edgeConfig.hubPort ?? 8443,
edgeId: edgeConfig.edgeId,
secret: edgeConfig.secret,
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
});
} catch (err) {
// Clean up the spawned process to avoid orphaning it
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.kill();
throw err;
}
this.started = true;
this.restartAttempts = 0;
@@ -170,6 +284,15 @@ export class RemoteIngressEdge extends EventEmitter {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
// Clean up nftables rules before stopping
if (this.nft) {
try {
await (this.nft as any).cleanup({ force: true });
} catch (err) {
console.warn(`[RemoteIngressEdge] nftables cleanup error: ${err}`);
}
this.nft = null;
}
if (this.started) {
try {
await this.bridge.sendCommand('stopEdge', {} as Record<string, never>);
@@ -180,6 +303,11 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
this.pendingFirewallConfig = null;
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
}
/**
@@ -211,6 +339,12 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = false;
// Clear orphaned status interval from previous run
if (this.statusInterval) {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
if (this.restartAttempts >= MAX_RESTART_ATTEMPTS) {
console.error('[RemoteIngressEdge] Max restart attempts reached, giving up');
this.emit('crashRecoveryFailed');
@@ -218,16 +352,25 @@ export class RemoteIngressEdge extends EventEmitter {
}
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++;
try {
// Drop stale kernel rules before reconnecting. The hub will send the
// current full firewall snapshot during handshake/config refresh.
await this.initNft({ reset: true });
const spawned = await this.bridge.spawn();
if (!spawned) {
console.error('[RemoteIngressEdge] Failed to respawn binary');
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
@@ -242,6 +385,21 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = true;
this.restartAttempts = 0;
this.restartBackoffMs = 1000;
// Restart periodic status logging
this.statusInterval = setInterval(async () => {
try {
const status = await this.getStatus();
console.log(
`[RemoteIngressEdge] Status: connected=${status.connected}, ` +
`streams=${status.activeStreams}, ports=[${status.listenPorts.join(',')}], ` +
`publicIp=${status.publicIp ?? 'unknown'}`
);
} catch {
// Bridge may be shutting down
}
}, 60_000);
console.log('[RemoteIngressEdge] Successfully recovered from crash');
this.emit('crashRecovered');
} catch (err) {
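To make applyFirewallConfig concrete, here is an illustrative (not authoritative) firewall snapshot of the kind the hub could push, using the IFirewallConfig, IFirewallRateLimit, and IFirewallRule shapes defined in classes.remoteingresshub.ts below; the concrete addresses and the '10/minute' rate string format are assumptions:

// Illustrative values only; shapes follow the interfaces in this change set.
const exampleFirewallConfig: IFirewallConfig = {
  blockedIps: ['203.0.113.7', '198.51.100.0/24'],
  rateLimits: [
    // rate string format is an assumption about what smartnftables accepts
    { id: 'ssh-limit', port: 22, protocol: 'tcp', rate: '10/minute', burst: 5, perSourceIP: true },
  ],
  rules: [
    { id: 'allow-https', direction: 'input', action: 'accept', destPort: 443, protocol: 'tcp', comment: 'public HTTPS' },
    { id: 'drop-scanner', direction: 'input', action: 'drop', sourceIP: '192.0.2.10', comment: 'known scanner' },
  ],
};
// On receipt, the edge resets its 'remoteingress' nftables table and then
// re-creates the blocklist set, the rate limits, and the rules from this
// snapshot, so each push replaces the previous kernel state in full.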
+135 -21
@@ -9,11 +9,12 @@ type THubCommands = {
};
startHub: {
params: {
tunnelPort: number;
targetHost?: string;
tlsCertPem?: string;
tlsKeyPem?: string;
};
tunnelPort: number;
targetHost?: string;
tlsCertPem?: string;
tlsKeyPem?: string;
performance?: IPerformanceConfig;
};
result: { started: boolean };
};
stopHub: {
@@ -22,7 +23,7 @@ type THubCommands = {
};
updateAllowedEdges: {
params: {
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>;
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig; performance?: IPerformanceConfig }>;
};
result: { updated: boolean };
};
@@ -31,16 +32,48 @@ type THubCommands = {
result: {
running: boolean;
tunnelPort: number;
connectedEdges: Array<{
edgeId: string;
connectedAt: number;
activeStreams: number;
peerAddr: string;
}>;
connectedEdges: Array<{
edgeId: string;
connectedAt: number;
activeStreams: number;
peerAddr: string;
transportMode: 'tcpTls' | 'quic' | 'quicWithFallback';
fallbackUsed: boolean;
performance: IEffectivePerformanceConfig;
flowControl: IFlowControlStatus;
queues: IQueueStatus;
traffic: ITrafficStatus;
udp: IUdpStatus;
}>;
};
};
};
export interface IFirewallRateLimit {
id: string;
port: number;
protocol?: 'tcp' | 'udp';
rate: string;
burst?: number;
perSourceIP?: boolean;
}
export interface IFirewallRule {
id: string;
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string;
destPort?: number;
protocol?: 'tcp' | 'udp';
comment?: string;
}
export interface IFirewallConfig {
blockedIps?: string[];
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
export interface IHubConfig {
tunnelPort?: number;
targetHost?: string;
@@ -48,9 +81,61 @@ export interface IHubConfig {
certPem?: string;
keyPem?: string;
};
performance?: IPerformanceConfig;
}
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number };
export type TPerformanceProfile = 'balanced' | 'throughput' | 'highConcurrency';
export interface IPerformanceConfig {
profile?: TPerformanceProfile;
maxStreamsPerEdge?: number;
totalWindowBudgetBytes?: number;
minStreamWindowBytes?: number;
maxStreamWindowBytes?: number;
sustainedStreamWindowBytes?: number;
quicDatagramReceiveBufferBytes?: number;
}
export interface IEffectivePerformanceConfig {
profile: TPerformanceProfile;
maxStreamsPerEdge: number;
totalWindowBudgetBytes: number;
minStreamWindowBytes: number;
maxStreamWindowBytes: number;
sustainedStreamWindowBytes: number;
quicDatagramReceiveBufferBytes: number;
}
export interface IFlowControlStatus {
applies: boolean;
currentWindowBytes: number;
minWindowBytes: number;
maxWindowBytes: number;
totalWindowBudgetBytes: number;
estimatedInFlightBytes: number;
stalledStreams: number;
}
export interface IQueueStatus {
ctrlQueueDepth: number;
dataQueueDepth: number;
sustainedQueueDepth: number;
}
export interface ITrafficStatus {
bytesIn: number;
bytesOut: number;
streamsOpenedTotal: number;
streamsClosedTotal: number;
rejectedStreams: number;
}
export interface IUdpStatus {
activeSessions: number;
droppedDatagrams: number;
}
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig; performance?: IPerformanceConfig };
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;
@@ -87,6 +172,15 @@ export class RemoteIngressHub extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressHub] ${message}`);
} else {
console.log(`[RemoteIngressHub] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -118,16 +212,25 @@ export class RemoteIngressHub extends EventEmitter {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startHub', {
tunnelPort: config.tunnelPort ?? 8443,
targetHost: config.targetHost ?? '127.0.0.1',
...(config.tls?.certPem && config.tls?.keyPem
? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
: {}),
});
try {
await this.bridge.sendCommand('startHub', {
tunnelPort: config.tunnelPort ?? 8443,
targetHost: config.targetHost ?? '127.0.0.1',
...(config.performance ? { performance: config.performance } : {}),
...(config.tls?.certPem && config.tls?.keyPem
? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
: {}),
});
} catch (err) {
// Clean up the spawned process to avoid orphaning it
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.kill();
throw err;
}
this.started = true;
this.restartAttempts = 0;
@@ -149,6 +252,11 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
this.savedEdges = [];
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
}
/**
@@ -195,6 +303,10 @@ export class RemoteIngressHub extends EventEmitter {
}
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++;
@@ -205,6 +317,7 @@ export class RemoteIngressHub extends EventEmitter {
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
const config = this.savedConfig;
@@ -214,6 +327,7 @@ export class RemoteIngressHub extends EventEmitter {
...(config.tls?.certPem && config.tls?.keyPem
? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
: {}),
...(config.performance ? { performance: config.performance } : {}),
});
// Restore allowed edges
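As a rough usage sketch only: wiring a hub with a performance profile and a per-edge firewall snapshot. The wrapper method names (start, updateAllowedEdges) and the exact argument shape are assumptions inferred from IHubConfig, TAllowedEdge, and the THubCommands map above, not confirmed API:

// Sketch under stated assumptions, inside an async context; the real class may
// wrap the edges array as { edges: [...] } to match the updateAllowedEdges command.
const hub = new RemoteIngressHub();
await hub.start({
  tunnelPort: 8443,
  targetHost: '127.0.0.1',
  performance: { profile: 'throughput', maxStreamsPerEdge: 512 },
});
await hub.updateAllowedEdges([
  {
    id: 'edge-berlin-1',
    secret: 'example-secret',
    listenPorts: [80, 443],
    listenPortsUdp: [5349],
    performance: { profile: 'balanced' },
    firewallConfig: { blockedIps: ['203.0.113.7'] },
  },
]);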
+2 -1
@@ -3,5 +3,6 @@ import * as path from 'path';
export { path };
// @push.rocks scope
import * as smartnftables from '@push.rocks/smartnftables';
import * as smartrust from '@push.rocks/smartrust';
export { smartrust };
export { smartnftables, smartrust };
+2 -1
@@ -6,7 +6,8 @@
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true
"verbatimModuleSyntax": true,
"types": ["node"]
},
"exclude": [
"dist_*/**/*.d.ts"