Compare commits

...

12 Commits

Author SHA1 Message Date
jkunz 27d4a5d3c1 v4.17.1 2026-04-26 15:08:37 +00:00
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
jkunz 627603532d fix(remoteingressedge): reset nftables state on startup and restart before reapplying hub firewall config 2026-04-26 15:08:37 +00:00
jkunz dd0cd479d5 v4.17.0 2026-04-26 12:09:58 +00:00
jkunz e709e40404 feat(core): add performance profiles, transport observability, and edge stream budget controls 2026-04-26 12:09:58 +00:00
jkunz 5304bbb486 v4.15.3 2026-03-27 11:34:31 +00:00
jkunz ac993dd5a3 fix(core): harden UDP session handling, QUIC control message validation, and bridge process cleanup 2026-03-27 11:34:31 +00:00
jkunz 0b2a83ddb6 v4.15.2 2026-03-26 17:06:28 +00:00
jkunz 3c5ea6bdc5 fix(readme): adjust tunnel diagram alignment in the README 2026-03-26 17:06:28 +00:00
jkunz 3dea43400b v4.15.1 2026-03-26 17:05:38 +00:00
jkunz 8fa3d414dd fix(readme): clarify unified runtime configuration and firewall update behavior 2026-03-26 17:05:38 +00:00
jkunz 1a62c52d24 v4.15.0 2026-03-26 16:39:53 +00:00
jkunz e9a08bdd0f feat(edge,hub): add hub-controlled nftables firewall configuration for remote ingress edges 2026-03-26 16:39:53 +00:00
19 changed files with 1207 additions and 216 deletions
+53
@@ -1,5 +1,58 @@
# Changelog
## 2026-04-26 - 4.17.1 - fix(remoteingressedge)
reset nftables state on startup and restart before reapplying hub firewall config
- upgrade @push.rocks/smartnftables to ^1.2.0 to use forced cleanup and IP set blocking
- queue firewall updates until nftables is initialized and apply pending config afterward
- replace per-IP blocking with blockIPSet for the hub blocklist
- force nftables cleanup during startup, restart, firewall replacement, and shutdown to remove stale kernel rules
## 2026-04-26 - 4.17.0 - feat(core)
add performance profiles, transport observability, and edge stream budget controls
- introduce configurable performance profiles and effective per-edge limits for stream concurrency, flow-control windows, and QUIC datagram buffers
- expose hub-side edge status for transport mode, fallback usage, flow-control, queue depths, traffic counters, and UDP session metrics
- enforce edge-side stream admission limits before spawning client tunnel tasks and make TCP/TLS window sizing honor edge memory budgets under high concurrency
- increase QUIC datagram receive buffer configurability and improve hub-side QUIC UDP session tracking and idle pruning
- update hub APIs and documentation to support performance configuration and clarify quicWithFallback as the default edge transport
## 2026-04-26 - 4.16.0 - feat(performance)
add remote ingress performance controls and runtime observability
- add performance profiles and configurable stream/window budgets for hub and edge connections
- expose per-edge transport, flow-control, queue, traffic, and UDP status from hub status
- enforce edge-side stream admission before spawning client tunnel tasks
- make TCP/TLS flow control honor an edge-level memory budget under high concurrency
- increase QUIC datagram receive buffers and prune idle hub-side QUIC UDP sessions
## 2026-03-27 - 4.15.3 - fix(core)
harden UDP session handling, QUIC control message validation, and bridge process cleanup
- cap UDP session creation and drop excess datagrams with warnings to prevent unbounded session growth
- periodically prune closed datagram sessions on the hub and reject oversized QUIC control messages to avoid resource exhaustion
- clean up spawned edge and hub bridge processes on startup failure, remove listeners on stop, and avoid restarting after shutdown during backoff
## 2026-03-26 - 4.15.2 - fix(readme)
adjust tunnel diagram alignment in the README
- Improves formatting consistency in the Hub/Edge topology diagram.
## 2026-03-26 - 4.15.1 - fix(readme)
clarify unified runtime configuration and firewall update behavior
- Updates the architecture and feature descriptions to reflect that ports, firewall rules, and rate limits are pushed together in a single config update
- Clarifies that firewall configuration is delivered via FRAME_CONFIG on handshake and subsequent updates, with atomic full-rule replacement at the edge
- Simplifies and reorganizes README wording around edge and hub responsibilities without changing implementation behavior
## 2026-03-26 - 4.15.0 - feat(edge,hub)
add hub-controlled nftables firewall configuration for remote ingress edges
- add firewallConfig support to allowed edge definitions, handshake responses, and runtime config updates
- emit firewallConfigUpdated events from the Rust bridge and edge runtime when firewall settings change
- initialize SmartNftables on edges, apply blocked IPs, rate limits, and custom rules, and clean up nftables rules on stop
- document centralized firewall management, root requirements, and new edge events in the README
## 2026-03-26 - 4.14.3 - fix(docs)
refresh project metadata and README to reflect current ingress tunnel capabilities
+2 -1
@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
-"version": "4.14.3",
+"version": "4.17.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js",
@@ -24,6 +24,7 @@
},
"dependencies": {
"@push.rocks/qenv": "^6.1.3",
"@push.rocks/smartnftables": "^1.2.0",
"@push.rocks/smartrust": "^1.3.2"
},
"repository": {
+11
@@ -11,6 +11,9 @@ importers:
'@push.rocks/qenv':
specifier: ^6.1.3
version: 6.1.3
'@push.rocks/smartnftables':
specifier: ^1.2.0
version: 1.2.0
'@push.rocks/smartrust':
specifier: ^1.3.2
version: 1.3.2
@@ -1204,6 +1207,9 @@ packages:
'@push.rocks/smartnetwork@4.4.0':
resolution: {integrity: sha512-OvFtz41cvQ7lcXwaIOhghNUUlNoMxvwKDctbDvMyuZyEH08SpLjhyv2FuKbKL/mgwA/WxakTbohoC8SW7t+kiw==}
'@push.rocks/smartnftables@1.2.0':
resolution: {integrity: sha512-VTRHnxHrJj9VOq2MaCOqxiA4JLGRnzEaZ7kXxA7v3ljX+Y2wWK9VYpwKKBEbjgjoTpQyOf+I0gEG9wkR/jtUvQ==}
'@push.rocks/smartnpm@2.0.6':
resolution: {integrity: sha512-7anKDOjX6gXWs1IAc+YWz9ZZ8gDsTwaLh+CxRnGHjAawOmK788NrrgVCg2Fb3qojrPnoxecc46F8Ivp1BT7Izw==}
@@ -6433,6 +6439,11 @@ snapshots:
transitivePeerDependencies:
- supports-color
'@push.rocks/smartnftables@1.2.0':
dependencies:
'@push.rocks/smartlog': 3.2.1
'@push.rocks/smartpromise': 4.2.3
'@push.rocks/smartnpm@2.0.6':
dependencies:
'@push.rocks/consolecolor': 2.0.3
+157 -51
@@ -1,6 +1,6 @@
# @serve.zone/remoteingress
-Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol.
+Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol. Includes **hub-controlled nftables firewall** for IP blocking, rate limiting, and custom firewall rules applied directly at the edge.
## Issue Reporting and Security
@@ -17,39 +17,40 @@ pnpm install @serve.zone/remoteingress
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
                             TLS or QUIC Tunnel
┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐
│    Network Edge     │    TCP+TLS: frame mux        │   Private Cluster   │
│                     │    QUIC: native streams      │                     │
│  RemoteIngressEdge  │    UDP: QUIC datagrams       │  RemoteIngressHub   │
│                     │                              │                     │
│ • TCP/UDP listeners │ ◄─── FRAME_CONFIG pushes ─── │ • Port assignments  │
│ • nftables firewall │    ports + firewall rules    │ • Firewall config   │
│ • Rate limiting     │    at any time               │ • Rate limit rules  │
└─────────────────────┘                              └─────────────────────┘
          ▲                                                     │
          │ TCP + UDP from end users                            ▼
       Internet                                      DcRouter / SmartProxy
```
| Component | Role |
|-----------|------|
-| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Listens on TCP and UDP ports assigned by the hub, accepts connections/datagrams, and tunnels them to the hub. Ports are hot-reloadable at runtime. |
-| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. |
+| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Runs as root. Listens on hub-assigned TCP/UDP ports, tunnels traffic to the hub, and applies hub-pushed nftables rules (IP blocking, rate limiting). All config is hot-reloadable at runtime. |
+| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. Pushes all edge config (ports, firewall) via a single API. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
### Key Features
- **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- **Hub-controlled firewall** — push nftables rules (IP blocking, rate limiting, custom firewall rules) to edges as part of the same config update that assigns ports — powered by `@push.rocks/smartnftables`
- **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- **Shared-secret authentication** — edges must present valid credentials to connect
- **Connection tokens** — encode all connection details into a single opaque base64url string
- **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- **Auto-reconnect** with exponential backoff if the tunnel drops
-- **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime
+- **Dynamic runtime configuration** — the hub pushes ports, firewall rules, and rate limits to edges at any time via a single `updateAllowedEdges()` call
- **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
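The PROXY protocol v1 header mentioned in the features list is a plain-text preamble defined by the HAProxy PROXY protocol spec. As a rough illustration (the Rust core emits it for you; `buildProxyV1Header` is a hypothetical name, not part of the package API):

```typescript
// Sketch of the PROXY protocol v1 text header that precedes each
// tunneled TCP stream: "PROXY <family> <srcIP> <dstIP> <srcPort> <dstPort>\r\n".
// Illustration only; the Rust core builds this internally.
function buildProxyV1Header(
  family: 'TCP4' | 'TCP6',
  srcIp: string,
  dstIp: string,
  srcPort: number,
  dstPort: number,
): string {
  return `PROXY ${family} ${srcIp} ${dstIp} ${srcPort} ${dstPort}\r\n`;
}

const header = buildProxyV1Header('TCP4', '203.0.113.7', '10.0.0.5', 51234, 443);
// "PROXY TCP4 203.0.113.7 10.0.0.5 51234 443\r\n"
```

SmartProxy strips this header on receipt and uses the embedded source address as the client IP; UDP sessions use the binary v2 form instead, since v1 is TCP-only.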
@@ -79,7 +80,7 @@ await hub.start({
targetHost: '127.0.0.1', // SmartProxy host to forward traffic to
});
-// Register allowed edges with TCP and UDP listen ports
+// Register allowed edges with TCP and UDP listen ports + firewall config
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
@@ -87,6 +88,15 @@ await hub.updateAllowedEdges([
listenPorts: [80, 443], // TCP ports the edge should listen on
listenPortsUdp: [53, 51820], // UDP ports (e.g., DNS, WireGuard)
stunIntervalSecs: 300,
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8'],
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '100/second', perSourceIP: true },
],
rules: [
{ id: 'allow-ssh', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 22, protocol: 'tcp' },
],
},
},
{
id: 'edge-fra-02',
@@ -95,13 +105,19 @@ await hub.updateAllowedEdges([
},
]);
-// Dynamically update ports — changes are pushed instantly to connected edges
+// Dynamically update ports and firewall — changes are pushed instantly to connected edges
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443, 8443], // added TCP port 8443
listenPortsUdp: [53], // removed WireGuard UDP port
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8', '203.0.113.50'], // added new blocked IP
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '200/second', perSourceIP: true },
],
},
},
]);
@@ -114,7 +130,7 @@ await hub.stop();
### Setting Up the Edge (Network Edge Side)
-The edge can connect via **TCP+TLS** (default) or **QUIC** transport.
+The edge connects via **QUIC with TCP+TLS fallback** by default. Edges run as **root** so they can bind to privileged ports and apply nftables firewall rules.
#### Option A: Connection Token (Recommended)
@@ -127,6 +143,7 @@ edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`TCP ports: ${listenPorts}`));
edge.on('firewallConfigUpdated', () => console.log('Firewall rules applied'));
await edge.start({
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...',
@@ -145,7 +162,7 @@ await edge.start({
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
-  transportMode: 'quic', // 'tcpTls' (default) | 'quic' | 'quicWithFallback'
+  transportMode: 'quic', // 'tcpTls' | 'quic' | 'quicWithFallback' (default)
});
const edgeStatus = await edge.getStatus();
@@ -158,9 +175,9 @@ await edge.stop();
| Mode | Description |
|------|-------------|
-| `'tcpTls'` | **Default.** Single TLS connection with frame-based multiplexing. Universal compatibility. |
+| `'tcpTls'` | Single TLS connection with frame-based multiplexing. Universal compatibility. |
| `'quic'` | QUIC with native stream multiplexing. Eliminates head-of-line blocking. Uses QUIC datagrams for UDP traffic. |
-| `'quicWithFallback'` | Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
+| `'quicWithFallback'` | **Default.** Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
### Connection Tokens
@@ -185,6 +202,68 @@ const data = decodeConnectionToken(token);
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
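Since a token is just base64url-encoded JSON, the round trip can be sketched as below. The abbreviated field names (`h` for hub host, `p` for port) are assumptions for illustration; the real schema is owned by the package's `encodeConnectionToken`/`decodeConnectionToken` utilities.

```typescript
// Minimal sketch of base64url token round-tripping, mirroring what the
// token utilities do conceptually. Field names here are illustrative only.
function encodeTokenLike(payload: Record<string, unknown>): string {
  // Node's Buffer supports 'base64url' natively (Node >= 15.7).
  return Buffer.from(JSON.stringify(payload)).toString('base64url');
}

function decodeTokenLike(token: string): Record<string, unknown> {
  return JSON.parse(Buffer.from(token, 'base64url').toString('utf8'));
}

const token = encodeTokenLike({ h: 'hub.example.com', p: 8443 });
const data = decodeTokenLike(token); // { h: 'hub.example.com', p: 8443 }
```

Because the alphabet is base64url (no `+`, `/`, or `=`), the string survives environment variables and CLI arguments without escaping.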
## 🔥 Firewall Config
The `firewallConfig` field in `updateAllowedEdges()` works exactly like `listenPorts` — it travels in the same `FRAME_CONFIG` frame, is delivered on initial handshake and on every subsequent update, and is applied atomically at the edge using `@push.rocks/smartnftables`. Each update fully replaces the previous ruleset.
Since edges run as root, the rules are applied directly to the Linux kernel via nftables. If the edge isn't root or nftables is unavailable, it logs a warning and continues — the tunnel works fine, just without kernel-level firewall rules.
### Config Structure
```typescript
interface IFirewallConfig {
blockedIps?: string[]; // IPs or CIDRs to block (e.g., '1.2.3.4', '10.0.0.0/8')
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
interface IFirewallRateLimit {
id: string; // unique identifier for this rate limit
port: number; // port to rate-limit
protocol?: 'tcp' | 'udp'; // default: both
rate: string; // e.g., '100/second', '1000/minute'
burst?: number; // burst allowance
perSourceIP?: boolean; // per-client rate limiting (recommended)
}
interface IFirewallRule {
id: string; // unique identifier for this rule
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string; // source IP or CIDR
destPort?: number; // destination port
protocol?: 'tcp' | 'udp';
comment?: string;
}
```
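The `rate` string format shown above (`'100/second'`, `'1000/minute'`) can be broken down as follows. This is a hypothetical helper for illustration; the actual parsing happens inside `@push.rocks/smartnftables`, and the unit names mirror nftables' own limit units.

```typescript
// Hypothetical parser for the rate string format used by IFirewallRateLimit,
// e.g. '100/second' or '1000/minute'. Illustrative only, not a package export.
function parseRate(rate: string): { count: number; unit: string } {
  const match = /^(\d+)\/(second|minute|hour|day)$/.exec(rate);
  if (!match) {
    throw new Error(`invalid rate string: ${rate}`);
  }
  return { count: Number(match[1]), unit: match[2] };
}

parseRate('100/second'); // { count: 100, unit: 'second' }
```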
### Example: Rate Limiting + IP Blocking
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
// Block known bad actors
blockedIps: ['198.51.100.0/24', '203.0.113.50'],
// Rate limit HTTP traffic per source IP
rateLimits: [
{ id: 'http', port: 80, protocol: 'tcp', rate: '100/second', burst: 50, perSourceIP: true },
{ id: 'https', port: 443, protocol: 'tcp', rate: '200/second', burst: 100, perSourceIP: true },
],
// Allow monitoring from trusted subnet
rules: [
{ id: 'monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 9090, protocol: 'tcp', comment: 'Prometheus scraping' },
],
},
},
]);
```
## API Reference
### `RemoteIngressHub`
@@ -193,7 +272,7 @@ Tokens are base64url-encoded — safe for environment variables, CLI arguments,
|-------------------|-------------|
| `start(config?)` | Start the hub. Config: `{ tunnelPort?, targetHost?, tls?: { certPem?, keyPem? } }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `stop()` | Graceful shutdown. |
-| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs? }`. Port changes are pushed to connected edges in real time. |
+| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs?, firewallConfig? }`. Port and firewall changes are pushed to connected edges in real time. |
| `getStatus()` | Returns `{ running, tunnelPort, connectedEdges: [...] }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
@@ -204,11 +283,11 @@ Tokens are base64url-encoded — safe for environment variables, CLI arguments,
| Method / Property | Description |
|-------------------|-------------|
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, bindAddress?, transportMode? }`. |
-| `stop()` | Graceful shutdown. |
+| `stop()` | Graceful shutdown. Cleans up all nftables rules. |
| `getStatus()` | Returns `{ running, connected, publicIp, activeStreams, listenPorts }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
-**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`, `crashRecovered`, `crashRecoveryFailed`
+**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`, `firewallConfigUpdated`, `crashRecovered`, `crashRecoveryFailed`
### Token Utilities
@@ -258,19 +337,19 @@ The tunnel uses a custom binary frame protocol over a single TLS connection:
| Frame Type | Value | Direction | Purpose |
|------------|-------|-----------|---------|
-| `OPEN` | `0x01` | Edge -> Hub | Open TCP stream; payload is PROXY v1 header |
-| `DATA` | `0x02` | Edge -> Hub | Client data (upload) |
-| `CLOSE` | `0x03` | Edge -> Hub | Client closed connection |
-| `DATA_BACK` | `0x04` | Hub -> Edge | Response data (download) |
-| `CLOSE_BACK` | `0x05` | Hub -> Edge | Upstream closed connection |
-| `CONFIG` | `0x06` | Hub -> Edge | Runtime config update (JSON payload) |
-| `PING` | `0x07` | Hub -> Edge | Heartbeat probe (every 15s) |
-| `PONG` | `0x08` | Edge -> Hub | Heartbeat response |
-| `WINDOW_UPDATE` | `0x09` | Edge -> Hub | Flow control: edge consumed N bytes |
-| `WINDOW_UPDATE_BACK` | `0x0A` | Hub -> Edge | Flow control: hub consumed N bytes |
-| `UDP_OPEN` | `0x0B` | Edge -> Hub | Open UDP session; payload is PROXY v2 header |
-| `UDP_DATA` | `0x0C` | Edge -> Hub | UDP datagram (upload) |
-| `UDP_DATA_BACK` | `0x0D` | Hub -> Edge | UDP datagram (download) |
+| `OPEN` | `0x01` | Edge → Hub | Open TCP stream; payload is PROXY v1 header |
+| `DATA` | `0x02` | Edge → Hub | Client data (upload) |
+| `CLOSE` | `0x03` | Edge → Hub | Client closed connection |
+| `DATA_BACK` | `0x04` | Hub → Edge | Response data (download) |
+| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream closed connection |
+| `CONFIG` | `0x06` | Hub → Edge | Runtime config update (JSON: ports + firewall config) |
+| `PING` | `0x07` | Hub → Edge | Heartbeat probe (every 15s) |
+| `PONG` | `0x08` | Edge → Hub | Heartbeat response |
+| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Flow control: edge consumed N bytes |
+| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Flow control: hub consumed N bytes |
+| `UDP_OPEN` | `0x0B` | Edge → Hub | Open UDP session; payload is PROXY v2 header |
+| `UDP_DATA` | `0x0C` | Edge → Hub | UDP datagram (upload) |
+| `UDP_DATA_BACK` | `0x0D` | Hub → Edge | UDP datagram (download) |
| `UDP_CLOSE` | `0x0E` | Either | Close UDP session |
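The frame-type values from the table above, collected as a lookup for quick reference when inspecting captured tunnel traffic. The values mirror the table; the helper itself is illustrative, not part of the package API.

```typescript
// Frame-type byte values from the tunnel frame protocol table.
const FRAME_TYPES: Record<number, string> = {
  0x01: 'OPEN', 0x02: 'DATA', 0x03: 'CLOSE', 0x04: 'DATA_BACK',
  0x05: 'CLOSE_BACK', 0x06: 'CONFIG', 0x07: 'PING', 0x08: 'PONG',
  0x09: 'WINDOW_UPDATE', 0x0a: 'WINDOW_UPDATE_BACK',
  0x0b: 'UDP_OPEN', 0x0c: 'UDP_DATA', 0x0d: 'UDP_DATA_BACK', 0x0e: 'UDP_CLOSE',
};

// Resolve a frame-type byte to its name, tolerating unknown values.
function frameTypeName(byte: number): string {
  return FRAME_TYPES[byte] ?? `UNKNOWN(0x${byte.toString(16)})`;
}

frameTypeName(0x06); // 'CONFIG'
```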
### QUIC Transport
@@ -286,9 +365,10 @@ When using QUIC, the frame protocol is replaced by native QUIC primitives:
1. Edge opens a TLS or QUIC connection to the hub
2. Edge sends: `EDGE <edgeId> <secret>\n`
3. Hub verifies credentials (constant-time comparison) and responds with JSON:
-   `{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300}\n`
+   `{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300,"firewallConfig":{...}}\n`
4. Edge starts TCP and UDP listeners on the assigned ports
-5. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
+5. Edge applies firewall config via nftables (if present and running as root)
+6. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
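The first two handshake steps can be sketched in TypeScript. These helpers are illustrative, not package exports (the Rust core performs the real handshake), and the Rust source later in this diff additionally appends a transport-mode token to the auth line.

```typescript
// Sketch of the edge handshake: a one-line auth message, answered by a
// single JSON line from the hub. Helper names are illustrative only.
function buildAuthLine(edgeId: string, secret: string): string {
  return `EDGE ${edgeId} ${secret}\n`;
}

interface IHandshakeResponse {
  listenPorts: number[];
  listenPortsUdp?: number[];
  stunIntervalSecs?: number;
  firewallConfig?: unknown; // present when the hub pushes firewall rules
}

function parseHandshake(line: string): IHandshakeResponse {
  return JSON.parse(line.trim()) as IHandshakeResponse;
}

const resp = parseHandshake(
  '{"listenPorts":[80,443],"listenPortsUdp":[53],"stunIntervalSecs":300}\n',
);
// resp.listenPorts is [80, 443]
```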
## QoS & Flow Control
@@ -314,23 +394,23 @@ Each TCP stream has a send window from a shared **200 MB budget**:
| Active Streams | Window per Stream |
|---|---|
-| 1-50 | 4 MB (maximum) |
-| 51-200 | Scales down (4 MB -> 1 MB) |
+| 1–50 | 4 MB (maximum) |
+| 51–200 | Scales down (4 MB → 1 MB) |
| 200+ | 1 MB (floor) |
UDP traffic uses no flow control — datagrams are fire-and-forget, matching UDP semantics.
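The scaling in the table above amounts to giving each stream an equal share of the 200 MB budget, clamped between the 1 MB floor and the 4 MB cap. A sketch of that shape follows; the real Rust implementation may compute windows differently, this only reproduces the table's behavior.

```typescript
// Adaptive per-stream window sizing, assuming equal shares of the shared
// 200 MB budget clamped to [1 MB, 4 MB], matching the table above.
const BUDGET_BYTES = 200 * 1024 * 1024;
const MAX_WINDOW = 4 * 1024 * 1024; // cap for <= 50 streams
const MIN_WINDOW = 1 * 1024 * 1024; // floor for >= 200 streams

function windowPerStream(activeStreams: number): number {
  if (activeStreams <= 0) return MAX_WINDOW;
  const share = Math.floor(BUDGET_BYTES / activeStreams);
  return Math.min(MAX_WINDOW, Math.max(MIN_WINDOW, share));
}

windowPerStream(10);  // 4 MiB (capped)
windowPerStream(100); // 2 MiB (scaling band)
windowPerStream(400); // 1 MiB (floor)
```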
## Example Scenarios
-### 1. Expose a Private Cluster to the Internet
+### 1. 🌐 Expose a Private Cluster to the Internet
Deploy an Edge on a public VPS, point DNS to its IP. The Edge tunnels all TCP and UDP traffic to the Hub running inside your private cluster. No public ports needed on the cluster.
-### 2. Multi-Region Edge Ingress
+### 2. 🗺️ Multi-Region Edge Ingress
Run Edges in NYC, Frankfurt, and Tokyo — all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. PROXY protocol ensures the Hub sees real client IPs regardless of which Edge they entered through.
-### 3. UDP Forwarding (DNS, Gaming, VoIP)
+### 3. 📡 UDP Forwarding (DNS, Gaming, VoIP)
Configure UDP listen ports alongside TCP ports. DNS queries, game server traffic, or VoIP packets are tunneled through the same edge/hub connection and forwarded to SmartProxy with a PROXY v2 binary header preserving the client's real IP.
@@ -345,7 +425,7 @@ await hub.updateAllowedEdges([
]);
```
-### 4. QUIC Transport for Low-Latency
+### 4. 🚀 QUIC Transport for Low-Latency
Use QUIC transport to eliminate head-of-line blocking — a lost packet on one stream doesn't stall others. QUIC also enables 0-RTT reconnection and connection migration.
@@ -359,7 +439,7 @@ await edge.start({
});
```
-### 5. Token-Based Edge Provisioning
+### 5. 🔑 Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators:
@@ -378,21 +458,47 @@ const edge = new RemoteIngressEdge();
await edge.start({ token });
```
### 6. 🛡️ Centralized Firewall Management
Push firewall rules from the hub to all your edge nodes. Block bad actors, rate-limit abusive traffic, and whitelist trusted subnets — all from a single control plane:
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
blockedIps: ['198.51.100.0/24'],
rateLimits: [
{ id: 'https', port: 443, protocol: 'tcp', rate: '500/second', perSourceIP: true, burst: 100 },
],
rules: [
{ id: 'allow-monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/8', destPort: 9090, protocol: 'tcp' },
],
},
},
]);
// Firewall rules are applied at the edge via nftables within seconds
```
## License and Legal Information
-This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository.
+This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
-This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
+This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
+Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
### Company Information
Task Venture Capital GmbH
-Registered at District court Bremen HRB 35230 HB, Germany
+Registered at District Court Bremen HRB 35230 HB, Germany
-For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
+For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
@@ -316,6 +316,12 @@ async fn handle_request(
serde_json::json!({ "listenPorts": listen_ports }),
);
}
EdgeEvent::FirewallConfigUpdated { firewall_config } => {
send_event(
"firewallConfigUpdated",
serde_json::json!({ "firewallConfig": firewall_config }),
);
}
}
}
});
+131 -37
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use bytes::Bytes;
use remoteingress_protocol::*;
use crate::performance::EffectivePerformanceConfig;
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
use crate::udp_session::{UdpSessionKey, UdpSessionManager};
@@ -53,7 +54,7 @@ pub struct EdgeConfig {
/// Useful for testing on localhost where edge and upstream share the same machine.
#[serde(default)]
pub bind_address: Option<String>,
-/// Transport mode for the tunnel connection (defaults to TcpTls).
+/// Transport mode for the tunnel connection (defaults to QuicWithFallback).
#[serde(default)]
pub transport_mode: Option<TransportMode>,
}
@@ -67,12 +68,55 @@ struct HandshakeConfig {
listen_ports_udp: Vec<u16>,
#[serde(default = "default_stun_interval")]
stun_interval_secs: u64,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
#[serde(default)]
performance: EffectivePerformanceConfig,
}
fn default_stun_interval() -> u64 {
300
}
fn try_reserve_stream(active_streams: &AtomicU32, max_streams: usize) -> bool {
let max_streams = max_streams.min(u32::MAX as usize) as u32;
loop {
let current = active_streams.load(Ordering::Relaxed);
if current >= max_streams {
return false;
}
if active_streams
.compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
return true;
}
}
}
fn release_stream(active_streams: &AtomicU32) {
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 {
break;
}
if active_streams
.compare_exchange_weak(current, current - 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}
fn transport_mode_wire_name(mode: TransportMode) -> &'static str {
match mode {
TransportMode::TcpTls => "tcpTls",
TransportMode::Quic => "quic",
TransportMode::QuicWithFallback => "quicWithFallback",
}
}
/// Runtime config update received from hub via FRAME_CONFIG.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -80,6 +124,10 @@ struct ConfigUpdate {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
#[serde(default)]
performance: EffectivePerformanceConfig,
}
/// Events emitted by the edge.
@@ -96,6 +144,8 @@ pub enum EdgeEvent {
PortsAssigned { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
PortsUpdated { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
FirewallConfigUpdated { firewall_config: serde_json::Value },
}
/// Edge status response.
@@ -372,6 +422,7 @@ async fn handle_edge_frame(
bind_address: &str,
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
performance: &mut EffectivePerformanceConfig,
) -> EdgeFrameAction {
match frame.frame_type {
FRAME_DATA_BACK => {
@@ -392,8 +443,8 @@ async fn handle_edge_frame(
let writers = client_writers.lock().await;
if let Some(state) = writers.get(&frame.stream_id) {
let prev = state.send_window.fetch_add(increment, Ordering::Release);
-if prev + increment > MAX_WINDOW_SIZE {
-state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
+if prev + increment > performance.max_stream_window_bytes {
+state.send_window.store(performance.max_stream_window_bytes, Ordering::Release);
}
state.window_notify.notify_one();
}
@@ -411,6 +462,7 @@ async fn handle_edge_frame(
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
log::info!("Config update from hub: ports {:?}, udp {:?}", update.listen_ports, update.listen_ports_udp);
*performance = update.performance.clone();
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
@@ -427,6 +479,7 @@ async fn handle_edge_frame(
edge_id,
connection_token,
bind_address,
performance,
);
apply_udp_port_config(
&update.listen_ports_udp,
@@ -439,6 +492,11 @@ async fn handle_edge_frame(
connection_token,
bind_address,
);
if let Some(fw_config) = update.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
}
}
FRAME_PING => {
@@ -513,7 +571,13 @@ async fn connect_to_hub_and_run(
};
// Send auth line (we own the whole stream — no split)
-let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret);
+let requested_transport = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
+let auth_line = format!(
+"EDGE {} {} {}\n",
+config.edge_id,
+config.secret,
+transport_mode_wire_name(requested_transport),
+);
if tls_stream.write_all(auth_line.as_bytes()).await.is_err() {
return EdgeLoopResult::Reconnect("auth_write_failed".to_string());
}
@@ -552,6 +616,7 @@ async fn connect_to_hub_and_run(
return EdgeLoopResult::Reconnect(format!("handshake_invalid: {}", e));
}
};
let mut performance = handshake.performance.clone();
log::info!(
"Handshake from hub: ports {:?}, stun_interval {}s",
@@ -569,6 +634,13 @@ async fn connect_to_hub_and_run(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -622,6 +694,7 @@ async fn connect_to_hub_and_run(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
// UDP session manager + listeners
@@ -682,7 +755,7 @@ async fn connect_to_hub_and_run(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
connection_token, bind_address, &udp_sessions, &udp_sockets, &mut performance,
).await {
break 'io_loop EdgeLoopResult::Reconnect(reason);
}
@@ -701,7 +774,7 @@ async fn connect_to_hub_and_run(
frame, &mut tunnel_io, &client_writers, listen_ports, event_tx,
&tunnel_writer_tx, &tunnel_data_tx, &tunnel_sustained_tx, &mut port_listeners,
&mut udp_listeners, active_streams, next_stream_id, &config.edge_id,
connection_token, bind_address, &udp_sessions, &udp_sockets,
connection_token, bind_address, &udp_sessions, &udp_sockets, &mut performance,
).await {
break EdgeLoopResult::Reconnect(reason);
}
@@ -767,6 +840,7 @@ fn apply_port_config(
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
performance: &EffectivePerformanceConfig,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
@@ -789,6 +863,7 @@ fn apply_port_config(
let next_stream_id = next_stream_id.clone();
let edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let performance = performance.clone();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
@@ -824,8 +899,12 @@ fn apply_port_config(
let edge_id = edge_id.clone();
let client_token = port_token.child_token();
active_streams.fetch_add(1, Ordering::Relaxed);
if !try_reserve_stream(&active_streams, performance.max_streams_per_edge) {
log::warn!("Rejecting client on port {}: max streams ({}) reached", port, performance.max_streams_per_edge);
continue;
}
let performance = performance.clone();
tokio::spawn(async move {
handle_client_connection(
client_stream,
@@ -839,20 +918,10 @@ fn apply_port_config(
client_writers,
client_token,
Arc::clone(&active_streams),
performance,
)
.await;
// Saturating decrement: prevent underflow when
// edge_main_loop's store(0) races with task cleanup.
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
release_stream(&active_streams);
});
}
Err(e) => {
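The inline saturating-decrement CAS loop above is replaced by `try_reserve_stream`/`release_stream`. Their bodies are not shown in this diff; a plausible sketch (names from the diff, implementations assumed) that reserves a slot only below the cap and decrements without underflow:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Reserve a stream slot: CAS loop that increments only while below the cap.
fn try_reserve_stream(active: &AtomicU32, max_streams: usize) -> bool {
    let mut cur = active.load(Ordering::Relaxed);
    loop {
        if cur as usize >= max_streams {
            return false;
        }
        match active.compare_exchange_weak(cur, cur + 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return true,
            Err(observed) => cur = observed,
        }
    }
}

// Saturating decrement: never underflows even if a store(0) raced the cleanup.
fn release_stream(active: &AtomicU32) {
    let mut cur = active.load(Ordering::Relaxed);
    while cur > 0 {
        match active.compare_exchange_weak(cur, cur - 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return,
            Err(observed) => cur = observed,
        }
    }
}
```

Centralizing the pair means the reject path and the cleanup path can no longer drift apart, which is what the old copy-pasted CAS loops risked.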
@@ -936,7 +1005,10 @@ fn apply_udp_port_config(
} else {
// New session — allocate stream_id and send UDP_OPEN
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
// Bound the session table before inserting; drop datagrams once the cap is
// reached. (`MAX_UDP_SESSIONS` is an assumed cap — `HashMap::insert` returns
// `None` for a *new* key, so it cannot signal "limit reached" by itself.)
const MAX_UDP_SESSIONS: usize = 1024;
if sessions.len() >= MAX_UDP_SESSIONS {
log::warn!("UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
sessions.insert(key, sid);
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -984,6 +1056,7 @@ async fn handle_client_connection(
client_writers: Arc<Mutex<HashMap<u32, EdgeStreamState>>>,
client_token: CancellationToken,
active_streams: Arc<AtomicU32>,
performance: EffectivePerformanceConfig,
) {
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -1007,9 +1080,12 @@ async fn handle_client_connection(
// streams due to channel overflow — backpressure slows streams, never kills them.
let (back_tx, mut back_rx) = mpsc::unbounded_channel::<Bytes>();
// Adaptive initial window: scale with current stream count to keep total in-flight
// data within the 200MB budget. Prevents burst flooding when many streams open.
let initial_window = remoteingress_protocol::compute_window_for_stream_count(
// data within the configured edge budget. Prevents burst flooding when many streams open.
let initial_window = remoteingress_protocol::compute_window_for_limits(
active_streams.load(Ordering::Relaxed),
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let send_window = Arc::new(AtomicU32::new(initial_window));
let window_notify = Arc::new(Notify::new());
@@ -1046,8 +1122,11 @@ async fn handle_client_connection(
// effective window shrinks to match current demand (fewer streams
// = larger window, more streams = smaller window per stream).
consumed_since_update += len;
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
let adaptive_window = remoteingress_protocol::compute_window_for_limits(
active_streams_h2c.load(Ordering::Relaxed),
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
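The real `compute_window_for_limits` lives in `remoteingress_protocol` and is not shown here; a minimal sketch of its likely semantics, inferred from the surrounding comments (budget split across active streams, clamped to the per-stream bounds):

```rust
// Divide the total in-flight budget evenly across active streams,
// clamped to the configured per-stream [min, max] window bounds.
fn compute_window_for_limits(active_streams: u32, budget_bytes: u64, min_bytes: u32, max_bytes: u32) -> u32 {
    let per_stream = budget_bytes / u64::from(active_streams.max(1));
    per_stream.clamp(u64::from(min_bytes), u64::from(max_bytes)) as u32
}
```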
@@ -1261,7 +1340,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
};
// Auth handshake on control stream (same protocol as TCP+TLS)
let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret);
let requested_transport = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
let auth_line = format!(
"EDGE {} {} {}\n",
config.edge_id,
config.secret,
transport_mode_wire_name(requested_transport),
);
if let Err(e) = ctrl_send.write_all(auth_line.as_bytes()).await {
return EdgeLoopResult::Reconnect(format!("quic_auth_write_failed: {}", e));
}
@@ -1293,6 +1378,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
return EdgeLoopResult::Reconnect(format!("quic_handshake_invalid: {}", e));
}
};
let mut performance = handshake.performance.clone();
log::info!(
"QUIC handshake from hub: ports {:?}, stun_interval {}s",
@@ -1309,6 +1395,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -1349,6 +1442,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
// UDP listeners for QUIC transport — uses QUIC datagrams for low-latency forwarding.
@@ -1390,6 +1484,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
quic_transport::CTRL_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&payload) {
log::info!("QUIC config update from hub: ports {:?}", update.listen_ports);
performance = update.performance.clone();
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
@@ -1403,6 +1498,7 @@ async fn connect_to_hub_and_run_quic_with_connection(
&config.edge_id,
connection_token,
bind_address,
&performance,
);
apply_udp_port_config_quic(
&update.listen_ports_udp,
@@ -1509,6 +1605,7 @@ fn apply_port_config_quic(
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
performance: &EffectivePerformanceConfig,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
@@ -1529,6 +1626,7 @@ fn apply_port_config_quic(
let _edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let performance = performance.clone();
let handle = tokio::spawn(async move {
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
@@ -1557,7 +1655,10 @@ fn apply_port_config_quic(
let active_streams = active_streams.clone();
let client_token = port_token.child_token();
active_streams.fetch_add(1, Ordering::Relaxed);
if !try_reserve_stream(&active_streams, performance.max_streams_per_edge) {
log::warn!("Rejecting QUIC client on port {}: max streams ({}) reached", port, performance.max_streams_per_edge);
continue;
}
tokio::spawn(async move {
handle_client_connection_quic(
@@ -1568,17 +1669,7 @@ fn apply_port_config_quic(
quic_conn,
client_token,
).await;
// Saturating decrement
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
release_stream(&active_streams);
});
}
Err(e) => {
@@ -1656,7 +1747,10 @@ fn apply_udp_port_config_quic(
} else {
// New session — send PROXY v2 header via control-style datagram
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
// Bound the session table before inserting; drop datagrams once the cap is
// reached. (`MAX_UDP_SESSIONS` is an assumed cap — `HashMap::insert` returns
// `None` for a *new* key, so it cannot signal "limit reached" by itself.)
const MAX_UDP_SESSIONS: usize = 1024;
if sessions.len() >= MAX_UDP_SESSIONS {
log::warn!("QUIC UDP session limit reached, dropping datagram from {}", client_addr);
continue;
}
sessions.insert(key, sid);
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
@@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use bytes::Bytes;
use remoteingress_protocol::*;
use crate::performance::{EffectivePerformanceConfig, PerformanceConfig};
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
type HubTlsStream = tokio_rustls::server::TlsStream<TcpStream>;
@@ -56,6 +58,8 @@ pub struct HubConfig {
pub tls_cert_pem: Option<String>,
#[serde(default)]
pub tls_key_pem: Option<String>,
#[serde(default)]
pub performance: Option<PerformanceConfig>,
}
impl Default for HubConfig {
@@ -65,6 +69,7 @@ impl Default for HubConfig {
target_host: Some("127.0.0.1".to_string()),
tls_cert_pem: None,
tls_key_pem: None,
performance: None,
}
}
}
@@ -80,6 +85,10 @@ pub struct AllowedEdge {
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
pub stun_interval_secs: Option<u64>,
#[serde(default)]
pub firewall_config: Option<serde_json::Value>,
#[serde(default)]
pub performance: Option<PerformanceConfig>,
}
/// Handshake response sent to edge after authentication.
@@ -90,6 +99,9 @@ struct HandshakeResponse {
#[serde(default)]
listen_ports_udp: Vec<u16>,
stun_interval_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
firewall_config: Option<serde_json::Value>,
performance: EffectivePerformanceConfig,
}
/// Configuration update pushed to a connected edge at runtime.
@@ -99,6 +111,46 @@ pub struct EdgeConfigUpdate {
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub firewall_config: Option<serde_json::Value>,
pub performance: EffectivePerformanceConfig,
}
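The update carries the resolved performance profile. The fields of `EffectivePerformanceConfig` can be inferred from its call sites in this diff; a sketch (field names match the accesses above, exact widths and serde attributes are assumptions — the real type in `crate::performance` presumably also derives `Serialize`/`Deserialize`):

```rust
// Shape inferred from usage: stream cap, per-stream window bounds,
// total in-flight budget, and the QUIC datagram receive buffer size.
#[derive(Debug, Clone, PartialEq)]
pub struct EffectivePerformanceConfig {
    pub max_streams_per_edge: usize,
    pub min_stream_window_bytes: u32,
    pub max_stream_window_bytes: u32,
    pub total_window_budget_bytes: u64,
    pub quic_datagram_receive_buffer_bytes: u64,
}
```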
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowControlStatus {
pub applies: bool,
pub current_window_bytes: u32,
pub min_window_bytes: u32,
pub max_window_bytes: u32,
pub total_window_budget_bytes: u64,
pub estimated_in_flight_bytes: u64,
pub stalled_streams: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueStatus {
pub ctrl_queue_depth: u64,
pub data_queue_depth: u64,
pub sustained_queue_depth: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TrafficStatus {
pub bytes_in: u64,
pub bytes_out: u64,
pub streams_opened_total: u64,
pub streams_closed_total: u64,
pub rejected_streams: u64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UdpStatus {
pub active_sessions: u64,
pub dropped_datagrams: u64,
}
/// Runtime status of a connected edge.
@@ -109,6 +161,13 @@ pub struct ConnectedEdgeStatus {
pub connected_at: u64,
pub active_streams: usize,
pub peer_addr: String,
pub transport_mode: TransportMode,
pub fallback_used: bool,
pub performance: EffectivePerformanceConfig,
pub flow_control: FlowControlStatus,
pub queues: QueueStatus,
pub traffic: TrafficStatus,
pub udp: UdpStatus,
}
/// Events emitted by the hub.
@@ -151,11 +210,30 @@ struct ConnectedEdgeInfo {
connected_at: u64,
peer_addr: String,
edge_stream_count: Arc<AtomicU32>,
transport_mode: TransportMode,
fallback_used: bool,
performance: EffectivePerformanceConfig,
metrics: Arc<EdgeRuntimeMetrics>,
config_tx: mpsc::Sender<EdgeConfigUpdate>,
/// Used to cancel the old connection when an edge reconnects.
cancel_token: CancellationToken,
}
#[derive(Default)]
struct EdgeRuntimeMetrics {
streams_opened_total: AtomicU64,
streams_closed_total: AtomicU64,
rejected_streams: AtomicU64,
bytes_in: AtomicU64,
bytes_out: AtomicU64,
stalled_streams: AtomicU64,
dropped_datagrams: AtomicU64,
active_udp_sessions: AtomicU64,
ctrl_queue_depth: AtomicU64,
data_queue_depth: AtomicU64,
sustained_queue_depth: AtomicU64,
}
impl TunnelHub {
pub fn new(config: HubConfig) -> Self {
let (event_tx, event_rx) = mpsc::channel(1024);
@@ -179,6 +257,7 @@ impl TunnelHub {
/// Update the list of allowed edges.
/// For any currently-connected edge whose ports changed, push a config update.
pub async fn update_allowed_edges(&self, edges: Vec<AllowedEdge>) {
let global_performance = self.config.read().await.performance.clone();
let mut map = self.allowed_edges.write().await;
// Build new map
@@ -192,14 +271,22 @@ impl TunnelHub {
for edge in &edges {
if let Some(info) = connected.get(&edge.id) {
// Check if ports changed compared to old config
let ports_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
let config_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports
|| old.listen_ports_udp != edge.listen_ports_udp
|| old.firewall_config != edge.firewall_config
|| old.performance != edge.performance,
None => true, // newly allowed edge that's already connected
};
if ports_changed {
if config_changed {
let update = EdgeConfigUpdate {
listen_ports: edge.listen_ports.clone(),
listen_ports_udp: edge.listen_ports_udp.clone(),
firewall_config: edge.firewall_config.clone(),
performance: PerformanceConfig::merge(
global_performance.as_ref(),
edge.performance.as_ref(),
).effective(),
};
let _ = info.config_tx.try_send(update);
}
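`PerformanceConfig::merge(global, edge).effective()` layers the per-edge overrides on top of the hub-wide profile, with built-in defaults filling anything left unset. A sketch of that layering for a single optional field (field name and default value are illustrative, not the real implementation):

```rust
// Layering sketch: edge-level Some(...) wins over the global value,
// and a built-in default fills any remaining None.
#[derive(Debug, Clone, Default, PartialEq)]
struct PerformanceConfig {
    max_streams_per_edge: Option<usize>,
}

impl PerformanceConfig {
    fn merge(global: Option<&Self>, edge: Option<&Self>) -> Self {
        let pick = |f: fn(&Self) -> Option<usize>| edge.and_then(f).or_else(|| global.and_then(f));
        Self {
            max_streams_per_edge: pick(|c| c.max_streams_per_edge),
        }
    }

    fn effective_max_streams(&self) -> usize {
        self.max_streams_per_edge.unwrap_or(1024) // default value assumed
    }
}
```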
@@ -217,11 +304,50 @@ impl TunnelHub {
let mut connected = Vec::new();
for (id, info) in edges.iter() {
let active_streams = info.edge_stream_count.load(Ordering::Relaxed);
let flow_window = if info.transport_mode == TransportMode::TcpTls {
compute_window_for_limits(
active_streams,
info.performance.total_window_budget_bytes,
info.performance.min_stream_window_bytes,
info.performance.max_stream_window_bytes,
)
} else {
0
};
connected.push(ConnectedEdgeStatus {
edge_id: id.clone(),
connected_at: info.connected_at,
active_streams: info.edge_stream_count.load(Ordering::Relaxed) as usize,
active_streams: active_streams as usize,
peer_addr: info.peer_addr.clone(),
transport_mode: info.transport_mode,
fallback_used: info.fallback_used,
performance: info.performance.clone(),
flow_control: FlowControlStatus {
applies: info.transport_mode == TransportMode::TcpTls,
current_window_bytes: flow_window,
min_window_bytes: info.performance.min_stream_window_bytes,
max_window_bytes: info.performance.max_stream_window_bytes,
total_window_budget_bytes: info.performance.total_window_budget_bytes,
estimated_in_flight_bytes: flow_window as u64 * active_streams as u64,
stalled_streams: info.metrics.stalled_streams.load(Ordering::Relaxed),
},
queues: QueueStatus {
ctrl_queue_depth: info.metrics.ctrl_queue_depth.load(Ordering::Relaxed),
data_queue_depth: info.metrics.data_queue_depth.load(Ordering::Relaxed),
sustained_queue_depth: info.metrics.sustained_queue_depth.load(Ordering::Relaxed),
},
traffic: TrafficStatus {
bytes_in: info.metrics.bytes_in.load(Ordering::Relaxed),
bytes_out: info.metrics.bytes_out.load(Ordering::Relaxed),
streams_opened_total: info.metrics.streams_opened_total.load(Ordering::Relaxed),
streams_closed_total: info.metrics.streams_closed_total.load(Ordering::Relaxed),
rejected_streams: info.metrics.rejected_streams.load(Ordering::Relaxed),
},
udp: UdpStatus {
active_sessions: info.metrics.active_udp_sessions.load(Ordering::Relaxed),
dropped_datagrams: info.metrics.dropped_datagrams.load(Ordering::Relaxed),
},
});
}
@@ -240,9 +366,14 @@ impl TunnelHub {
let listener = TcpListener::bind(("0.0.0.0", config.tunnel_port)).await?;
log::info!("Hub listening on TCP port {}", config.tunnel_port);
let effective_performance = config.performance.clone().unwrap_or_default().effective();
// Start QUIC endpoint on the same port (UDP)
let quic_endpoint = match quic_transport::build_quic_server_config(tls_config) {
let quic_endpoint = match quic_transport::build_quic_server_config_with_limits(
tls_config,
effective_performance.max_streams_per_edge.min(u32::MAX as usize) as u32,
effective_performance.quic_datagram_receive_buffer_bytes,
) {
Ok(quic_server_config) => {
let bind_addr: std::net::SocketAddr = ([0, 0, 0, 0], config.tunnel_port).into();
match quinn::Endpoint::server(quic_server_config, bind_addr) {
@@ -271,6 +402,7 @@ impl TunnelHub {
let event_tx = self.event_tx.clone();
let target_host = config.target_host.unwrap_or_else(|| "127.0.0.1".to_string());
let hub_token = self.cancel_token.clone();
let hub_performance = config.performance.clone();
tokio::spawn(async move {
// Spawn QUIC acceptor as a separate task
@@ -280,6 +412,7 @@ impl TunnelHub {
let event_tx_q = event_tx.clone();
let target_q = target_host.clone();
let hub_token_q = hub_token.clone();
let performance_q = hub_performance.clone();
Some(tokio::spawn(async move {
loop {
tokio::select! {
@@ -292,6 +425,7 @@ impl TunnelHub {
let target = target_q.clone();
let edge_token = hub_token_q.child_token();
let peer_addr = incoming.remote_address().ip().to_string();
let performance = performance_q.clone();
tokio::spawn(async move {
// Accept the QUIC connection
let quic_conn = match incoming.await {
@@ -301,8 +435,8 @@ impl TunnelHub {
return;
}
};
if let Err(e) = handle_edge_connection_quic(
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr,
if let Err(e) = handle_edge_connection_quic(
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
).await {
log::error!("QUIC edge connection error: {}", e);
}
@@ -336,9 +470,10 @@ impl TunnelHub {
let target = target_host.clone();
let edge_token = hub_token.child_token();
let peer_addr = addr.ip().to_string();
let performance = hub_performance.clone();
tokio::spawn(async move {
if let Err(e) = handle_edge_connection(
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr,
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
).await {
log::error!("Edge connection error: {}", e);
}
@@ -381,15 +516,21 @@ impl TunnelHub {
}
}
fn parse_requested_transport_mode(value: Option<&str>) -> Option<TransportMode> {
match value {
Some("tcpTls") => Some(TransportMode::TcpTls),
Some("quic") => Some(TransportMode::Quic),
Some("quicWithFallback") => Some(TransportMode::QuicWithFallback),
_ => None,
}
}
impl Drop for TunnelHub {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
/// Maximum concurrent streams per edge connection.
const MAX_STREAMS_PER_EDGE: usize = 1024;
/// Process a single frame received from the edge side of the tunnel.
/// Handles FRAME_OPEN, FRAME_DATA, FRAME_WINDOW_UPDATE, FRAME_CLOSE, and FRAME_PONG.
async fn handle_hub_frame(
@@ -407,6 +548,8 @@ async fn handle_hub_frame(
target_host: &str,
edge_token: &CancellationToken,
cleanup_tx: &mpsc::Sender<u32>,
performance: &EffectivePerformanceConfig,
metrics: &Arc<EdgeRuntimeMetrics>,
) -> FrameAction {
match frame.frame_type {
FRAME_OPEN => {
@@ -414,8 +557,9 @@ async fn handle_hub_frame(
let permit = match stream_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
edge_id, performance.max_streams_per_edge, frame.stream_id);
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
tunnel_io.queue_ctrl(close_frame);
return FrameAction::Continue;
@@ -435,6 +579,8 @@ async fn handle_hub_frame(
let sustained_writer_tx = sustained_tx.clone(); // sustained: DATA_BACK from elephant flows
let target = target_host.to_string();
let stream_token = edge_token.child_token();
let active_after_open = edge_stream_count.fetch_add(1, Ordering::Relaxed) + 1;
metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
let _ = event_tx.try_send(HubEvent::StreamOpened {
edge_id: edge_id.to_string(),
@@ -444,9 +590,12 @@ async fn handle_hub_frame(
// Create channel for data from edge to this stream
let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::<Bytes>();
// Adaptive initial window: scale with current stream count
// to keep total in-flight data within the 200MB budget.
let initial_window = compute_window_for_stream_count(
edge_stream_count.load(Ordering::Relaxed),
// to keep total in-flight data within the configured edge budget.
let initial_window = compute_window_for_limits(
active_after_open,
performance.total_window_budget_bytes,
performance.min_stream_window_bytes,
performance.max_stream_window_bytes,
);
let send_window = Arc::new(AtomicU32::new(initial_window));
let window_notify = Arc::new(Notify::new());
@@ -459,9 +608,10 @@ async fn handle_hub_frame(
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
let stream_counter = Arc::clone(edge_stream_count);
let stream_metrics = metrics.clone();
let stream_performance = performance.clone();
tokio::spawn(async move {
let _permit = permit; // hold semaphore permit until stream completes
stream_counter.fetch_add(1, Ordering::Relaxed);
let result = async {
// A2: Connect to SmartProxy with timeout
@@ -519,8 +669,11 @@ async fn handle_hub_frame(
// Track consumption for adaptive flow control.
// Increment capped to adaptive window to limit per-stream in-flight data.
consumed_since_update += len;
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
let adaptive_window = remoteingress_protocol::compute_window_for_limits(
stream_counter_w.load(Ordering::Relaxed),
stream_performance.total_window_budget_bytes,
stream_performance.min_stream_window_bytes,
stream_performance.max_stream_window_bytes,
);
let threshold = adaptive_window / 2;
if consumed_since_update >= threshold {
@@ -575,9 +728,10 @@ async fn handle_hub_frame(
tokio::select! {
_ = notified => continue,
_ = stream_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
_ = tokio::time::sleep(Duration::from_secs(55)) => {
stream_metrics.stalled_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
}
}
}
@@ -604,6 +758,7 @@ async fn handle_hub_frame(
let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]);
// Sustained classification: >2.5 MB/s for >10 seconds
dl_bytes_sent += n as u64;
stream_metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
if !is_sustained {
let elapsed = dl_start.elapsed().as_secs();
if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS
@@ -668,6 +823,7 @@ async fn handle_hub_frame(
_ = cleanup.send(stream_id) => {}
_ = stream_token.cancelled() => {}
}
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
stream_counter.fetch_sub(1, Ordering::Relaxed);
});
}
@@ -676,6 +832,7 @@ async fn handle_hub_frame(
// limits bytes-in-flight, so the channel won't grow unbounded. send() only
// fails if the receiver is dropped (stream handler already exited).
if let Some(state) = streams.get(&frame.stream_id) {
metrics.bytes_in.fetch_add(frame.payload.len() as u64, Ordering::Relaxed);
if state.data_tx.send(frame.payload).is_err() {
// Receiver dropped — stream handler already exited, clean up
streams.remove(&frame.stream_id);
@@ -688,8 +845,8 @@ async fn handle_hub_frame(
if increment > 0 {
if let Some(state) = streams.get(&frame.stream_id) {
let prev = state.send_window.fetch_add(increment, Ordering::Release);
if prev + increment > MAX_WINDOW_SIZE {
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
if prev + increment > performance.max_stream_window_bytes {
state.send_window.store(performance.max_stream_window_bytes, Ordering::Release);
}
state.window_notify.notify_one();
}
@@ -724,6 +881,7 @@ async fn handle_hub_frame(
data_tx: udp_tx,
cancel_token: session_token.clone(),
});
metrics.active_udp_sessions.fetch_add(1, Ordering::Relaxed);
// Spawn upstream UDP forwarder
tokio::spawn(async move {
@@ -758,6 +916,7 @@ async fn handle_hub_frame(
Ok(len) => {
let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]);
match data_writer_tx.try_send(frame) {
Ok(()) => {}
// Queue full: return datagrams may be dropped under pressure.
Err(mpsc::error::TrySendError::Full(_)) => {}
// Receiver gone: the edge connection is closing, stop forwarding.
Err(mpsc::error::TrySendError::Closed(_)) => break,
}
}
@@ -791,18 +950,22 @@ async fn handle_hub_frame(
}
recv_handle.abort();
// active_udp_sessions is decremented by the FRAME_UDP_CLOSE path or connection cleanup.
log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str);
});
}
FRAME_UDP_DATA => {
// Forward datagram to upstream
if let Some(state) = udp_sessions.get(&frame.stream_id) {
let _ = state.data_tx.try_send(frame.payload);
if state.data_tx.try_send(frame.payload).is_err() {
metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
}
}
FRAME_UDP_CLOSE => {
if let Some(state) = udp_sessions.remove(&frame.stream_id) {
state.cancel_token.cancel();
metrics.active_udp_sessions.fetch_sub(1, Ordering::Relaxed);
}
}
_ => {
@@ -822,6 +985,7 @@ async fn handle_edge_connection(
target_host: String,
edge_token: CancellationToken,
peer_addr: String,
hub_performance: Option<PerformanceConfig>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
stream.set_nodelay(true)?;
@@ -852,29 +1016,38 @@ async fn handle_edge_connection(
.map_err(|_| "auth line not valid UTF-8")?;
let auth_line = auth_line.trim();
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
if parts.len() != 3 || parts[0] != "EDGE" {
let parts: Vec<&str> = auth_line.split_whitespace().collect();
if parts.len() < 3 || parts[0] != "EDGE" {
return Err("invalid auth line".into());
}
let edge_id = parts[1].to_string();
let secret = parts[2];
let requested_transport = parse_requested_transport_mode(parts.get(3).copied());
let fallback_used = requested_transport == Some(TransportMode::QuicWithFallback);
// Verify credentials and extract edge config
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(
edge.listen_ports.clone(),
edge.listen_ports_udp.clone(),
edge.stun_interval_secs.unwrap_or(300),
edge.firewall_config.clone(),
edge.performance.clone(),
)
}
None => {
return Err(format!("unknown edge {}", edge_id).into());
}
}
};
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
log::info!("Edge {} authenticated from {}", edge_id, peer_addr);
let _ = event_tx.try_send(HubEvent::EdgeConnected {
@@ -887,6 +1060,8 @@ async fn handle_edge_connection(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
performance: performance.clone(),
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -898,6 +1073,7 @@ async fn handle_edge_connection(
let mut udp_sessions: HashMap<u32, HubUdpSessionState> = HashMap::new();
// Per-edge active stream counter for adaptive flow control
let edge_stream_count = Arc::new(AtomicU32::new(0));
let metrics = Arc::new(EdgeRuntimeMetrics::default());
// Cleanup channel: spawned stream tasks send stream_id here when done
let (cleanup_tx, mut cleanup_rx) = mpsc::channel::<u32>(256);
let now = std::time::SystemTime::now()
@@ -923,6 +1099,10 @@ async fn handle_edge_connection(
connected_at: now,
peer_addr,
edge_stream_count: edge_stream_count.clone(),
transport_mode: TransportMode::TcpTls,
fallback_used,
performance: performance.clone(),
metrics: metrics.clone(),
config_tx,
cancel_token: edge_token.clone(),
},
@@ -963,7 +1143,7 @@ async fn handle_edge_connection(
});
// A4: Semaphore to limit concurrent streams per edge
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
// Heartbeat: periodic PING and liveness timeout
let ping_interval_dur = Duration::from_secs(15);
@@ -1009,7 +1189,7 @@ async fn handle_edge_connection(
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
&cleanup_tx, &performance, &metrics,
).await {
disconnect_reason = reason;
break 'hub_loop;
@@ -1022,6 +1202,10 @@ async fn handle_edge_connection(
if ping_ticker.poll_tick(cx).is_ready() {
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[]));
}
let depths = tunnel_io.queue_depths();
metrics.ctrl_queue_depth.store(depths.ctrl as u64, Ordering::Relaxed);
metrics.data_queue_depth.store(depths.data as u64, Ordering::Relaxed);
metrics.sustained_queue_depth.store(depths.sustained as u64, Ordering::Relaxed);
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut sustained_rx, &mut liveness_deadline, &edge_token)
}).await;
@@ -1033,7 +1217,7 @@ async fn handle_edge_connection(
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
&stream_semaphore, &edge_stream_count,
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
&cleanup_tx,
&cleanup_tx, &performance, &metrics,
).await {
disconnect_reason = reason;
break;
@@ -1191,6 +1375,7 @@ async fn handle_edge_connection_quic(
target_host: String,
edge_token: CancellationToken,
peer_addr: String,
hub_performance: Option<PerformanceConfig>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
log::info!("QUIC edge connection from {}", peer_addr);
@@ -1219,8 +1404,8 @@ async fn handle_edge_connection_quic(
.map_err(|_| "QUIC auth line not valid UTF-8")?;
let auth_line = auth_line.trim();
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
if parts.len() != 3 || parts[0] != "EDGE" {
let parts: Vec<&str> = auth_line.split_whitespace().collect();
if parts.len() < 3 || parts[0] != "EDGE" {
return Err("invalid QUIC auth line".into());
}
@@ -1228,18 +1413,25 @@ async fn handle_edge_connection_quic(
let secret = parts[2];
// Verify credentials
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(
edge.listen_ports.clone(),
edge.listen_ports_udp.clone(),
edge.stun_interval_secs.unwrap_or(300),
edge.firewall_config.clone(),
edge.performance.clone(),
)
}
None => return Err(format!("unknown edge {}", edge_id).into()),
}
};
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
log::info!("QUIC edge {} authenticated from {}", edge_id, peer_addr);
let _ = event_tx.try_send(HubEvent::EdgeConnected {
@@ -1252,6 +1444,8 @@ async fn handle_edge_connection_quic(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
performance: performance.clone(),
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -1260,6 +1454,7 @@ async fn handle_edge_connection_quic(
// Track this edge
let edge_stream_count = Arc::new(AtomicU32::new(0));
let metrics = Arc::new(EdgeRuntimeMetrics::default());
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
@@ -1279,13 +1474,17 @@ async fn handle_edge_connection_quic(
connected_at: now,
peer_addr,
edge_stream_count: edge_stream_count.clone(),
transport_mode: TransportMode::Quic,
fallback_used: false,
performance: performance.clone(),
metrics: metrics.clone(),
config_tx,
cancel_token: edge_token.clone(),
},
);
}
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
// Spawn task to accept data streams (tunneled client connections)
let data_stream_conn = quic_conn.clone();
@@ -1294,6 +1493,7 @@ async fn handle_edge_connection_quic(
let data_event_tx = event_tx.clone();
let data_semaphore = stream_semaphore.clone();
let data_stream_count = edge_stream_count.clone();
let data_metrics = metrics.clone();
let data_token = edge_token.clone();
let data_handle = tokio::spawn(async move {
let mut stream_id_counter: u32 = 0;
@@ -1306,6 +1506,7 @@ async fn handle_edge_connection_quic(
let permit = match data_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
data_metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
log::warn!("QUIC edge {} exceeded max streams, rejecting", data_edge_id);
// Drop the streams to reject
drop(quic_send);
@@ -1320,21 +1521,24 @@ async fn handle_edge_connection_quic(
let edge_id = data_edge_id.clone();
let event_tx = data_event_tx.clone();
let stream_count = data_stream_count.clone();
let stream_metrics = data_metrics.clone();
let stream_token = data_token.child_token();
let _ = event_tx.try_send(HubEvent::StreamOpened {
edge_id: edge_id.clone(),
stream_id,
});
stream_metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
stream_count.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
let _permit = permit;
handle_quic_stream(
quic_send, quic_recv, stream_id,
&target, &edge_id, stream_token,
&target, &edge_id, stream_token, stream_metrics.clone(),
).await;
stream_count.fetch_sub(1, Ordering::Relaxed);
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
let _ = event_tx.try_send(HubEvent::StreamClosed {
edge_id,
stream_id,
@@ -1353,7 +1557,7 @@ async fn handle_edge_connection_quic(
});
// UDP sessions for QUIC datagram transport
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
let quic_udp_sessions: Arc<Mutex<HashMap<u32, (mpsc::Sender<Bytes>, Instant)>>> =
Arc::new(Mutex::new(HashMap::new()));
// Spawn QUIC datagram receiver task
@@ -1362,9 +1566,21 @@ async fn handle_edge_connection_quic(
let dgram_target = target_host.clone();
let dgram_edge_id = edge_id.clone();
let dgram_token = edge_token.clone();
let dgram_metrics = metrics.clone();
let dgram_handle = tokio::spawn(async move {
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
cleanup_interval.tick().await; // consume initial tick
loop {
tokio::select! {
// Periodic sweep: prune sessions whose task has exited (receiver dropped) or that have been idle for over 60s
_ = cleanup_interval.tick() => {
let now = Instant::now();
let mut s = dgram_sessions.lock().await;
s.retain(|_id, (tx, last_activity)| {
!tx.is_closed() && now.duration_since(*last_activity) < Duration::from_secs(60)
});
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
}
datagram = dgram_conn.read_datagram() => {
match datagram {
Ok(data) => {
@@ -1390,10 +1606,12 @@ async fn handle_edge_connection_quic(
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
let cleanup_sessions = sessions.clone();
let session_metrics = dgram_metrics.clone();
{
let mut s = sessions.lock().await;
s.insert(session_id, tx);
s.insert(session_id, (tx, Instant::now()));
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
}
tokio::spawn(async move {
@@ -1401,20 +1619,26 @@ async fn handle_edge_connection_quic(
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
return;
}
@@ -1458,16 +1682,23 @@ async fn handle_edge_connection_quic(
}
recv_handle.abort();
// Clean up session entry to prevent memory leak
cleanup_sessions.lock().await.remove(&session_id);
let mut s = cleanup_sessions.lock().await;
s.remove(&session_id);
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
});
continue;
}
// Regular data datagram — forward to upstream
let sessions = dgram_sessions.lock().await;
if let Some(tx) = sessions.get(&session_id) {
let _ = tx.try_send(Bytes::copy_from_slice(payload));
let mut sessions = dgram_sessions.lock().await;
if let Some((tx, last_activity)) = sessions.get_mut(&session_id) {
*last_activity = Instant::now();
if tx.try_send(Bytes::copy_from_slice(payload)).is_err() {
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
} else {
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
}
}
Err(e) => {
@@ -1577,6 +1808,7 @@ async fn handle_quic_stream(
target_host: &str,
_edge_id: &str,
stream_token: CancellationToken,
metrics: Arc<EdgeRuntimeMetrics>,
) {
// Read PROXY header from the beginning of the stream
let proxy_header = match quic_transport::read_proxy_header(&mut quic_recv).await {
@@ -1622,6 +1854,7 @@ async fn handle_quic_stream(
// Task: QUIC -> upstream (edge data to SmartProxy)
let writer_token = stream_token.clone();
let writer_metrics = metrics.clone();
let mut writer_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
@@ -1629,6 +1862,7 @@ async fn handle_quic_stream(
read_result = quic_recv.read(&mut buf) => {
match read_result {
Ok(Some(n)) => {
writer_metrics.bytes_in.fetch_add(n as u64, Ordering::Relaxed);
let write_result = tokio::select! {
r = tokio::time::timeout(
Duration::from_secs(60),
@@ -1660,6 +1894,7 @@ async fn handle_quic_stream(
match read_result {
Ok(0) => break,
Ok(n) => {
metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
if quic_send.write_all(&buf[..n]).await.is_err() {
break;
}
@@ -1787,6 +2022,8 @@ mod tests {
listen_ports: vec![443, 8080],
listen_ports_udp: vec![],
stun_interval_secs: 300,
firewall_config: None,
performance: EffectivePerformanceConfig::default(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([443, 8080]));
@@ -1801,6 +2038,8 @@ mod tests {
let update = EdgeConfigUpdate {
listen_ports: vec![80, 443],
listen_ports_udp: vec![53],
firewall_config: None,
performance: EffectivePerformanceConfig::default(),
};
let json = serde_json::to_value(&update).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
@@ -3,5 +3,6 @@ pub mod edge;
pub mod stun;
pub mod transport;
pub mod udp_session;
pub mod performance;
pub use remoteingress_protocol as protocol;
@@ -0,0 +1,130 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum PerformanceProfile {
Balanced,
Throughput,
HighConcurrency,
}
impl Default for PerformanceProfile {
fn default() -> Self {
PerformanceProfile::Balanced
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PerformanceConfig {
#[serde(default)]
pub profile: Option<PerformanceProfile>,
#[serde(default)]
pub max_streams_per_edge: Option<usize>,
#[serde(default)]
pub total_window_budget_bytes: Option<u64>,
#[serde(default)]
pub min_stream_window_bytes: Option<u32>,
#[serde(default)]
pub max_stream_window_bytes: Option<u32>,
#[serde(default)]
pub sustained_stream_window_bytes: Option<u32>,
#[serde(default)]
pub quic_datagram_receive_buffer_bytes: Option<usize>,
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EffectivePerformanceConfig {
pub profile: PerformanceProfile,
pub max_streams_per_edge: usize,
pub total_window_budget_bytes: u64,
pub min_stream_window_bytes: u32,
pub max_stream_window_bytes: u32,
pub sustained_stream_window_bytes: u32,
pub quic_datagram_receive_buffer_bytes: usize,
}
impl Default for EffectivePerformanceConfig {
fn default() -> Self {
PerformanceConfig::default().effective()
}
}
impl PerformanceConfig {
pub fn effective(&self) -> EffectivePerformanceConfig {
let profile = self.profile.unwrap_or_default();
let defaults = profile_defaults(profile);
EffectivePerformanceConfig {
profile,
max_streams_per_edge: self.max_streams_per_edge.unwrap_or(defaults.max_streams_per_edge),
total_window_budget_bytes: self.total_window_budget_bytes.unwrap_or(defaults.total_window_budget_bytes),
min_stream_window_bytes: self.min_stream_window_bytes.unwrap_or(defaults.min_stream_window_bytes),
max_stream_window_bytes: self.max_stream_window_bytes.unwrap_or(defaults.max_stream_window_bytes),
sustained_stream_window_bytes: self.sustained_stream_window_bytes.unwrap_or(defaults.sustained_stream_window_bytes),
quic_datagram_receive_buffer_bytes: self
.quic_datagram_receive_buffer_bytes
.unwrap_or(defaults.quic_datagram_receive_buffer_bytes),
}
}
pub fn merge(global: Option<&PerformanceConfig>, edge: Option<&PerformanceConfig>) -> PerformanceConfig {
let mut merged = global.cloned().unwrap_or_default();
if let Some(edge) = edge {
if edge.profile.is_some() {
merged.profile = edge.profile;
}
if edge.max_streams_per_edge.is_some() {
merged.max_streams_per_edge = edge.max_streams_per_edge;
}
if edge.total_window_budget_bytes.is_some() {
merged.total_window_budget_bytes = edge.total_window_budget_bytes;
}
if edge.min_stream_window_bytes.is_some() {
merged.min_stream_window_bytes = edge.min_stream_window_bytes;
}
if edge.max_stream_window_bytes.is_some() {
merged.max_stream_window_bytes = edge.max_stream_window_bytes;
}
if edge.sustained_stream_window_bytes.is_some() {
merged.sustained_stream_window_bytes = edge.sustained_stream_window_bytes;
}
if edge.quic_datagram_receive_buffer_bytes.is_some() {
merged.quic_datagram_receive_buffer_bytes = edge.quic_datagram_receive_buffer_bytes;
}
}
merged
}
}
fn profile_defaults(profile: PerformanceProfile) -> EffectivePerformanceConfig {
match profile {
PerformanceProfile::Balanced => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 1024,
total_window_budget_bytes: 256 * 1024 * 1024,
min_stream_window_bytes: 128 * 1024,
max_stream_window_bytes: 8 * 1024 * 1024,
sustained_stream_window_bytes: 512 * 1024,
quic_datagram_receive_buffer_bytes: 4 * 1024 * 1024,
},
PerformanceProfile::Throughput => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 512,
total_window_budget_bytes: 512 * 1024 * 1024,
min_stream_window_bytes: 256 * 1024,
max_stream_window_bytes: 32 * 1024 * 1024,
sustained_stream_window_bytes: 2 * 1024 * 1024,
quic_datagram_receive_buffer_bytes: 8 * 1024 * 1024,
},
PerformanceProfile::HighConcurrency => EffectivePerformanceConfig {
profile,
max_streams_per_edge: 4096,
total_window_budget_bytes: 256 * 1024 * 1024,
min_stream_window_bytes: 64 * 1024,
max_stream_window_bytes: 4 * 1024 * 1024,
sustained_stream_window_bytes: 256 * 1024,
quic_datagram_receive_buffer_bytes: 16 * 1024 * 1024,
},
}
}
@@ -4,9 +4,9 @@ use serde::{Deserialize, Serialize};
/// Transport mode for the tunnel connection between edge and hub.
///
/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo (default).
/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo.
/// - `Quic`: QUIC with native stream multiplexing (one QUIC stream per tunneled connection).
/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked.
/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked. This is the edge runtime default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum TransportMode {
@@ -1,5 +1,7 @@
use std::sync::Arc;
const DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE: usize = 4 * 1024 * 1024;
/// QUIC control stream message types (reuses frame type constants for consistency).
pub const CTRL_CONFIG: u8 = 0x06;
pub const CTRL_PING: u8 = 0x07;
@@ -11,6 +13,13 @@ pub const CTRL_HEADER_SIZE: usize = 5;
/// Build a quinn ClientConfig that skips server certificate verification
/// (auth is via shared secret, same as the TCP+TLS path).
pub fn build_quic_client_config() -> quinn::ClientConfig {
build_quic_client_config_with_limits(1024, DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE)
}
pub fn build_quic_client_config_with_limits(
max_concurrent_bidi_streams: u32,
datagram_receive_buffer_size: usize,
) -> quinn::ClientConfig {
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(NoCertVerifier))
@@ -28,11 +37,9 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
// Match MAX_STREAMS_PER_EDGE (1024) from hub.rs.
// Default is 100 which is too low for high-concurrency tunneling.
transport.max_concurrent_bidi_streams(1024u32.into());
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.into());
// Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling.
transport.datagram_receive_buffer_size(Some(65536));
transport.datagram_receive_buffer_size(Some(datagram_receive_buffer_size));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));
client_config.transport_config(Arc::new(transport));
@@ -42,6 +49,18 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
/// Build a quinn ServerConfig from the same TLS server config used for TCP+TLS.
pub fn build_quic_server_config(
tls_server_config: rustls::ServerConfig,
) -> Result<quinn::ServerConfig, Box<dyn std::error::Error + Send + Sync>> {
build_quic_server_config_with_limits(
tls_server_config,
1024,
DEFAULT_DATAGRAM_RECEIVE_BUFFER_SIZE,
)
}
pub fn build_quic_server_config_with_limits(
tls_server_config: rustls::ServerConfig,
max_concurrent_bidi_streams: u32,
datagram_receive_buffer_size: usize,
) -> Result<quinn::ServerConfig, Box<dyn std::error::Error + Send + Sync>> {
let quic_config = quinn::crypto::rustls::QuicServerConfig::try_from(tls_server_config)?;
@@ -50,8 +69,8 @@ pub fn build_quic_server_config(
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
transport.max_concurrent_bidi_streams(1024u32.into());
transport.datagram_receive_buffer_size(Some(65536));
transport.max_concurrent_bidi_streams(max_concurrent_bidi_streams.into());
transport.datagram_receive_buffer_size(Some(datagram_receive_buffer_size));
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));
server_config.transport_config(Arc::new(transport));
@@ -76,6 +95,11 @@ pub async fn write_ctrl_message(
Ok(())
}
/// Maximum size for a QUIC control message payload (64 KB).
/// Control messages (CONFIG, PING, PONG) are small; this guards against
/// a malicious peer sending a crafted length field to trigger OOM.
const MAX_CTRL_MESSAGE_SIZE: usize = 65536;
/// Read a control message from a QUIC recv stream.
/// Returns (msg_type, payload). Returns None on EOF.
pub async fn read_ctrl_message(
@@ -93,6 +117,12 @@ pub async fn read_ctrl_message(
}
let msg_type = header[0];
let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
if len > MAX_CTRL_MESSAGE_SIZE {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("control message too large: {} bytes (max {})", len, MAX_CTRL_MESSAGE_SIZE),
));
}
let mut payload = vec![0u8; len];
if len > 0 {
recv.read_exact(&mut payload).await.map_err(|e| {
@@ -17,7 +17,7 @@ pub struct UdpSession {
pub last_activity: Instant,
}
/// Manages UDP sessions with idle timeout expiry.
/// Manages UDP sessions with idle timeout expiry and a maximum session count.
pub struct UdpSessionManager {
/// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>,
@@ -25,14 +25,21 @@ pub struct UdpSessionManager {
by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration.
idle_timeout: std::time::Duration,
/// Maximum number of concurrent sessions (prevents unbounded growth from floods).
max_sessions: usize,
}
impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self {
Self::with_max_sessions(idle_timeout, 65536)
}
pub fn with_max_sessions(idle_timeout: std::time::Duration, max_sessions: usize) -> Self {
Self {
sessions: HashMap::new(),
by_stream_id: HashMap::new(),
idle_timeout,
max_sessions,
}
}
@@ -57,8 +64,12 @@ impl UdpSessionManager {
Some(session)
}
/// Insert a new session. Returns a mutable reference to it.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession {
/// Insert a new session. Returns `None` if the session limit has been reached.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> Option<&mut UdpSession> {
// Allow re-insertion of existing keys (update), but reject truly new sessions at capacity
if !self.sessions.contains_key(&key) && self.sessions.len() >= self.max_sessions {
return None;
}
let session = UdpSession {
stream_id,
client_addr: key.client_addr,
@@ -66,7 +77,7 @@ impl UdpSessionManager {
last_activity: Instant::now(),
};
self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session)
Some(self.sessions.entry(key).or_insert(session))
}
/// Remove a session by stream_id.
@@ -118,7 +129,7 @@ mod tests {
fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some());
@@ -129,7 +140,7 @@ mod tests {
fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42);
assert!(mgr.insert(key, 42).is_some());
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None);
@@ -139,7 +150,7 @@ mod tests {
fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some());
@@ -159,8 +170,8 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Nothing expired yet
assert!(mgr.expire_idle().is_empty());
@@ -178,7 +189,7 @@ mod tests {
async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert!(mgr.insert(key, 1).is_some());
// Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -200,11 +211,35 @@ mod tests {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
}
#[test]
fn test_max_sessions_limit() {
let mut mgr = UdpSessionManager::with_max_sessions(Duration::from_secs(60), 2);
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
let key3 = UdpSessionKey { client_addr: addr(5002), dest_port: 53 };
assert!(mgr.insert(key1, 1).is_some());
assert!(mgr.insert(key2, 2).is_some());
// Third insert should be rejected (at capacity)
assert!(mgr.insert(key3, 3).is_none());
assert_eq!(mgr.len(), 2);
// Re-inserting an existing key should succeed (update, not new)
assert!(mgr.insert(key1, 1).is_some());
assert_eq!(mgr.len(), 2);
// After removing one, a new insert should succeed
mgr.remove_by_stream_id(1);
assert_eq!(mgr.len(), 1);
assert!(mgr.insert(key3, 3).is_some());
assert_eq!(mgr.len(), 2);
}
}
@@ -32,12 +32,18 @@ pub const FRAME_HEADER_SIZE: usize = 9;
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
// Per-stream flow control constants
/// Initial (and maximum) per-stream window size (4 MB).
pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024;
/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window).
/// Default maximum per-stream window size (8 MB).
pub const INITIAL_STREAM_WINDOW: u32 = 8 * 1024 * 1024;
/// Minimum safe window size used when strict budget pressure requires going below the configured floor.
pub const ABSOLUTE_MIN_STREAM_WINDOW: u32 = 16 * 1024;
/// Default total TCP/TLS flow-control budget per edge connection (256 MB).
pub const DEFAULT_TOTAL_WINDOW_BUDGET: u64 = 256 * 1024 * 1024;
/// Default preferred minimum stream window (128 KB). The total budget still takes precedence over this preference.
pub const DEFAULT_MIN_STREAM_WINDOW: u32 = 128 * 1024;
/// Send WINDOW_UPDATE after consuming this many bytes when no dynamic window is available.
pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
/// Maximum window size to prevent overflow.
pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024;
pub const MAX_WINDOW_SIZE: u32 = 32 * 1024 * 1024;
// Sustained stream classification constants
/// Throughput threshold for sustained classification (2.5 MB/s = 20 Mbit/s).
@@ -55,11 +61,37 @@ pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> B
}
/// Compute the target per-stream window size based on the number of active streams.
/// Total memory budget is ~200MB shared across all streams. Up to 50 streams get the
/// full 4MB window; above that the window scales down to a 1MB floor at 200+ streams.
/// The total budget is authoritative: the configured minimum is a preference, not
/// permission to exceed the edge-level memory budget under very high concurrency.
pub fn compute_window_for_stream_count(active: u32) -> u32 {
let per_stream = (200 * 1024 * 1024u64) / (active.max(1) as u64);
per_stream.clamp(1 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
compute_window_for_limits(
active,
DEFAULT_TOTAL_WINDOW_BUDGET,
DEFAULT_MIN_STREAM_WINDOW,
INITIAL_STREAM_WINDOW,
)
}
pub fn compute_window_for_limits(
active: u32,
total_budget_bytes: u64,
min_window_bytes: u32,
max_window_bytes: u32,
) -> u32 {
let active = active.max(1) as u64;
let max_window = max_window_bytes.max(ABSOLUTE_MIN_STREAM_WINDOW);
let preferred_min = min_window_bytes
.max(ABSOLUTE_MIN_STREAM_WINDOW)
.min(max_window);
let per_stream_budget = total_budget_bytes
.max(ABSOLUTE_MIN_STREAM_WINDOW as u64)
/ active;
let bounded = per_stream_budget.min(max_window as u64);
if bounded >= preferred_min as u64 {
bounded as u32
} else {
bounded.max(ABSOLUTE_MIN_STREAM_WINDOW as u64) as u32
}
}
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
@@ -307,6 +339,13 @@ pub struct TunnelIo<S> {
write: WriteState,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct TunnelQueueDepths {
pub ctrl: usize,
pub data: usize,
pub sustained: usize,
}
impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
pub fn new(stream: S, initial_data: Vec<u8>) -> Self {
let read_pos = initial_data.len();
@@ -346,6 +385,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
self.write.sustained_queue.push_back(frame);
}
pub fn queue_depths(&self) -> TunnelQueueDepths {
TunnelQueueDepths {
ctrl: self.write.ctrl_queue.len(),
data: self.write.data_queue.len(),
sustained: self.write.sustained_queue.len(),
}
}
/// Try to parse a complete frame from the read buffer.
/// Uses a parse_pos cursor to avoid drain() on every frame.
pub fn try_parse_frame(&mut self) -> Option<Result<Frame, std::io::Error>> {
@@ -910,7 +957,7 @@ mod tests {
#[test]
fn test_adaptive_window_zero_streams() {
// 0 streams treated as 1: 200MB/1 -> clamped to 4MB max
// 0 streams treated as 1: budget/1 -> clamped to max
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
}
@@ -920,47 +967,44 @@ mod tests {
}
#[test]
fn test_adaptive_window_50_streams_full() {
// 200MB/50 = 4MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW);
fn test_adaptive_window_32_streams_full() {
// 256MB/32 = 8MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(32), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_51_streams_starts_scaling() {
// 200MB/51 < 4MB — first value below max
let w = compute_window_for_stream_count(51);
fn test_adaptive_window_33_streams_starts_scaling() {
// 256MB/33 < 8MB — first value below max
let w = compute_window_for_stream_count(33);
assert!(w < INITIAL_STREAM_WINDOW);
assert_eq!(w, (200 * 1024 * 1024u64 / 51) as u32);
assert_eq!(w, (DEFAULT_TOTAL_WINDOW_BUDGET / 33) as u32);
}
#[test]
fn test_adaptive_window_100_streams() {
// 200MB/100 = 2MB
assert_eq!(compute_window_for_stream_count(100), 2 * 1024 * 1024);
assert_eq!(compute_window_for_stream_count(100), (DEFAULT_TOTAL_WINDOW_BUDGET / 100) as u32);
}
#[test]
fn test_adaptive_window_200_streams_at_floor() {
// 200MB/200 = 1MB = exactly the floor
assert_eq!(compute_window_for_stream_count(200), 1 * 1024 * 1024);
fn test_adaptive_window_200_streams_uses_budget() {
assert_eq!(compute_window_for_stream_count(200), (DEFAULT_TOTAL_WINDOW_BUDGET / 200) as u32);
}
#[test]
fn test_adaptive_window_500_streams_clamped() {
// 200MB/500 = 0.4MB -> clamped up to 1MB floor
assert_eq!(compute_window_for_stream_count(500), 1 * 1024 * 1024);
fn test_adaptive_window_500_streams_stays_under_budget() {
assert_eq!(compute_window_for_stream_count(500), (DEFAULT_TOTAL_WINDOW_BUDGET / 500) as u32);
}
#[test]
fn test_adaptive_window_max_u32() {
// Extreme: u32::MAX streams -> tiny value -> clamped to 1MB
assert_eq!(compute_window_for_stream_count(u32::MAX), 1 * 1024 * 1024);
// Extreme: u32::MAX streams -> tiny value -> clamped to absolute minimum.
assert_eq!(compute_window_for_stream_count(u32::MAX), ABSOLUTE_MIN_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_monotonically_decreasing() {
let mut prev = compute_window_for_stream_count(1);
for n in [2, 10, 50, 51, 100, 200, 500, 1000] {
for n in [2, 10, 32, 33, 100, 200, 500, 1000] {
let w = compute_window_for_stream_count(n);
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
prev = w;
@@ -969,11 +1013,12 @@ mod tests {
#[test]
fn test_adaptive_window_total_budget_bounded() {
// active x per_stream_window should never exceed 200MB (+ clamp overhead for high N)
for n in [1, 10, 50, 100, 200] {
// active x per_stream_window should never exceed the configured budget while the
// budget can still provide at least the absolute minimum per stream.
for n in [1, 10, 32, 33, 100, 200, 500, 1000] {
let w = compute_window_for_stream_count(n);
let total = w as u64 * n as u64;
assert!(total <= 200 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
assert!(total <= DEFAULT_TOTAL_WINDOW_BUDGET, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
}
}
@@ -322,6 +322,11 @@ tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () =>
tunnel = await startTunnel(edgePort, hubPort);
expect(tunnel.hub.running).toBeTrue();
const hubStatus = await tunnel.hub.getStatus();
expect(hubStatus.connectedEdges.length).toBeGreaterThanOrEqual(1);
const edgeStatus = hubStatus.connectedEdges[0];
expect(['quic', 'tcpTls'].includes(edgeStatus.transportMode)).toEqual(true);
expect(edgeStatus.performance.maxStreamsPerEdge).toBeGreaterThanOrEqual(1024);
});
tap.test('TCP/TLS: single TCP stream — 32MB transfer exceeding initial 4MB window', async () => {
@@ -185,6 +185,14 @@ tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => {
expect(tunnel.hub.running).toBeTrue();
const status = await tunnel.edge.getStatus();
expect(status.connected).toBeTrue();
const hubStatus = await tunnel.hub.getStatus();
expect(hubStatus.connectedEdges.length).toBeGreaterThanOrEqual(1);
const edgeStatus = hubStatus.connectedEdges[0];
expect(edgeStatus.transportMode).toEqual('quic');
expect(edgeStatus.fallbackUsed).toEqual(false);
expect(edgeStatus.performance.profile).toEqual('balanced');
expect(edgeStatus.flowControl.applies).toEqual(false);
expect(edgeStatus.traffic.streamsOpenedTotal).toEqual(0);
});
tap.test('QUIC: single TCP stream echo — 1KB', async () => {
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.14.3',
version: '4.17.1',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
}
@@ -1,6 +1,7 @@
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
import { decodeConnectionToken } from './classes.token.js';
import type { IFirewallConfig } from './classes.remoteingresshub.js';
// Command map for the edge side of remoteingress-bin
type TEdgeCommands = {
@@ -55,6 +56,8 @@ export class RemoteIngressEdge extends EventEmitter {
private restartBackoffMs = 1000;
private restartAttempts = 0;
private statusInterval: ReturnType<typeof setInterval> | undefined;
private nft: InstanceType<typeof plugins.smartnftables.SmartNftables> | null = null;
private pendingFirewallConfig: IFirewallConfig | null = null;
constructor() {
super();
@@ -110,6 +113,96 @@ export class RemoteIngressEdge extends EventEmitter {
console.log(`[RemoteIngressEdge] Ports updated by hub: ${data.listenPorts.join(', ')}`);
this.emit('portsUpdated', data);
});
this.bridge.on('management:firewallConfigUpdated', (data: { firewallConfig: IFirewallConfig }) => {
console.log(`[RemoteIngressEdge] Firewall config updated from hub`);
void this.applyFirewallConfig(data.firewallConfig).catch((err) => {
console.error(`[RemoteIngressEdge] Failed to apply firewall config: ${err}`);
});
this.emit('firewallConfigUpdated', data);
});
}
/**
* Initialize the nftables manager. Fails gracefully if not running as root.
*/
private async initNft(options: { reset?: boolean } = {}): Promise<void> {
try {
this.nft = new plugins.smartnftables.SmartNftables({
tableName: 'remoteingress',
dryRun: false,
});
if (options.reset) {
await (this.nft as any).cleanup({ force: true });
}
await this.nft.initialize();
console.log('[RemoteIngressEdge] SmartNftables initialized');
if (this.pendingFirewallConfig) {
const pending = this.pendingFirewallConfig;
this.pendingFirewallConfig = null;
await this.applyFirewallConfig(pending);
}
} catch (err) {
console.warn(`[RemoteIngressEdge] Failed to initialize nftables (not root?): ${err}`);
this.nft = null;
}
}
/**
* Apply firewall configuration received from the hub.
* Performs a full replacement: cleans up existing rules, then applies the new config.
*/
private async applyFirewallConfig(config: IFirewallConfig): Promise<void> {
if (!this.nft) {
this.pendingFirewallConfig = config;
return;
}
try {
// Full cleanup and reinitialize to replace all rules atomically
await (this.nft as any).cleanup({ force: true });
await this.nft.initialize();
// Apply blocked IPs
if (config.blockedIps && config.blockedIps.length > 0) {
await (this.nft.firewall as any).blockIPSet('hub-blocklist', {
setName: 'blocked_ipv4',
ips: config.blockedIps,
comment: 'RemoteIngress hub blocklist',
});
console.log(`[RemoteIngressEdge] Blocked ${config.blockedIps.length} IPs`);
}
// Apply rate limits
if (config.rateLimits && config.rateLimits.length > 0) {
for (const rl of config.rateLimits) {
await this.nft.rateLimit.addRateLimit(rl.id, {
port: rl.port,
protocol: rl.protocol,
rate: rl.rate,
burst: rl.burst,
perSourceIP: rl.perSourceIP,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rateLimits.length} rate limits`);
}
// Apply firewall rules
if (config.rules && config.rules.length > 0) {
for (const rule of config.rules) {
await this.nft.firewall.addRule(rule.id, {
direction: rule.direction,
action: rule.action,
sourceIP: rule.sourceIP,
destPort: rule.destPort,
protocol: rule.protocol,
comment: rule.comment,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rules.length} firewall rules`);
}
} catch (err) {
console.error(`[RemoteIngressEdge] Failed to apply firewall config: ${err}`);
}
}
/**
@@ -134,6 +227,10 @@ export class RemoteIngressEdge extends EventEmitter {
this.savedConfig = edgeConfig;
this.stopping = false;
// Clear any stale nftables state left by a prior process before the edge
// can accept hub config or bind public listener ports.
await this.initNft({ reset: true });
const spawned = await this.bridge.spawn();
if (!spawned) {
throw new Error('Failed to spawn remoteingress-bin');
@@ -143,14 +240,21 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
-await this.bridge.sendCommand('startEdge', {
-hubHost: edgeConfig.hubHost,
-hubPort: edgeConfig.hubPort ?? 8443,
-edgeId: edgeConfig.edgeId,
-secret: edgeConfig.secret,
-...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
-...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
-});
+try {
+await this.bridge.sendCommand('startEdge', {
+hubHost: edgeConfig.hubHost,
+hubPort: edgeConfig.hubPort ?? 8443,
+edgeId: edgeConfig.edgeId,
+secret: edgeConfig.secret,
+...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
+...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
+});
+} catch (err) {
+// Clean up the spawned process to avoid orphaning it
+this.bridge.removeListener('exit', this.handleCrashRecovery);
+this.bridge.kill();
+throw err;
+}
this.started = true;
this.restartAttempts = 0;
@@ -180,6 +284,15 @@ export class RemoteIngressEdge extends EventEmitter {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
// Clean up nftables rules before stopping
if (this.nft) {
try {
await (this.nft as any).cleanup({ force: true });
} catch (err) {
console.warn(`[RemoteIngressEdge] nftables cleanup error: ${err}`);
}
this.nft = null;
}
if (this.started) {
try {
await this.bridge.sendCommand('stopEdge', {} as Record<string, never>);
@@ -191,6 +304,10 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = false;
}
this.savedConfig = null;
this.pendingFirewallConfig = null;
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
}
/**
@@ -235,10 +352,18 @@ export class RemoteIngressEdge extends EventEmitter {
}
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++;
try {
// Drop stale kernel rules before reconnecting. The hub will send the
// current full firewall snapshot during handshake/config refresh.
await this.initNft({ reset: true });
const spawned = await this.bridge.spawn();
if (!spawned) {
console.error('[RemoteIngressEdge] Failed to respawn binary');
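The crash-recovery paths in both classes wait `restartBackoffMs`, re-check `stopping` after the wait, then double the backoff up to `MAX_RESTART_BACKOFF_MS`, giving up after `MAX_RESTART_ATTEMPTS`. The resulting wait schedule can be sketched as:

```typescript
// Restart backoff schedule implied by the diff: start at 1 s, double after
// each attempt, cap at 30 s, give up after 10 attempts.
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;

function backoffScheduleMs(initialMs = 1000): number[] {
  const waits: number[] = [];
  let backoffMs = initialMs;
  for (let attempt = 0; attempt < MAX_RESTART_ATTEMPTS; attempt++) {
    waits.push(backoffMs);                                       // wait before this respawn
    backoffMs = Math.min(backoffMs * 2, MAX_RESTART_BACKOFF_MS); // then double, capped
  }
  return waits;
}
// Schedule: 1000, 2000, 4000, 8000, 16000, then 30000 for the remaining attempts.
```

The diff shows `restartAttempts` being reset to 0 after a successful start; whether `restartBackoffMs` resets as well is not visible in these hunks.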
+121 -20
@@ -9,11 +9,12 @@ type THubCommands = {
};
startHub: {
params: {
-tunnelPort: number;
-targetHost?: string;
-tlsCertPem?: string;
-tlsKeyPem?: string;
-};
+tunnelPort: number;
+targetHost?: string;
+tlsCertPem?: string;
+tlsKeyPem?: string;
+performance?: IPerformanceConfig;
+};
result: { started: boolean };
};
stopHub: {
@@ -22,7 +23,7 @@ type THubCommands = {
};
updateAllowedEdges: {
params: {
-edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>;
+edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig; performance?: IPerformanceConfig }>;
};
result: { updated: boolean };
};
@@ -31,16 +32,48 @@ type THubCommands = {
result: {
running: boolean;
tunnelPort: number;
-connectedEdges: Array<{
-edgeId: string;
-connectedAt: number;
-activeStreams: number;
-peerAddr: string;
-}>;
+connectedEdges: Array<{
+edgeId: string;
+connectedAt: number;
+activeStreams: number;
+peerAddr: string;
+transportMode: 'tcpTls' | 'quic' | 'quicWithFallback';
+fallbackUsed: boolean;
+performance: IEffectivePerformanceConfig;
+flowControl: IFlowControlStatus;
+queues: IQueueStatus;
+traffic: ITrafficStatus;
+udp: IUdpStatus;
+}>;
};
};
};
export interface IFirewallRateLimit {
id: string;
port: number;
protocol?: 'tcp' | 'udp';
rate: string;
burst?: number;
perSourceIP?: boolean;
}
export interface IFirewallRule {
id: string;
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string;
destPort?: number;
protocol?: 'tcp' | 'udp';
comment?: string;
}
export interface IFirewallConfig {
blockedIps?: string[];
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
export interface IHubConfig {
tunnelPort?: number;
targetHost?: string;
@@ -48,9 +81,61 @@ export interface IHubConfig {
certPem?: string;
keyPem?: string;
};
performance?: IPerformanceConfig;
}
-type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number };
export type TPerformanceProfile = 'balanced' | 'throughput' | 'highConcurrency';
export interface IPerformanceConfig {
profile?: TPerformanceProfile;
maxStreamsPerEdge?: number;
totalWindowBudgetBytes?: number;
minStreamWindowBytes?: number;
maxStreamWindowBytes?: number;
sustainedStreamWindowBytes?: number;
quicDatagramReceiveBufferBytes?: number;
}
export interface IEffectivePerformanceConfig {
profile: TPerformanceProfile;
maxStreamsPerEdge: number;
totalWindowBudgetBytes: number;
minStreamWindowBytes: number;
maxStreamWindowBytes: number;
sustainedStreamWindowBytes: number;
quicDatagramReceiveBufferBytes: number;
}
export interface IFlowControlStatus {
applies: boolean;
currentWindowBytes: number;
minWindowBytes: number;
maxWindowBytes: number;
totalWindowBudgetBytes: number;
estimatedInFlightBytes: number;
stalledStreams: number;
}
export interface IQueueStatus {
ctrlQueueDepth: number;
dataQueueDepth: number;
sustainedQueueDepth: number;
}
export interface ITrafficStatus {
bytesIn: number;
bytesOut: number;
streamsOpenedTotal: number;
streamsClosedTotal: number;
rejectedStreams: number;
}
export interface IUdpStatus {
activeSessions: number;
droppedDatagrams: number;
}
+type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig; performance?: IPerformanceConfig };
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;
@@ -131,13 +216,21 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
-await this.bridge.sendCommand('startHub', {
-tunnelPort: config.tunnelPort ?? 8443,
-targetHost: config.targetHost ?? '127.0.0.1',
-...(config.tls?.certPem && config.tls?.keyPem
-? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
-: {}),
-});
+try {
+await this.bridge.sendCommand('startHub', {
+tunnelPort: config.tunnelPort ?? 8443,
+targetHost: config.targetHost ?? '127.0.0.1',
+...(config.performance ? { performance: config.performance } : {}),
+...(config.tls?.certPem && config.tls?.keyPem
+? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
+: {}),
+});
+} catch (err) {
+// Clean up the spawned process to avoid orphaning it
+this.bridge.removeListener('exit', this.handleCrashRecovery);
+this.bridge.kill();
+throw err;
+}
this.started = true;
this.restartAttempts = 0;
@@ -161,6 +254,9 @@ export class RemoteIngressHub extends EventEmitter {
}
this.savedConfig = null;
this.savedEdges = [];
// Remove all listeners to prevent memory buildup
this.bridge.removeAllListeners();
this.removeAllListeners();
}
/**
@@ -207,6 +303,10 @@ export class RemoteIngressHub extends EventEmitter {
}
await new Promise(resolve => setTimeout(resolve, this.restartBackoffMs));
// Re-check after backoff — stop() may have been called during the wait
if (this.stopping || !this.savedConfig) {
return;
}
this.restartBackoffMs = Math.min(this.restartBackoffMs * 2, MAX_RESTART_BACKOFF_MS);
this.restartAttempts++;
@@ -227,6 +327,7 @@ export class RemoteIngressHub extends EventEmitter {
...(config.tls?.certPem && config.tls?.keyPem
? { tlsCertPem: config.tls.certPem, tlsKeyPem: config.tls.keyPem }
: {}),
...(config.performance ? { performance: config.performance } : {}),
});
// Restore allowed edges
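Putting the new interfaces together, a hub-side firewall config for one edge might look like the following. The concrete values (IPs, ports, and especially the rate string format) are illustrative assumptions:

```typescript
// Illustrative IFirewallConfig payload, using only fields declared in the
// interfaces above. The rate string format ('100/second') is an assumption.
const firewallConfig = {
  blockedIps: ['203.0.113.7', '198.51.100.0/24'],
  rateLimits: [
    { id: 'dns-rl', port: 53, protocol: 'udp', rate: '100/second', burst: 20, perSourceIP: true },
  ],
  rules: [
    { id: 'allow-https', direction: 'input', action: 'accept', destPort: 443, protocol: 'tcp', comment: 'public HTTPS' },
    { id: 'drop-telnet', direction: 'input', action: 'drop', destPort: 23, protocol: 'tcp' },
  ],
};

// The edge applies this wholesale (see applyFirewallConfig above):
// cleanup, re-initialize, then blocklist, rate limits, and rules in order.
console.log(
  `${firewallConfig.blockedIps.length} blocked IPs, ` +
  `${firewallConfig.rateLimits.length} rate limits, ` +
  `${firewallConfig.rules.length} rules`,
);
```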
+2 -1
@@ -3,5 +3,6 @@ import * as path from 'path';
export { path };
// @push.rocks scope
+import * as smartnftables from '@push.rocks/smartnftables';
import * as smartrust from '@push.rocks/smartrust';
-export { smartrust };
+export { smartnftables, smartrust };
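The `pendingFirewallConfig` handling in `RemoteIngressEdge` (queue the config while nftables is unavailable, flush it once `initNft` succeeds, keep only the latest) is a small deferred-apply pattern. A standalone sketch with hypothetical names, independent of smartnftables:

```typescript
// Deferred-apply sketch mirroring pendingFirewallConfig: configs arriving
// before the backend is ready are queued (latest wins) and flushed on init.
// Names here are hypothetical; no smartnftables APIs are used.
type TConfig = { rules: string[] };

class DeferredApplier {
  private ready = false;
  private pending: TConfig | null = null;
  public applied: TConfig[] = [];

  public apply(config: TConfig): void {
    if (!this.ready) {
      this.pending = config; // supersedes any earlier pending config
      return;
    }
    this.applied.push(config);
  }

  public init(): void {
    this.ready = true;
    if (this.pending) {
      const queued = this.pending;
      this.pending = null;
      this.apply(queued);
    }
  }
}

const applier = new DeferredApplier();
applier.apply({ rules: ['a'] });
applier.apply({ rules: ['b'] }); // supersedes 'a'
applier.init();                  // flushes only the latest queued config
```

If initialization fails (e.g. not running as root), the real code sets `nft` back to `null`, so later configs keep queueing until a restart reinitializes nftables.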