Compare commits

...

22 Commits

Author SHA1 Message Date
0b2a83ddb6 v4.15.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 17:06:28 +00:00
3c5ea6bdc5 fix(readme): adjust tunnel diagram alignment in the README 2026-03-26 17:06:28 +00:00
3dea43400b v4.15.1
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 17:05:38 +00:00
8fa3d414dd fix(readme): clarify unified runtime configuration and firewall update behavior 2026-03-26 17:05:38 +00:00
1a62c52d24 v4.15.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 16:39:53 +00:00
e9a08bdd0f feat(edge,hub): add hub-controlled nftables firewall configuration for remote ingress edges 2026-03-26 16:39:53 +00:00
c2c9dd195d v4.14.3
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 07:49:44 +00:00
fb6e9c54ad fix(docs): refresh project metadata and README to reflect current ingress tunnel capabilities 2026-03-26 07:49:44 +00:00
ac22617849 v4.14.2
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-26 07:10:15 +00:00
e5a91f298c fix(hub-core): improve stream shutdown handling and connection cleanup in hub and edge 2026-03-26 07:10:15 +00:00
5e93710c42 v4.14.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 19:49:05 +00:00
331b5c8d3f fix(remoteingress edge/hub crash recovery): prevent duplicate crash recovery listeners and reset saved runtime state on shutdown 2026-03-21 19:49:05 +00:00
bf3418d0ed v4.14.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 00:51:37 +00:00
6d5e6f60f8 feat(quic): add QUIC stability test coverage and bridge logging for hub and edge 2026-03-20 00:51:37 +00:00
de8922148e v4.13.2
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 00:11:34 +00:00
e84eecf82c fix(remoteingress-core): preserve reconnected edge entries during disconnect cleanup 2026-03-20 00:11:34 +00:00
c7641853cf v4.13.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 23:28:38 +00:00
6e2025db3e fix(remoteingress-core): default edge transport mode to QUIC with fallback 2026-03-19 23:28:38 +00:00
693031ecdd v4.13.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 14:43:42 +00:00
a2cdadc5e3 feat(docs): document TCP and UDP tunneling over TLS and QUIC 2026-03-19 14:43:42 +00:00
948032fc9e v4.12.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 14:09:32 +00:00
a400945371 fix(remoteingress-core): send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions 2026-03-19 14:09:32 +00:00
17 changed files with 2516 additions and 1403 deletions

View File

@@ -11,26 +11,26 @@
"githost": "code.foss.global",
"gitscope": "serve.zone",
"gitrepo": "remoteingress",
"description": "Provides a service for creating private tunnels and reaching private clusters from the outside, facilitating secure remote access as part of the @serve.zone stack.",
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"npmPackagename": "@serve.zone/remoteingress",
"license": "MIT",
"projectDomain": "serve.zone",
"keywords": [
"remote access",
"private tunnels",
"network security",
"TLS encryption",
"connector",
"ingress tunnel",
"network edge",
"PROXY protocol",
"multiplexed tunnel",
"TCP proxy",
"TLS tunnel",
"QUIC transport",
"UDP tunneling",
"serve.zone stack",
"private clusters access",
"public access management",
"TypeScript application",
"node.js package",
"secure communications",
"TLS/SSL certificates",
"development tools",
"software development",
"private network integration"
"TypeScript",
"Rust",
"SmartProxy",
"DcRouter",
"flow control"
]
},
"release": {

View File

@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {

View File

@@ -1,5 +1,79 @@
# Changelog
## 2026-03-26 - 4.15.2 - fix(readme)
adjust tunnel diagram alignment in the README
- Improves formatting consistency in the Hub/Edge topology diagram.
## 2026-03-26 - 4.15.1 - fix(readme)
clarify unified runtime configuration and firewall update behavior
- Updates the architecture and feature descriptions to reflect that ports, firewall rules, and rate limits are pushed together in a single config update
- Clarifies that firewall configuration is delivered via FRAME_CONFIG on handshake and subsequent updates, with atomic full-rule replacement at the edge
- Simplifies and reorganizes README wording around edge and hub responsibilities without changing implementation behavior
## 2026-03-26 - 4.15.0 - feat(edge,hub)
add hub-controlled nftables firewall configuration for remote ingress edges
- add firewallConfig support to allowed edge definitions, handshake responses, and runtime config updates
- emit firewallConfigUpdated events from the Rust bridge and edge runtime when firewall settings change
- initialize SmartNftables on edges, apply blocked IPs, rate limits, and custom rules, and clean up nftables rules on stop
- document centralized firewall management, root requirements, and new edge events in the README
## 2026-03-26 - 4.14.3 - fix(docs)
refresh project metadata and README to reflect current ingress tunnel capabilities
- update package metadata description and keywords to better describe edge ingress, TLS/QUIC transport, and SmartProxy integration
- revise README terminology, API docs, and feature list to document crash recovery, bindAddress support, and current event names
- improve README formatting and examples for architecture, wire protocol, QoS, and token usage
## 2026-03-26 - 4.14.2 - fix(hub-core)
improve stream shutdown handling and connection cleanup in hub and edge
- Cancel edge upload loops immediately when the hub closes a stream instead of waiting for the window stall timeout.
- Reduce stalled stream timeouts from 120s to 55s to detect broken connections faster.
- Allow hub writer tasks to shut down gracefully before aborting to avoid unnecessary TCP resets.
- Enable TCP keepalive on hub upstream connections to detect silent SmartProxy failures.
- Remove leaked QUIC UDP session entries when setup fails or sessions end.
- Rename npmextra.json to .smartconfig.json and update package packaging references.
## 2026-03-21 - 4.14.1 - fix(remoteingress edge/hub crash recovery)
prevent duplicate crash recovery listeners and reset saved runtime state on shutdown
- Removes existing exit listeners before re-registering crash recovery handlers for edge and hub processes.
- Clears saved edge and hub configuration on stop to avoid stale restart state.
- Resets orphaned edge status intervals and restarts periodic status logging after successful crash recovery.
## 2026-03-20 - 4.14.0 - feat(quic)
add QUIC stability test coverage and bridge logging for hub and edge
- adds a long-running QUIC stability test with periodic echo probes and disconnect detection
- enables prefixed bridge logging for RemoteIngressHub and RemoteIngressEdge to improve runtime diagnostics
## 2026-03-20 - 4.13.2 - fix(remoteingress-core)
preserve reconnected edge entries during disconnect cleanup
- Guard edge removal so disconnect handlers only delete entries whose cancel token is already cancelled
- Prevents stale TCP and QUIC disconnect paths from removing a newer connection after an edge reconnects
## 2026-03-19 - 4.13.1 - fix(remoteingress-core)
default edge transport mode to QUIC with fallback
- Changes the default transport mode in edge connections from TCP/TLS to QUIC with fallback when no transport mode is explicitly configured.
## 2026-03-19 - 4.13.0 - feat(docs)
document TCP and UDP tunneling over TLS and QUIC
- update package description to reflect TCP and UDP support and TLS or QUIC transports
- refresh README architecture, features, and usage examples for UDP forwarding, QUIC transport, and PROXY protocol v1/v2 support
## 2026-03-19 - 4.12.1 - fix(remoteingress-core)
send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions
- Adds periodic idle UDP session expiry in edge tunnel and QUIC loops, including UDP close signaling for expired tunnel sessions.
- Sends the PROXY v2 header as the first datagram for UDP upstream connections in both standard and QUIC hub paths.
- Updates the UDP node test server to ignore the initial PROXY v2 datagram per source before echoing payload traffic.
## 2026-03-19 - 4.12.0 - feat(remoteingress-core)
add UDP tunneling over QUIC datagrams and expand transport-specific test coverage

21
license.md Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Task Venture Capital GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,8 +1,8 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.12.0",
"version": "4.15.2",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"type": "module",
@@ -14,17 +14,18 @@
"buildDocs": "(tsdoc)"
},
"devDependencies": {
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbundle": "^2.8.3",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tsrust": "^1.3.2",
"@git.zone/tstest": "^3.6.0",
"@push.rocks/tapbundle": "^6.0.3",
"@types/node": "^25.3.0"
"@types/node": "^25.5.0"
},
"dependencies": {
"@push.rocks/qenv": "^6.1.3",
"@push.rocks/smartrust": "^1.2.1"
"@push.rocks/smartnftables": "^1.0.1",
"@push.rocks/smartrust": "^1.3.2"
},
"repository": {
"type": "git",
@@ -47,7 +48,7 @@
"dist_rust/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"keywords": [

2736
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

437
readme.md
View File

@@ -1,6 +1,6 @@
# @serve.zone/remoteingress
Edge ingress tunnel for DcRouter — accepts incoming TCP connections at the network edge and tunnels them over a single encrypted TLS connection to a DcRouter SmartProxy instance, preserving the original client IP via PROXY protocol v1.
Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol. Includes **hub-controlled nftables firewall** for IP blocking, rate limiting, and custom firewall rules applied directly at the edge.
## Issue Reporting and Security
@@ -12,48 +12,54 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
pnpm install @serve.zone/remoteingress
```
## 🏗️ Architecture
## Architecture
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
┌─────────────────────┐ TLS Tunnel ┌─────────────────────┐
│ Network Edge │ ◄══════════════════════════► │ Private Cluster │
(multiplexed frames +
RemoteIngressEdge shared-secret auth) │ RemoteIngressHub
Accepts client TCP │ │ Forwards to
connections on │ │ SmartProxy on
hub-assigned ports │ │ local ports
TLS or QUIC Tunnel
┌─────────────────────┐ ◄══════════════════════════► ┌─────────────────────┐
Network EdgeTCP+TLS: frame mux Private Cluster
│ QUIC: native streams │
RemoteIngressEdge │ UDP: QUIC datagrams │ RemoteIngressHub
│ │
• TCP/UDP listeners│ ◄─── FRAME_CONFIG pushes ─── │ • Port assignments
│ • nftables firewall│ ports + firewall rules │ • Firewall config │
│ • Rate limiting │ at any time │ • Rate limit rules │
└─────────────────────┘ └─────────────────────┘
▲ │
│ TCP from end users
│ TCP + UDP from end users ▼
Internet DcRouter / SmartProxy
```
| Component | Role |
|-----------|------|
| **RemoteIngressEdge** | Deployed at the network edge (e.g. a VPS or cloud instance). Listens on ports assigned by the hub, accepts raw TCP connections, and multiplexes them over a single TLS tunnel to the hub. Ports are hot-reloadable — the hub can change them at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams, and forwards each to SmartProxy with a PROXY protocol v1 header so the real client IP is preserved. Controls which ports each edge listens on. |
| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Runs as root. Listens on hub-assigned TCP/UDP ports, tunnels traffic to the hub, and applies hub-pushed nftables rules (IP blocking, rate limiting). All config is hot-reloadable at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. Pushes all edge config (ports, firewall) via a single API. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
### Key Features
### Key Features
- 🔒 **TLS-encrypted tunnel** between edge and hub (auto-generated self-signed cert or bring your own)
- 🔀 **Multiplexed streams** — thousands of client connections flow over a single tunnel
- 🌐 **PROXY protocol v1** — SmartProxy sees the real client IP, not the tunnel IP
- 🔑 **Shared-secret authentication** — edges must present valid credentials to connect
- 🎫 **Connection tokens**encode all connection details into a single opaque string
- 📡 **STUN-based public IP discovery**the edge automatically discovers its public IP via Cloudflare STUN
- 🔄 **Auto-reconnect** with exponential backoff if the tunnel drops
- 🎛️ **Dynamic port configuration**the hub assigns listen ports per edge and can hot-reload them at runtime via `FRAME_CONFIG` frames
- 📣 **Event-driven**both Hub and Edge extend `EventEmitter` for real-time monitoring
- **Rust core** — all frame encoding, TLS, and TCP proxying happen in native code for maximum throughput
- 🎚️ **3-tier QoS**control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- 📊 **Adaptive flow control**per-stream windows scale with active stream count to prevent memory overuse
- **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- **Hub-controlled firewall** — push nftables rules (IP blocking, rate limiting, custom firewall rules) to edges as part of the same config update that assigns ports — powered by `@push.rocks/smartnftables`
- **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
- **QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- **Shared-secret authentication** — edges must present valid credentials to connect
- **Connection tokens** — encode all connection details into a single opaque base64url string
- **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- **Auto-reconnect** with exponential backoff if the tunnel drops
- **Dynamic runtime configuration** — the hub pushes ports, firewall rules, and rate limits to edges at any time via a single `updateAllowedEdges()` call
- **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
- **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
- **Crash recovery** — automatic restart with exponential backoff if the Rust binary crashes unexpectedly
## 🚀 Usage
## Usage
Both classes are imported from the package and communicate with the Rust binary under the hood. All you need to do is configure and start them.
Both classes are imported from the package and communicate with the Rust binary under the hood.
### Setting Up the Hub (Private Cluster Side)
@@ -63,32 +69,34 @@ import { RemoteIngressHub } from '@serve.zone/remoteingress';
const hub = new RemoteIngressHub();
// Listen for events
hub.on('edgeConnected', ({ edgeId }) => {
console.log(`Edge ${edgeId} connected`);
});
hub.on('edgeDisconnected', ({ edgeId }) => {
console.log(`Edge ${edgeId} disconnected`);
});
hub.on('streamOpened', ({ edgeId, streamId }) => {
console.log(`Stream ${streamId} opened from edge ${edgeId}`);
});
hub.on('streamClosed', ({ edgeId, streamId }) => {
console.log(`Stream ${streamId} closed from edge ${edgeId}`);
});
hub.on('edgeConnected', ({ edgeId }) => console.log(`Edge ${edgeId} connected`));
hub.on('edgeDisconnected', ({ edgeId }) => console.log(`Edge ${edgeId} disconnected`));
hub.on('streamOpened', ({ edgeId, streamId }) => console.log(`Stream ${streamId} from ${edgeId}`));
hub.on('streamClosed', ({ edgeId, streamId }) => console.log(`Stream ${streamId} closed`));
// Start the hub — it will listen for incoming edge TLS connections
// Start the hub — listens for edge connections on both TCP and QUIC (same port)
await hub.start({
tunnelPort: 8443, // port edges connect to (default: 8443)
targetHost: '127.0.0.1', // SmartProxy host to forward streams to (default: 127.0.0.1)
targetHost: '127.0.0.1', // SmartProxy host to forward traffic to
});
// Register which edges are allowed to connect, including their listen ports
// Register allowed edges with TCP and UDP listen ports + firewall config
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443], // ports the edge should listen on
stunIntervalSecs: 300, // STUN discovery interval (default: 300)
listenPorts: [80, 443], // TCP ports the edge should listen on
listenPortsUdp: [53, 51820], // UDP ports (e.g., DNS, WireGuard)
stunIntervalSecs: 300,
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8'],
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '100/second', perSourceIP: true },
],
rules: [
{ id: 'allow-ssh', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 22, protocol: 'tcp' },
],
},
},
{
id: 'edge-fra-02',
@@ -97,38 +105,35 @@ await hub.updateAllowedEdges([
},
]);
// Dynamically update ports for a connected edge — changes are pushed instantly
// Dynamically update ports and firewall — changes are pushed instantly to connected edges
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443, 8443], // added port 8443 — edge picks it up in real time
listenPorts: [80, 443, 8443], // added TCP port 8443
listenPortsUdp: [53], // removed WireGuard UDP port
firewallConfig: {
blockedIps: ['192.168.1.100', '10.0.0.0/8', '203.0.113.50'], // added new blocked IP
rateLimits: [
{ id: 'http-rate', port: 80, protocol: 'tcp', rate: '200/second', perSourceIP: true },
],
},
},
]);
// Check status at any time
// Check status
const status = await hub.getStatus();
console.log(status);
// {
// running: true,
// tunnelPort: 8443,
// connectedEdges: [
// { edgeId: 'edge-nyc-01', connectedAt: 1700000000, activeStreams: 12 }
// ]
// }
// { running: true, tunnelPort: 8443, connectedEdges: [...] }
// Graceful shutdown
await hub.stop();
```
### Setting Up the Edge (Network Edge Side)
The edge can be configured in two ways: with an **opaque connection token** (recommended) or with explicit config fields.
The edge can connect via **TCP+TLS** (default) or **QUIC** transport. Edges run as **root** so they can bind to privileged ports and apply nftables firewall rules.
#### Option A: Connection Token (Recommended)
A single token encodes all connection details — ideal for provisioning edges at scale:
```typescript
import { RemoteIngressEdge } from '@serve.zone/remoteingress';
@@ -137,111 +142,159 @@ const edge = new RemoteIngressEdge();
edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`Listening on ports: ${listenPorts}`));
edge.on('portsUpdated', ({ listenPorts }) => console.log(`Ports updated: ${listenPorts}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`TCP ports: ${listenPorts}`));
edge.on('firewallConfigUpdated', () => console.log('Firewall rules applied'));
// Single token contains hubHost, hubPort, edgeId, and secret
await edge.start({
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwicCI6ODQ0MywiZSI6ImVkZ2UtbnljLTAxIiwicyI6InN1cGVyc2VjcmV0dG9rZW4xIn0',
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...',
});
```
#### Option B: Explicit Config
#### Option B: Explicit Config with QUIC Transport
```typescript
import { RemoteIngressEdge } from '@serve.zone/remoteingress';
const edge = new RemoteIngressEdge();
edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`Listening on ports: ${listenPorts}`));
edge.on('portsUpdated', ({ listenPorts }) => console.log(`Ports updated: ${listenPorts}`));
await edge.start({
hubHost: 'hub.example.com', // hostname or IP of the hub
hubPort: 8443, // must match hub's tunnelPort (default: 8443)
edgeId: 'edge-nyc-01', // unique edge identifier
secret: 'supersecrettoken1', // must match the hub's allowed edge secret
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
transportMode: 'quic', // 'tcpTls' (default) | 'quic' | 'quicWithFallback'
});
// Check status at any time
const edgeStatus = await edge.getStatus();
console.log(edgeStatus);
// {
// running: true,
// connected: true,
// publicIp: '203.0.113.42',
// activeStreams: 5,
// listenPorts: [80, 443]
// }
// { running: true, connected: true, publicIp: '203.0.113.42', activeStreams: 5, listenPorts: [80, 443] }
// Graceful shutdown
await edge.stop();
```
### 🎫 Connection Tokens
#### Transport Modes
Connection tokens let you distribute a single opaque string instead of four separate config values. The hub operator generates the token; the edge operator just pastes it in.
| Mode | Description |
|------|-------------|
| `'tcpTls'` | **Default.** Single TLS connection with frame-based multiplexing. Universal compatibility. |
| `'quic'` | QUIC with native stream multiplexing. Eliminates head-of-line blocking. Uses QUIC datagrams for UDP traffic. |
| `'quicWithFallback'` | Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
### Connection Tokens
Encode all connection details into a single opaque string for easy distribution:
```typescript
import { encodeConnectionToken, decodeConnectionToken } from '@serve.zone/remoteingress';
// Hub side: generate a token for a new edge
// Hub operator generates a token
const token = encodeConnectionToken({
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
});
console.log(token);
// => 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...'
// Edge side: inspect a token (optional — start() does this automatically)
// Edge operator decodes (optional — start() does this automatically)
const data = decodeConnectionToken(token);
console.log(data);
// {
// hubHost: 'hub.example.com',
// hubPort: 8443,
// edgeId: 'edge-nyc-01',
// secret: 'supersecrettoken1'
// }
// { hubHost: 'hub.example.com', hubPort: 8443, edgeId: 'edge-nyc-01', secret: '...' }
```
Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environment variables, CLI arguments, or store in config files.
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
## 📖 API Reference
## 🔥 Firewall Config
The `firewallConfig` field in `updateAllowedEdges()` works exactly like `listenPorts` — it travels in the same `FRAME_CONFIG` frame, is delivered on initial handshake and on every subsequent update, and is applied atomically at the edge using `@push.rocks/smartnftables`. Each update fully replaces the previous ruleset.
Since edges run as root, the rules are applied directly to the Linux kernel via nftables. If the edge isn't root or nftables is unavailable, it logs a warning and continues — the tunnel works fine, just without kernel-level firewall rules.
### Config Structure
```typescript
interface IFirewallConfig {
blockedIps?: string[]; // IPs or CIDRs to block (e.g., '1.2.3.4', '10.0.0.0/8')
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
interface IFirewallRateLimit {
id: string; // unique identifier for this rate limit
port: number; // port to rate-limit
protocol?: 'tcp' | 'udp'; // default: both
rate: string; // e.g., '100/second', '1000/minute'
burst?: number; // burst allowance
perSourceIP?: boolean; // per-client rate limiting (recommended)
}
interface IFirewallRule {
id: string; // unique identifier for this rule
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string; // source IP or CIDR
destPort?: number; // destination port
protocol?: 'tcp' | 'udp';
comment?: string;
}
```
### Example: Rate Limiting + IP Blocking
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
// Block known bad actors
blockedIps: ['198.51.100.0/24', '203.0.113.50'],
// Rate limit HTTP traffic per source IP
rateLimits: [
{ id: 'http', port: 80, protocol: 'tcp', rate: '100/second', burst: 50, perSourceIP: true },
{ id: 'https', port: 443, protocol: 'tcp', rate: '200/second', burst: 100, perSourceIP: true },
],
// Allow monitoring from trusted subnet
rules: [
{ id: 'monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/24', destPort: 9090, protocol: 'tcp', comment: 'Prometheus scraping' },
],
},
},
]);
```
## API Reference
### `RemoteIngressHub`
| Method / Property | Description |
|-------------------|-------------|
| `start(config?)` | Spawns the Rust binary and starts the tunnel listener. Config: `{ tunnelPort?: number, targetHost?: string }` |
| `stop()` | Gracefully shuts down the hub and kills the Rust process. |
| `updateAllowedEdges(edges)` | Dynamically update which edges are authorized and what ports they listen on. Each edge: `{ id: string, secret: string, listenPorts?: number[], stunIntervalSecs?: number }`. If ports change for a connected edge, the update is pushed immediately via a `FRAME_CONFIG` frame. |
| `getStatus()` | Returns current hub status including connected edges and active stream counts. |
| `start(config?)` | Start the hub. Config: `{ tunnelPort?, targetHost?, tls?: { certPem?, keyPem? } }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `stop()` | Graceful shutdown. |
| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs?, firewallConfig? }`. Port and firewall changes are pushed to connected edges in real time. |
| `getStatus()` | Returns `{ running, tunnelPort, connectedEdges: [...] }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`, `crashRecovered`, `crashRecoveryFailed`
### `RemoteIngressEdge`
| Method / Property | Description |
|-------------------|-------------|
| `start(config)` | Spawns the Rust binary and connects to the hub. Accepts `{ token: string }` or `IEdgeConfig`. Listen ports are received from the hub during handshake. |
| `stop()` | Gracefully shuts down the edge and kills the Rust process. |
| `getStatus()` | Returns current edge status including connection state, public IP, listen ports, and active streams. |
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, bindAddress?, transportMode? }`. |
| `stop()` | Graceful shutdown. Cleans up all nftables rules. |
| `getStatus()` | Returns `{ running, connected, publicIp, activeStreams, listenPorts }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`, `firewallConfigUpdated`, `crashRecovered`, `crashRecoveryFailed`
### Token Utilities
| Function | Description |
|----------|-------------|
| `encodeConnectionToken(data)` | Encodes `IConnectionTokenData` into a base64url token string. |
| `decodeConnectionToken(token)` | Decodes a token back into `IConnectionTokenData`. Throws on malformed or incomplete tokens. |
| `encodeConnectionToken(data)` | Encodes connection info into a base64url token. |
| `decodeConnectionToken(token)` | Decodes a token. Throws on malformed input. |
### Interfaces
@@ -249,6 +302,10 @@ Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environm
interface IHubConfig {
tunnelPort?: number; // default: 8443
targetHost?: string; // default: '127.0.0.1'
tls?: {
certPem?: string; // PEM-encoded TLS certificate
keyPem?: string; // PEM-encoded TLS private key
};
}
interface IEdgeConfig {
@@ -256,6 +313,8 @@ interface IEdgeConfig {
hubPort?: number; // default: 8443
edgeId: string;
secret: string;
bindAddress?: string;
transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback';
}
interface IConnectionTokenData {
@@ -266,9 +325,11 @@ interface IConnectionTokenData {
}
```
## 🔌 Wire Protocol
## Wire Protocol
The tunnel uses a custom binary frame protocol over TLS:
### TCP+TLS Transport (Frame Protocol)
The tunnel uses a custom binary frame protocol over a single TLS connection:
```
[stream_id: 4 bytes BE][type: 1 byte][length: 4 bytes BE][payload: N bytes]
@@ -276,111 +337,149 @@ The tunnel uses a custom binary frame protocol over TLS:
| Frame Type | Value | Direction | Purpose |
|------------|-------|-----------|---------|
| `OPEN` | `0x01` | Edge → Hub | Open a new stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge → Hub | Client data flowing upstream |
| `CLOSE` | `0x03` | Edge → Hub | Client closed the connection |
| `DATA_BACK` | `0x04` | Hub → Edge | Response data flowing downstream |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream (SmartProxy) closed the connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime configuration update (e.g. port changes); payload is JSON |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (sent every 15s) |
| `OPEN` | `0x01` | Edge → Hub | Open TCP stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge → Hub | Client data (upload) |
| `CLOSE` | `0x03` | Edge → Hub | Client closed connection |
| `DATA_BACK` | `0x04` | Hub → Edge | Response data (download) |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream closed connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime config update (JSON: ports + firewall config) |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (every 15s) |
| `PONG` | `0x08` | Edge → Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Per-stream flow control: edge consumed N bytes, hub can send more |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Per-stream flow control: hub consumed N bytes, edge can send more |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Flow control: edge consumed N bytes |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Flow control: hub consumed N bytes |
| `UDP_OPEN` | `0x0B` | Edge → Hub | Open UDP session; payload is PROXY v2 header |
| `UDP_DATA` | `0x0C` | Edge → Hub | UDP datagram (upload) |
| `UDP_DATA_BACK` | `0x0D` | Hub → Edge | UDP datagram (download) |
| `UDP_CLOSE` | `0x0E` | Either | Close UDP session |
Max payload size per frame: **16 MB**. Stream IDs are 32-bit unsigned integers.
### QUIC Transport
When using QUIC, the frame protocol is replaced by native QUIC primitives:
- **TCP connections:** Each tunneled TCP connection gets its own QUIC bidirectional stream. No framing overhead.
- **UDP datagrams:** Forwarded via QUIC unreliable datagrams (RFC 9221). Format: `[session_id: 4 bytes][payload]`. Session open uses magic byte `0xFF`: `[session_id: 4][0xFF][PROXY v2 header]`.
- **Control channel:** First QUIC bidirectional stream carries auth handshake + config updates using `[type: 1][length: 4][payload]` format.
### Handshake Sequence
1. Edge opens a TLS connection to the hub
1. Edge opens a TLS or QUIC connection to the hub
2. Edge sends: `EDGE <edgeId> <secret>\n`
3. Hub verifies credentials (constant-time comparison) and responds with JSON: `{"listenPorts":[...],"stunIntervalSecs":300}\n`
4. Edge starts TCP listeners on the assigned ports
5. Frame protocol begins — `OPEN`/`DATA`/`CLOSE` frames flow in both directions
6. Hub can push `CONFIG` frames at any time to update the edge's listen ports
3. Hub verifies credentials (constant-time comparison) and responds with JSON:
`{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300,"firewallConfig":{...}}\n`
4. Edge starts TCP and UDP listeners on the assigned ports
5. Edge applies firewall config via nftables (if present and running as root)
6. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
## 🎚️ QoS & Flow Control
## QoS & Flow Control
The tunnel multiplexer uses a **3-tier priority system** and **per-stream flow control** to ensure fair bandwidth sharing across thousands of concurrent streams.
### Priority Tiers (TCP+TLS Transport)
### Priority Tiers
All outbound frames are queued into one of three priority levels:
| Tier | Queue | Frames | Behavior |
|------|-------|--------|----------|
| 🔴 **Control** (highest) | `ctrl_queue` | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** (normal) | `data_queue` | DATA, DATA_BACK from normal streams | Drained when ctrl is empty. Gated at 64 buffered items for backpressure. |
| 🟢 **Sustained** (lowest) | `sustained_queue` | DATA, DATA_BACK from elephant flows | Drained freely when ctrl+data are empty. Otherwise guaranteed **1 MB/s** via forced drain every second. |
This prevents large bulk transfers (e.g. git clones, file downloads) from starving interactive traffic and ensures `WINDOW_UPDATE` frames are never delayed — which would cause flow control deadlocks.
| Tier | Frames | Behavior |
|------|--------|----------|
| **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
### Sustained Stream Classification
A stream is automatically classified as **sustained** (elephant flow) when:
- It has been active for **>10 seconds**, AND
- Its average throughput exceeds **20 Mbit/s** (2.5 MB/s)
A TCP stream is classified as **sustained** (elephant flow) when:
- Active for **>10 seconds**, AND
- Average throughput exceeds **20 Mbit/s** (2.5 MB/s)
Once classified, the stream's flow control window is locked to the **1 MB floor** and its data frames move to the lowest-priority queue. Classification is one-way — a stream never gets promoted back to normal.
Once classified, its flow control window locks to 1 MB and data frames move to the lowest-priority queue.
### Adaptive Per-Stream Windows
Each stream has a send window that limits bytes-in-flight. The window size adapts to the number of active streams using a shared **200 MB memory budget**:
Each TCP stream has a send window from a shared **200 MB budget**:
| Active Streams | Window per Stream |
|---|---|
| 150 | 4 MB (maximum) |
| 51100 | Scales down (4 MB → 2 MB) |
| 51200 | Scales down (4 MB → 1 MB) |
| 200+ | 1 MB (floor) |
The consumer sends `WINDOW_UPDATE` frames after processing data, allowing the producer to send more. This prevents any single stream from consuming unbounded memory and provides natural backpressure.
UDP traffic uses no flow control — datagrams are fire-and-forget, matching UDP semantics.
## 💡 Example Scenarios
## Example Scenarios
### 1. Expose a Private Kubernetes Cluster to the Internet
### 1. 🌐 Expose a Private Cluster to the Internet
Deploy an Edge on a public VPS, point your DNS to the VPS IP. The Edge tunnels all traffic to the Hub running inside the cluster, which hands it off to SmartProxy/DcRouter. Your cluster stays fully private — no public-facing ports needed.
Deploy an Edge on a public VPS, point DNS to its IP. The Edge tunnels all TCP and UDP traffic to the Hub running inside your private cluster. No public ports needed on the cluster.
### 2. Multi-Region Edge Ingress
### 2. 🗺️ Multi-Region Edge Ingress
Run multiple Edges in different geographic regions (NYC, Frankfurt, Tokyo) all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. The Hub sees the real client IPs via PROXY protocol regardless of which edge they connected through.
Run Edges in NYC, Frankfurt, and Tokyo all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. PROXY protocol ensures the Hub sees real client IPs regardless of which Edge they entered through.
### 3. Secure API Exposure
### 3. 📡 UDP Forwarding (DNS, Gaming, VoIP)
Your backend runs on a private network with no direct internet access. An Edge on a minimal cloud instance acts as the only public entry point. TLS tunnel + shared-secret auth ensure only your authorized Edge can forward traffic.
### 4. Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators. Each edge only needs a single token string to connect — no manual configuration of host, port, ID, and secret.
Configure UDP listen ports alongside TCP ports. DNS queries, game server traffic, or VoIP packets are tunneled through the same edge/hub connection and forwarded to SmartProxy with a PROXY v2 binary header preserving the client's real IP.
```typescript
// Hub operator generates token
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443], // TCP
listenPortsUdp: [53, 27015], // DNS + game server
},
]);
```
### 4. 🚀 QUIC Transport for Low-Latency
Use QUIC transport to eliminate head-of-line blocking — a lost packet on one stream doesn't stall others. QUIC also enables 0-RTT reconnection and connection migration.
```typescript
await edge.start({
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-01',
secret: 'secret',
transportMode: 'quicWithFallback', // try QUIC, fall back to TLS if UDP blocked
});
```
### 5. 🔑 Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators:
```typescript
import { encodeConnectionToken, RemoteIngressEdge } from '@serve.zone/remoteingress';
const token = encodeConnectionToken({
hubHost: 'hub.prod.example.com',
hubPort: 8443,
edgeId: 'edge-tokyo-01',
secret: 'generated-secret-abc123',
});
// Send `token` to the edge operator via secure channel
// Send `token` to the edge operator a single string is all they need
// Edge operator starts with just the token
const edge = new RemoteIngressEdge();
await edge.start({ token });
```
### 5. Dynamic Port Management
### 6. 🛡️ Centralized Firewall Management
The hub controls which ports each edge listens on. Ports can be changed at runtime without restarting the edge — the hub pushes a `CONFIG` frame and the edge hot-reloads its TCP listeners.
Push firewall rules from the hub to all your edge nodes. Block bad actors, rate-limit abusive traffic, and whitelist trusted subnets — all from a single control plane:
```typescript
// Initially assign ports 80 and 443
await hub.updateAllowedEdges([
{ id: 'edge-nyc-01', secret: 'secret', listenPorts: [80, 443] },
]);
// Later, add port 8080 — the connected edge picks it up instantly
await hub.updateAllowedEdges([
{ id: 'edge-nyc-01', secret: 'secret', listenPorts: [80, 443, 8080] },
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443],
firewallConfig: {
blockedIps: ['198.51.100.0/24'],
rateLimits: [
{ id: 'https', port: 443, protocol: 'tcp', rate: '500/second', perSourceIP: true, burst: 100 },
],
rules: [
{ id: 'allow-monitoring', direction: 'input', action: 'accept', sourceIP: '10.0.0.0/8', destPort: 9090, protocol: 'tcp' },
],
},
},
]);
// Firewall rules are applied at the edge via nftables within seconds
```
## License and Legal Information

View File

@@ -316,6 +316,12 @@ async fn handle_request(
serde_json::json!({ "listenPorts": listen_ports }),
);
}
EdgeEvent::FirewallConfigUpdated { firewall_config } => {
send_event(
"firewallConfigUpdated",
serde_json::json!({ "firewallConfig": firewall_config }),
);
}
}
}
});

View File

@@ -36,6 +36,9 @@ struct EdgeStreamState {
send_window: Arc<AtomicU32>,
/// Notifier to wake the client reader when the window opens.
window_notify: Arc<Notify>,
/// Per-stream cancellation token — cancelled on FRAME_CLOSE_BACK to promptly
/// terminate the upload loop instead of waiting for the window stall timeout.
cancel_token: CancellationToken,
}
/// Edge configuration (hub-host + credentials only; ports come from hub).
@@ -64,6 +67,8 @@ struct HandshakeConfig {
listen_ports_udp: Vec<u16>,
#[serde(default = "default_stun_interval")]
stun_interval_secs: u64,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
}
fn default_stun_interval() -> u64 {
@@ -77,6 +82,8 @@ struct ConfigUpdate {
listen_ports: Vec<u16>,
#[serde(default)]
listen_ports_udp: Vec<u16>,
#[serde(default)]
firewall_config: Option<serde_json::Value>,
}
/// Events emitted by the edge.
@@ -93,6 +100,8 @@ pub enum EdgeEvent {
PortsAssigned { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
PortsUpdated { listen_ports: Vec<u16> },
#[serde(rename_all = "camelCase")]
FirewallConfigUpdated { firewall_config: serde_json::Value },
}
/// Edge status response.
@@ -220,7 +229,7 @@ async fn edge_main_loop(
let mut backoff_ms: u64 = 1000;
let max_backoff_ms: u64 = 30000;
let transport_mode = config.transport_mode.unwrap_or(TransportMode::TcpTls);
let transport_mode = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
// Build TLS config ONCE outside the reconnect loop — preserves session
// cache across reconnections for TLS session resumption (saves 1 RTT).
@@ -399,7 +408,11 @@ async fn handle_edge_frame(
}
FRAME_CLOSE_BACK => {
let mut writers = client_writers.lock().await;
writers.remove(&frame.stream_id);
if let Some(state) = writers.remove(&frame.stream_id) {
// Cancel the stream's token so the upload loop exits promptly
// instead of waiting for the window stall timeout.
state.cancel_token.cancel();
}
}
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
@@ -432,6 +445,11 @@ async fn handle_edge_frame(
connection_token,
bind_address,
);
if let Some(fw_config) = update.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
}
}
FRAME_PING => {
@@ -562,6 +580,13 @@ async fn connect_to_hub_and_run(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -642,8 +667,23 @@ async fn connect_to_hub_and_run(
let liveness_timeout_dur = Duration::from_secs(45);
let mut last_activity = Instant::now();
let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur));
let mut next_udp_expiry = Instant::now() + Duration::from_secs(30);
let result = 'io_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry {
let mut sessions = udp_sessions.lock().await;
let expired = sessions.expire_idle();
for sid in &expired {
let close_frame = encode_frame(*sid, FRAME_UDP_CLOSE, &[]);
let _ = tunnel_data_tx.try_send(close_frame);
}
if !expired.is_empty() {
log::debug!("Expired {} idle UDP sessions", expired.len());
}
next_udp_expiry = Instant::now() + Duration::from_secs(30);
}
// Drain any buffered frames
loop {
let frame = match tunnel_io.try_parse_frame() {
@@ -997,6 +1037,7 @@ async fn handle_client_connection(
back_tx,
send_window: Arc::clone(&send_window),
window_notify: Arc::clone(&window_notify),
cancel_token: client_token.clone(),
});
}
@@ -1078,8 +1119,8 @@ async fn handle_client_connection(
tokio::select! {
_ = notified => continue,
_ = client_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} upload stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} upload stalled (window empty for 55s)", stream_id);
break;
}
}
@@ -1286,6 +1327,13 @@ async fn connect_to_hub_and_run_quic_with_connection(
listen_ports: handshake.listen_ports.clone(),
});
// Emit firewall config if present in handshake
if let Some(fw_config) = handshake.firewall_config {
let _ = event_tx.try_send(EdgeEvent::FirewallConfigUpdated {
firewall_config: fw_config,
});
}
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
@@ -1346,7 +1394,18 @@ async fn connect_to_hub_and_run_quic_with_connection(
);
// Monitor control stream for config updates, connection health, and QUIC datagrams.
let mut next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
let result = 'quic_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry_quic {
let mut sessions = udp_sessions_quic.lock().await;
let expired = sessions.expire_idle();
if !expired.is_empty() {
log::debug!("Expired {} idle QUIC UDP sessions", expired.len());
}
next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
}
tokio::select! {
// Read control messages from hub
ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => {
@@ -1370,6 +1429,16 @@ async fn connect_to_hub_and_run_quic_with_connection(
connection_token,
bind_address,
);
apply_udp_port_config_quic(
&update.listen_ports_udp,
&mut udp_listeners_quic,
&quic_conn,
&udp_sessions_quic,
&udp_sockets_quic,
next_stream_id,
connection_token,
bind_address,
);
}
}
quic_transport::CTRL_PING => {

View File

@@ -80,6 +80,8 @@ pub struct AllowedEdge {
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
pub stun_interval_secs: Option<u64>,
#[serde(default)]
pub firewall_config: Option<serde_json::Value>,
}
/// Handshake response sent to edge after authentication.
@@ -90,6 +92,8 @@ struct HandshakeResponse {
#[serde(default)]
listen_ports_udp: Vec<u16>,
stun_interval_secs: u64,
#[serde(skip_serializing_if = "Option::is_none")]
firewall_config: Option<serde_json::Value>,
}
/// Configuration update pushed to a connected edge at runtime.
@@ -99,6 +103,8 @@ pub struct EdgeConfigUpdate {
pub listen_ports: Vec<u16>,
#[serde(default)]
pub listen_ports_udp: Vec<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub firewall_config: Option<serde_json::Value>,
}
/// Runtime status of a connected edge.
@@ -192,14 +198,17 @@ impl TunnelHub {
for edge in &edges {
if let Some(info) = connected.get(&edge.id) {
// Check if ports changed compared to old config
let ports_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports || old.listen_ports_udp != edge.listen_ports_udp,
let config_changed = match map.get(&edge.id) {
Some(old) => old.listen_ports != edge.listen_ports
|| old.listen_ports_udp != edge.listen_ports_udp
|| old.firewall_config != edge.firewall_config,
None => true, // newly allowed edge that's already connected
};
if ports_changed {
if config_changed {
let update = EdgeConfigUpdate {
listen_ports: edge.listen_ports.clone(),
listen_ports_udp: edge.listen_ports_udp.clone(),
firewall_config: edge.firewall_config.clone(),
};
let _ = info.config_tx.try_send(update);
}
@@ -475,6 +484,12 @@ async fn handle_hub_frame(
})??;
upstream.set_nodelay(true)?;
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
upstream.write_all(proxy_header.as_bytes()).await?;
let (mut up_read, mut up_write) =
@@ -485,7 +500,7 @@ async fn handle_hub_frame(
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
tokio::select! {
@@ -569,8 +584,8 @@ async fn handle_hub_frame(
tokio::select! {
_ = notified => continue,
_ = stream_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
}
}
@@ -633,7 +648,11 @@ async fn handle_hub_frame(
}
}
writer_for_edge_data.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_for_edge_data).await.is_err() {
writer_for_edge_data.abort();
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
.await;
@@ -706,6 +725,7 @@ async fn handle_hub_frame(
let data_writer_tx = data_tx.clone();
let session_token = edge_token.child_token();
let edge_id_str = edge_id.to_string();
let proxy_v2_header = frame.payload.clone();
// Channel for forwarding datagrams from edge to upstream
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
@@ -728,6 +748,12 @@ async fn handle_hub_frame(
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_header).await {
log::error!("UDP session {} failed to send PROXY v2 header: {}", stream_id, e);
return;
}
// Task: upstream -> edge (return datagrams)
let upstream_recv = Arc::new(upstream);
let upstream_send = upstream_recv.clone();
@@ -844,14 +870,14 @@ async fn handle_edge_connection(
let secret = parts[2];
// Verify credentials and extract edge config
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
}
None => {
return Err(format!("unknown edge {}", edge_id).into());
@@ -870,6 +896,7 @@ async fn handle_edge_connection(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -1066,7 +1093,11 @@ async fn handle_edge_connection(
).await;
{
let mut edges = connected.lock().await;
edges.remove(&edge_id);
// Only remove if the entry is still ours (not replaced by a reconnection).
// A replaced entry has a fresh non-cancelled token from the new handler.
if edges.get(&edge_id).map_or(false, |e| e.cancel_token.is_cancelled()) {
edges.remove(&edge_id);
}
}
let _ = event_tx.try_send(HubEvent::EdgeDisconnected {
edge_id: edge_id.clone(),
@@ -1207,14 +1238,14 @@ async fn handle_edge_connection_quic(
let secret = parts[2];
// Verify credentials
let (listen_ports, listen_ports_udp, stun_interval_secs) = {
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
let edges = allowed.read().await;
match edges.get(&edge_id) {
Some(edge) => {
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
return Err(format!("invalid secret for edge {}", edge_id).into());
}
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300))
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
}
None => return Err(format!("unknown edge {}", edge_id).into()),
}
@@ -1231,6 +1262,7 @@ async fn handle_edge_connection_quic(
listen_ports: listen_ports.clone(),
listen_ports_udp: listen_ports_udp.clone(),
stun_interval_secs,
firewall_config,
};
let mut handshake_json = serde_json::to_string(&handshake)?;
handshake_json.push('\n');
@@ -1367,6 +1399,8 @@ async fn handle_edge_connection_quic(
let sessions = dgram_sessions.clone();
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
let cleanup_sessions = sessions.clone();
{
let mut s = sessions.lock().await;
@@ -1378,11 +1412,20 @@ async fn handle_edge_connection_quic(
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
@@ -1425,6 +1468,8 @@ async fn handle_edge_connection_quic(
}
}
recv_handle.abort();
// Clean up session entry to prevent memory leak
cleanup_sessions.lock().await.remove(&session_id);
});
continue;
@@ -1520,7 +1565,11 @@ async fn handle_edge_connection_quic(
{
let mut edges = connected.lock().await;
edges.remove(&edge_id);
// Only remove if the entry is still ours (not replaced by a reconnection).
// A replaced entry has a fresh non-cancelled token from the new handler.
if edges.get(&edge_id).map_or(false, |e| e.cancel_token.is_cancelled()) {
edges.remove(&edge_id);
}
}
let _ = event_tx.try_send(HubEvent::EdgeDisconnected {
edge_id,
@@ -1568,6 +1617,12 @@ async fn handle_quic_stream(
};
let _ = upstream.set_nodelay(true);
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
// Send PROXY header to SmartProxy
if let Err(e) = upstream.write_all(proxy_header.as_bytes()).await {
log::error!("QUIC stream {} failed to write PROXY header to upstream: {}", stream_id, e);
@@ -1578,7 +1633,7 @@ async fn handle_quic_stream(
// Task: QUIC -> upstream (edge data to SmartProxy)
let writer_token = stream_token.clone();
let writer_task = tokio::spawn(async move {
let mut writer_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
@@ -1629,7 +1684,11 @@ async fn handle_quic_stream(
// Gracefully close the QUIC send stream
let _ = quic_send.finish();
writer_task.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_task).await.is_err() {
writer_task.abort();
}
}
#[cfg(test)]
@@ -1739,6 +1798,7 @@ mod tests {
listen_ports: vec![443, 8080],
listen_ports_udp: vec![],
stun_interval_secs: 300,
firewall_config: None,
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([443, 8080]));
@@ -1753,6 +1813,7 @@ mod tests {
let update = EdgeConfigUpdate {
listen_ports: vec![80, 443],
listen_ports_udp: vec![53],
firewall_config: None,
};
let json = serde_json::to_value(&update).unwrap();
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));

View File

@@ -0,0 +1,228 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers (same patterns as test.quic.node.ts)
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) socket.write(remainder);
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// QUIC Long-Running Stability Test — 2 minutes
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
let disconnectCount = 0;
tap.test('QUIC stability setup: start echo server and QUIC tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
// Track disconnects — any disconnect during the test is a failure signal
edge.on('tunnelDisconnected', () => {
disconnectCount++;
console.log(`[STABILITY] Unexpected tunnel disconnect #${disconnectCount}`);
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC stability: tunnel stays alive for 30s with periodic echo probes', async () => {
const testDurationMs = 30_000; // 30 seconds
const probeIntervalMs = 5_000; // probe every 5 seconds
const startTime = Date.now();
let probeCount = 0;
let failedProbes = 0;
while (Date.now() - startTime < testDurationMs) {
probeCount++;
const elapsed = Math.round((Date.now() - startTime) / 1000);
// Verify edge still reports connected
const status = await edge.getStatus();
if (!status.connected) {
throw new Error(`Tunnel disconnected at ${elapsed}s (probe #${probeCount})`);
}
// Send a 4KB echo probe through the tunnel
const data = crypto.randomBytes(4096);
const hash = sha256(data);
try {
const received = await sendAndReceive(edgePort, data, 10000);
if (received.length !== 4096 || sha256(received) !== hash) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: data mismatch`);
} else {
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: OK`);
}
} catch (err) {
failedProbes++;
console.log(`[STABILITY] Probe #${probeCount} at ${elapsed}s: FAILED — ${err}`);
}
// Wait for next probe interval
const remaining = testDurationMs - (Date.now() - startTime);
if (remaining > 0) {
await new Promise((resolve) => setTimeout(resolve, Math.min(probeIntervalMs, remaining)));
}
}
console.log(`[STABILITY] Completed: ${probeCount} probes, ${failedProbes} failures, ${disconnectCount} disconnects`);
expect(failedProbes).toEqual(0);
expect(disconnectCount).toEqual(0);
// Final status check
const finalStatus = await edge.getStatus();
expect(finalStatus.connected).toBeTrue();
});
tap.test('QUIC stability teardown', async () => {
await edge.stop();
await hub.stop();
await forceCloseServer(echoServer);
});
export default tap.start();

View File

@@ -29,15 +29,16 @@ async function findFreePorts(count: number): Promise<number[]> {
function startUdpEchoServer(port: number, host: string): Promise<dgram.Socket> {
return new Promise((resolve, reject) => {
const server = dgram.createSocket('udp4');
let proxyHeaderReceived = false;
// Track which source endpoints have sent their PROXY v2 header.
// The hub sends a 28-byte PROXY v2 header as the first datagram per session.
const seenSources = new Set<string>();
server.on('message', (msg, rinfo) => {
if (!proxyHeaderReceived) {
// First datagram is the PROXY v2 header (28 bytes for IPv4)
// In the current implementation, the hub connects directly via UDP
// so the first real datagram is the actual data (no PROXY header yet)
// For now, just echo everything back
proxyHeaderReceived = true;
const sourceKey = `${rinfo.address}:${rinfo.port}`;
if (!seenSources.has(sourceKey)) {
seenSources.add(sourceKey);
// First datagram from this source is the PROXY v2 header — skip it
return;
}
// Echo back
server.send(msg, rinfo.port, rinfo.address);

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.12.0',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
version: '4.15.2',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
}

View File

@@ -1,6 +1,7 @@
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
import { decodeConnectionToken } from './classes.token.js';
import type { IFirewallConfig } from './classes.remoteingresshub.js';
// Command map for the edge side of remoteingress-bin
type TEdgeCommands = {
@@ -55,6 +56,7 @@ export class RemoteIngressEdge extends EventEmitter {
private restartBackoffMs = 1000;
private restartAttempts = 0;
private statusInterval: ReturnType<typeof setInterval> | undefined;
private nft: InstanceType<typeof plugins.smartnftables.SmartNftables> | null = null;
constructor() {
super();
@@ -79,6 +81,15 @@ export class RemoteIngressEdge extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressEdge] ${message}`);
} else {
console.log(`[RemoteIngressEdge] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -101,6 +112,83 @@ export class RemoteIngressEdge extends EventEmitter {
console.log(`[RemoteIngressEdge] Ports updated by hub: ${data.listenPorts.join(', ')}`);
this.emit('portsUpdated', data);
});
this.bridge.on('management:firewallConfigUpdated', (data: { firewallConfig: IFirewallConfig }) => {
console.log(`[RemoteIngressEdge] Firewall config updated from hub`);
this.applyFirewallConfig(data.firewallConfig);
this.emit('firewallConfigUpdated', data);
});
}
/**
* Initialize the nftables manager. Fails gracefully if not running as root.
*/
private async initNft(): Promise<void> {
try {
this.nft = new plugins.smartnftables.SmartNftables({
tableName: 'remoteingress',
dryRun: false,
});
await this.nft.initialize();
console.log('[RemoteIngressEdge] SmartNftables initialized');
} catch (err) {
console.warn(`[RemoteIngressEdge] Failed to initialize nftables (not root?): ${err}`);
this.nft = null;
}
}
/**
* Apply firewall configuration received from the hub.
* Performs a full replacement: cleans up existing rules, then applies the new config.
*/
private async applyFirewallConfig(config: IFirewallConfig): Promise<void> {
if (!this.nft) {
return;
}
try {
// Full cleanup and reinitialize to replace all rules atomically
await this.nft.cleanup();
await this.nft.initialize();
// Apply blocked IPs
if (config.blockedIps && config.blockedIps.length > 0) {
for (const ip of config.blockedIps) {
await this.nft.firewall.blockIP(ip);
}
console.log(`[RemoteIngressEdge] Blocked ${config.blockedIps.length} IPs`);
}
// Apply rate limits
if (config.rateLimits && config.rateLimits.length > 0) {
for (const rl of config.rateLimits) {
await this.nft.rateLimit.addRateLimit(rl.id, {
port: rl.port,
protocol: rl.protocol,
rate: rl.rate,
burst: rl.burst,
perSourceIP: rl.perSourceIP,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rateLimits.length} rate limits`);
}
// Apply firewall rules
if (config.rules && config.rules.length > 0) {
for (const rule of config.rules) {
await this.nft.firewall.addRule(rule.id, {
direction: rule.direction,
action: rule.action,
sourceIP: rule.sourceIP,
destPort: rule.destPort,
protocol: rule.protocol,
comment: rule.comment,
});
}
console.log(`[RemoteIngressEdge] Applied ${config.rules.length} firewall rules`);
}
} catch (err) {
console.error(`[RemoteIngressEdge] Failed to apply firewall config: ${err}`);
}
}
/**
@@ -130,7 +218,8 @@ export class RemoteIngressEdge extends EventEmitter {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
@@ -146,6 +235,9 @@ export class RemoteIngressEdge extends EventEmitter {
this.restartAttempts = 0;
this.restartBackoffMs = 1000;
// Initialize nftables (graceful degradation if not root)
await this.initNft();
// Start periodic status logging
this.statusInterval = setInterval(async () => {
try {
@@ -170,6 +262,15 @@ export class RemoteIngressEdge extends EventEmitter {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
// Clean up nftables rules before stopping
if (this.nft) {
try {
await this.nft.cleanup();
} catch (err) {
console.warn(`[RemoteIngressEdge] nftables cleanup error: ${err}`);
}
this.nft = null;
}
if (this.started) {
try {
await this.bridge.sendCommand('stopEdge', {} as Record<string, never>);
@@ -180,6 +281,7 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
}
/**
@@ -211,6 +313,12 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = false;
// Clear orphaned status interval from previous run
if (this.statusInterval) {
clearInterval(this.statusInterval);
this.statusInterval = undefined;
}
if (this.restartAttempts >= MAX_RESTART_ATTEMPTS) {
console.error('[RemoteIngressEdge] Max restart attempts reached, giving up');
this.emit('crashRecoveryFailed');
@@ -228,6 +336,7 @@ export class RemoteIngressEdge extends EventEmitter {
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startEdge', {
@@ -242,6 +351,24 @@ export class RemoteIngressEdge extends EventEmitter {
this.started = true;
this.restartAttempts = 0;
this.restartBackoffMs = 1000;
// Re-initialize nftables (hub will re-push config via handshake)
await this.initNft();
// Restart periodic status logging
this.statusInterval = setInterval(async () => {
try {
const status = await this.getStatus();
console.log(
`[RemoteIngressEdge] Status: connected=${status.connected}, ` +
`streams=${status.activeStreams}, ports=[${status.listenPorts.join(',')}], ` +
`publicIp=${status.publicIp ?? 'unknown'}`
);
} catch {
// Bridge may be shutting down
}
}, 60_000);
console.log('[RemoteIngressEdge] Successfully recovered from crash');
this.emit('crashRecovered');
} catch (err) {

View File

@@ -22,7 +22,7 @@ type THubCommands = {
};
updateAllowedEdges: {
params: {
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>;
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig }>;
};
result: { updated: boolean };
};
@@ -41,6 +41,31 @@ type THubCommands = {
};
};
export interface IFirewallRateLimit {
id: string;
port: number;
protocol?: 'tcp' | 'udp';
rate: string;
burst?: number;
perSourceIP?: boolean;
}
export interface IFirewallRule {
id: string;
direction: 'input' | 'output' | 'forward';
action: 'accept' | 'drop' | 'reject';
sourceIP?: string;
destPort?: number;
protocol?: 'tcp' | 'udp';
comment?: string;
}
export interface IFirewallConfig {
blockedIps?: string[];
rateLimits?: IFirewallRateLimit[];
rules?: IFirewallRule[];
}
export interface IHubConfig {
tunnelPort?: number;
targetHost?: string;
@@ -50,7 +75,7 @@ export interface IHubConfig {
};
}
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number };
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number; firewallConfig?: IFirewallConfig };
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;
@@ -87,6 +112,15 @@ export class RemoteIngressHub extends EventEmitter {
plugins.path.join(packageDir, 'rust', 'target', 'debug', 'remoteingress-bin'),
],
searchSystemPath: false,
logger: {
log: (level: string, message: string) => {
if (level === 'error') {
console.error(`[RemoteIngressHub] ${message}`);
} else {
console.log(`[RemoteIngressHub] ${message}`);
}
},
},
});
// Forward events from Rust binary
@@ -118,7 +152,8 @@ export class RemoteIngressHub extends EventEmitter {
throw new Error('Failed to spawn remoteingress-bin');
}
// Register crash recovery handler
// Register crash recovery handler (remove first to avoid duplicates)
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
await this.bridge.sendCommand('startHub', {
@@ -149,6 +184,8 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.kill();
this.started = false;
}
this.savedConfig = null;
this.savedEdges = [];
}
/**
@@ -205,6 +242,7 @@ export class RemoteIngressHub extends EventEmitter {
return;
}
this.bridge.removeListener('exit', this.handleCrashRecovery);
this.bridge.on('exit', this.handleCrashRecovery);
const config = this.savedConfig;

View File

@@ -3,5 +3,6 @@ import * as path from 'path';
export { path };
// @push.rocks scope
import * as smartnftables from '@push.rocks/smartnftables';
import * as smartrust from '@push.rocks/smartrust';
export { smartrust };
export { smartnftables, smartrust };

View File

@@ -6,7 +6,8 @@
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true
"verbatimModuleSyntax": true,
"types": ["node"]
},
"exclude": [
"dist_*/**/*.d.ts"