Compare commits

...

8 Commits

Author SHA1 Message Date
c7641853cf v4.13.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 23:28:38 +00:00
6e2025db3e fix(remoteingress-core): default edge transport mode to QUIC with fallback 2026-03-19 23:28:38 +00:00
693031ecdd v4.13.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 14:43:42 +00:00
a2cdadc5e3 feat(docs): document TCP and UDP tunneling over TLS and QUIC 2026-03-19 14:43:42 +00:00
948032fc9e v4.12.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 14:09:32 +00:00
a400945371 fix(remoteingress-core): send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions 2026-03-19 14:09:32 +00:00
bc89e49f39 v4.12.0
Some checks failed
Default (tags) / security (push) Failing after 4s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 12:19:58 +00:00
2087567f15 feat(remoteingress-core): add UDP tunneling over QUIC datagrams and expand transport-specific test coverage 2026-03-19 12:19:58 +00:00
11 changed files with 619 additions and 205 deletions

View File

@@ -1,5 +1,30 @@
# Changelog
## 2026-03-19 - 4.13.1 - fix(remoteingress-core)
default edge transport mode to QUIC with fallback
- Changes the default transport mode in edge connections from TCP/TLS to QUIC with fallback when no transport mode is explicitly configured.
## 2026-03-19 - 4.13.0 - feat(docs)
document TCP and UDP tunneling over TLS and QUIC
- update package description to reflect TCP and UDP support and TLS or QUIC transports
- refresh README architecture, features, and usage examples for UDP forwarding, QUIC transport, and PROXY protocol v1/v2 support
## 2026-03-19 - 4.12.1 - fix(remoteingress-core)
send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions
- Adds periodic idle UDP session expiry in edge tunnel and QUIC loops, including UDP close signaling for expired tunnel sessions.
- Sends the PROXY v2 header as the first datagram for UDP upstream connections in both standard and QUIC hub paths.
- Updates the UDP node test server to ignore the initial PROXY v2 datagram per source before echoing payload traffic.
## 2026-03-19 - 4.12.0 - feat(remoteingress-core)
add UDP tunneling over QUIC datagrams and expand transport-specific test coverage
- Implement QUIC datagram-based UDP forwarding on both edge and hub, including session setup, payload routing, and listener cleanup
- Enable QUIC datagram receive buffers in client and server transport configuration
- Add UDP-over-QUIC tests and clarify existing test names to distinguish TCP/TLS, UDP/TLS, and QUIC scenarios
## 2026-03-19 - 4.11.0 - feat(remoteingress-core)
add UDP tunneling support between edge and hub

View File

@@ -1,8 +1,8 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.11.0",
"version": "4.13.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"description": "Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"type": "module",

305
readme.md
View File

@@ -1,6 +1,6 @@
# @serve.zone/remoteingress
Edge ingress tunnel for DcRouter — accepts incoming TCP connections at the network edge and tunnels them over a single encrypted TLS connection to a DcRouter SmartProxy instance, preserving the original client IP via PROXY protocol v1.
Edge ingress tunnel for DcRouter — tunnels **TCP and UDP** traffic from the network edge to a private DcRouter/SmartProxy cluster over encrypted TLS or QUIC connections, preserving the original client IP via PROXY protocol.
## Issue Reporting and Security
@@ -17,43 +17,46 @@ pnpm install @serve.zone/remoteingress
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
┌─────────────────────┐ TLS Tunnel ┌─────────────────────┐
┌─────────────────────┐ TLS or QUIC Tunnel ┌─────────────────────┐
│ Network Edge │ ◄══════════════════════════► │ Private Cluster │
│ │ (multiplexed frames + │ │
│ RemoteIngressEdge │ shared-secret auth) │ RemoteIngressHub │
Accepts client TCP │ │ Forwards to
connections on │ │ SmartProxy on
│ hub-assigned ports │ │ local ports
│ │ TCP+TLS: frame mux │ │
│ RemoteIngressEdge │ QUIC: native streams │ RemoteIngressHub │
UDP: QUIC datagrams │
Accepts TCP & UDP │ │ Forwards to
on hub-assigned │ │ SmartProxy on
│ ports │ │ local ports │
└─────────────────────┘ └─────────────────────┘
▲ │
│ TCP from end users
│ TCP + UDP from end users ▼
Internet DcRouter / SmartProxy
```
| Component | Role |
|-----------|------|
| **RemoteIngressEdge** | Deployed at the network edge (e.g. a VPS or cloud instance). Listens on ports assigned by the hub, accepts raw TCP connections, and multiplexes them over a single TLS tunnel to the hub. Ports are hot-reloadable — the hub can change them at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams, and forwards each to SmartProxy with a PROXY protocol v1 header so the real client IP is preserved. Controls which ports each edge listens on. |
| **RemoteIngressEdge** | Deployed at the network edge (VPS, cloud instance). Listens on TCP and UDP ports assigned by the hub, accepts connections/datagrams, and tunnels them to the hub. Ports are hot-reloadable at runtime. |
| **RemoteIngressHub** | Deployed alongside DcRouter/SmartProxy in a private cluster. Accepts edge connections, demuxes streams/datagrams, and forwards each to SmartProxy with PROXY protocol headers so the real client IP is preserved. |
| **Rust Binary** (`remoteingress-bin`) | The performance-critical networking core. Managed via `@push.rocks/smartrust` RustBridge IPC — you never interact with it directly. Cross-compiled for `linux/amd64` and `linux/arm64`. |
### ✨ Key Features
- 🔒 **TLS-encrypted tunnel** between edge and hub (auto-generated self-signed cert or bring your own)
- 🔀 **Multiplexed streams** — thousands of client connections flow over a single tunnel
- 🌐 **PROXY protocol v1** — SmartProxy sees the real client IP, not the tunnel IP
- 🔒 **Dual transport** — choose between TCP+TLS (frame-multiplexed) or QUIC (native stream multiplexing, zero head-of-line blocking)
- 🌐 **TCP + UDP tunneling** — tunnel any TCP connection or UDP datagram through the same edge/hub pair
- 📋 **PROXY protocol v1 & v2** — SmartProxy sees the real client IP for both TCP (v1 text) and UDP (v2 binary)
- 🔀 **Multiplexed streams** — thousands of concurrent TCP connections over a single tunnel
-**QUIC datagrams** — UDP traffic forwarded via QUIC unreliable datagrams for lowest possible latency
- 🔑 **Shared-secret authentication** — edges must present valid credentials to connect
- 🎫 **Connection tokens** — encode all connection details into a single opaque string
- 📡 **STUN-based public IP discovery** the edge automatically discovers its public IP via Cloudflare STUN
- 🎫 **Connection tokens** — encode all connection details into a single opaque base64url string
- 📡 **STUN-based public IP discovery** — edges automatically discover their public IP via Cloudflare STUN
- 🔄 **Auto-reconnect** with exponential backoff if the tunnel drops
- 🎛️ **Dynamic port configuration** — the hub assigns listen ports per edge and can hot-reload them at runtime via `FRAME_CONFIG` frames
- 🎛️ **Dynamic port configuration** — the hub assigns TCP and UDP listen ports per edge, hot-reloadable at runtime
- 📣 **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
-**Rust core** — all frame encoding, TLS, and TCP proxying happen in native code for maximum throughput
- 🎚️ **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- 📊 **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
- 🕒 **UDP session management** — automatic session tracking with 60s idle timeout and cleanup
## 🚀 Usage
Both classes are imported from the package and communicate with the Rust binary under the hood. All you need to do is configure and start them.
Both classes are imported from the package and communicate with the Rust binary under the hood.
### Setting Up the Hub (Private Cluster Side)
@@ -63,32 +66,25 @@ import { RemoteIngressHub } from '@serve.zone/remoteingress';
const hub = new RemoteIngressHub();
// Listen for events
hub.on('edgeConnected', ({ edgeId }) => {
console.log(`Edge ${edgeId} connected`);
});
hub.on('edgeDisconnected', ({ edgeId }) => {
console.log(`Edge ${edgeId} disconnected`);
});
hub.on('streamOpened', ({ edgeId, streamId }) => {
console.log(`Stream ${streamId} opened from edge ${edgeId}`);
});
hub.on('streamClosed', ({ edgeId, streamId }) => {
console.log(`Stream ${streamId} closed from edge ${edgeId}`);
});
hub.on('edgeConnected', ({ edgeId }) => console.log(`Edge ${edgeId} connected`));
hub.on('edgeDisconnected', ({ edgeId }) => console.log(`Edge ${edgeId} disconnected`));
hub.on('streamOpened', ({ edgeId, streamId }) => console.log(`Stream ${streamId} from ${edgeId}`));
hub.on('streamClosed', ({ edgeId, streamId }) => console.log(`Stream ${streamId} closed`));
// Start the hub — it will listen for incoming edge TLS connections
// Start the hub — listens for edge connections on both TCP and QUIC (same port)
await hub.start({
tunnelPort: 8443, // port edges connect to (default: 8443)
targetHost: '127.0.0.1', // SmartProxy host to forward streams to (default: 127.0.0.1)
targetHost: '127.0.0.1', // SmartProxy host to forward traffic to
});
// Register which edges are allowed to connect, including their listen ports
// Register allowed edges with TCP and UDP listen ports
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443], // ports the edge should listen on
stunIntervalSecs: 300, // STUN discovery interval (default: 300)
listenPorts: [80, 443], // TCP ports the edge should listen on
listenPortsUdp: [53, 51820], // UDP ports (e.g., DNS, WireGuard)
stunIntervalSecs: 300,
},
{
id: 'edge-fra-02',
@@ -97,38 +93,29 @@ await hub.updateAllowedEdges([
},
]);
// Dynamically update ports for a connected edge — changes are pushed instantly
// Dynamically update ports — changes are pushed instantly to connected edges
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'supersecrettoken1',
listenPorts: [80, 443, 8443], // added port 8443 — edge picks it up in real time
listenPorts: [80, 443, 8443], // added TCP port 8443
listenPortsUdp: [53], // removed WireGuard UDP port
},
]);
// Check status at any time
// Check status
const status = await hub.getStatus();
console.log(status);
// {
// running: true,
// tunnelPort: 8443,
// connectedEdges: [
// { edgeId: 'edge-nyc-01', connectedAt: 1700000000, activeStreams: 12 }
// ]
// }
// { running: true, tunnelPort: 8443, connectedEdges: [...] }
// Graceful shutdown
await hub.stop();
```
### Setting Up the Edge (Network Edge Side)
The edge can be configured in two ways: with an **opaque connection token** (recommended) or with explicit config fields.
The edge can connect via **TCP+TLS** (default) or **QUIC** transport.
#### Option A: Connection Token (Recommended)
A single token encodes all connection details — ideal for provisioning edges at scale:
```typescript
import { RemoteIngressEdge } from '@serve.zone/remoteingress';
@@ -137,79 +124,64 @@ const edge = new RemoteIngressEdge();
edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`Listening on ports: ${listenPorts}`));
edge.on('portsUpdated', ({ listenPorts }) => console.log(`Ports updated: ${listenPorts}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`TCP ports: ${listenPorts}`));
// Single token contains hubHost, hubPort, edgeId, and secret
await edge.start({
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwicCI6ODQ0MywiZSI6ImVkZ2UtbnljLTAxIiwicyI6InN1cGVyc2VjcmV0dG9rZW4xIn0',
token: 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...',
});
```
#### Option B: Explicit Config
#### Option B: Explicit Config with QUIC Transport
```typescript
import { RemoteIngressEdge } from '@serve.zone/remoteingress';
const edge = new RemoteIngressEdge();
edge.on('tunnelConnected', () => console.log('Tunnel established'));
edge.on('tunnelDisconnected', () => console.log('Tunnel lost — will auto-reconnect'));
edge.on('publicIpDiscovered', ({ ip }) => console.log(`Public IP: ${ip}`));
edge.on('portsAssigned', ({ listenPorts }) => console.log(`Listening on ports: ${listenPorts}`));
edge.on('portsUpdated', ({ listenPorts }) => console.log(`Ports updated: ${listenPorts}`));
await edge.start({
hubHost: 'hub.example.com', // hostname or IP of the hub
hubPort: 8443, // must match hub's tunnelPort (default: 8443)
edgeId: 'edge-nyc-01', // unique edge identifier
secret: 'supersecrettoken1', // must match the hub's allowed edge secret
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
transportMode: 'quic', // 'tcpTls' (default) | 'quic' | 'quicWithFallback'
});
// Check status at any time
const edgeStatus = await edge.getStatus();
console.log(edgeStatus);
// {
// running: true,
// connected: true,
// publicIp: '203.0.113.42',
// activeStreams: 5,
// listenPorts: [80, 443]
// }
// { running: true, connected: true, publicIp: '203.0.113.42', activeStreams: 5, listenPorts: [80, 443] }
// Graceful shutdown
await edge.stop();
```
#### Transport Modes
| Mode | Description |
|------|-------------|
| `'tcpTls'` | **Default.** Single TLS connection with frame-based multiplexing. Universal compatibility. |
| `'quic'` | QUIC with native stream multiplexing. Eliminates head-of-line blocking. Uses QUIC datagrams for UDP traffic. |
| `'quicWithFallback'` | Tries QUIC first (5s timeout), falls back to TCP+TLS if UDP is blocked by the network. |
### 🎫 Connection Tokens
Connection tokens let you distribute a single opaque string instead of four separate config values. The hub operator generates the token; the edge operator just pastes it in.
Encode all connection details into a single opaque string for easy distribution:
```typescript
import { encodeConnectionToken, decodeConnectionToken } from '@serve.zone/remoteingress';
// Hub side: generate a token for a new edge
// Hub operator generates a token
const token = encodeConnectionToken({
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-nyc-01',
secret: 'supersecrettoken1',
});
console.log(token);
// => 'eyJoIjoiaHViLmV4YW1wbGUuY29tIiwi...'
// Edge side: inspect a token (optional — start() does this automatically)
// Edge operator decodes (optional — start() does this automatically)
const data = decodeConnectionToken(token);
console.log(data);
// {
// hubHost: 'hub.example.com',
// hubPort: 8443,
// edgeId: 'edge-nyc-01',
// secret: 'supersecrettoken1'
// }
// { hubHost: 'hub.example.com', hubPort: 8443, edgeId: 'edge-nyc-01', secret: '...' }
```
Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environment variables, CLI arguments, or store in config files.
Tokens are base64url-encoded — safe for environment variables, CLI arguments, and config files.
## 📖 API Reference
@@ -217,10 +189,10 @@ Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environm
| Method / Property | Description |
|-------------------|-------------|
| `start(config?)` | Spawns the Rust binary and starts the tunnel listener. Config: `{ tunnelPort?: number, targetHost?: string }` |
| `stop()` | Gracefully shuts down the hub and kills the Rust process. |
| `updateAllowedEdges(edges)` | Dynamically update which edges are authorized and what ports they listen on. Each edge: `{ id: string, secret: string, listenPorts?: number[], stunIntervalSecs?: number }`. If ports change for a connected edge, the update is pushed immediately via a `FRAME_CONFIG` frame. |
| `getStatus()` | Returns current hub status including connected edges and active stream counts. |
| `start(config?)` | Start the hub. Config: `{ tunnelPort?: number, targetHost?: string }`. Listens on both TCP and UDP (QUIC) on the tunnel port. |
| `stop()` | Graceful shutdown. |
| `updateAllowedEdges(edges)` | Set authorized edges. Each: `{ id, secret, listenPorts?, listenPortsUdp?, stunIntervalSecs? }`. Port changes are pushed to connected edges in real time. |
| `getStatus()` | Returns `{ running, tunnelPort, connectedEdges: [...] }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `edgeConnected`, `edgeDisconnected`, `streamOpened`, `streamClosed`
@@ -229,9 +201,9 @@ Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environm
| Method / Property | Description |
|-------------------|-------------|
| `start(config)` | Spawns the Rust binary and connects to the hub. Accepts `{ token: string }` or `IEdgeConfig`. Listen ports are received from the hub during handshake. |
| `stop()` | Gracefully shuts down the edge and kills the Rust process. |
| `getStatus()` | Returns current edge status including connection state, public IP, listen ports, and active streams. |
| `start(config)` | Connect to hub. Accepts `{ token }` or `{ hubHost, hubPort, edgeId, secret, transportMode? }`. |
| `stop()` | Graceful shutdown. |
| `getStatus()` | Returns `{ running, connected, publicIp, activeStreams, listenPorts }`. |
| `running` | `boolean` — whether the Rust binary is alive. |
**Events:** `tunnelConnected`, `tunnelDisconnected`, `publicIpDiscovered`, `portsAssigned`, `portsUpdated`
@@ -240,8 +212,8 @@ Tokens are base64url-encoded (URL-safe, no padding) — safe to pass as environm
| Function | Description |
|----------|-------------|
| `encodeConnectionToken(data)` | Encodes `IConnectionTokenData` into a base64url token string. |
| `decodeConnectionToken(token)` | Decodes a token back into `IConnectionTokenData`. Throws on malformed or incomplete tokens. |
| `encodeConnectionToken(data)` | Encodes connection info into a base64url token. |
| `decodeConnectionToken(token)` | Decodes a token. Throws on malformed input. |
### Interfaces
@@ -256,6 +228,8 @@ interface IEdgeConfig {
hubPort?: number; // default: 8443
edgeId: string;
secret: string;
bindAddress?: string;
transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback';
}
interface IConnectionTokenData {
@@ -268,7 +242,9 @@ interface IConnectionTokenData {
## 🔌 Wire Protocol
The tunnel uses a custom binary frame protocol over TLS:
### TCP+TLS Transport (Frame Protocol)
The tunnel uses a custom binary frame protocol over a single TLS connection:
```
[stream_id: 4 bytes BE][type: 1 byte][length: 4 bytes BE][payload: N bytes]
@@ -276,113 +252,124 @@ The tunnel uses a custom binary frame protocol over TLS:
| Frame Type | Value | Direction | Purpose |
|------------|-------|-----------|---------|
| `OPEN` | `0x01` | Edge → Hub | Open a new stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge → Hub | Client data flowing upstream |
| `CLOSE` | `0x03` | Edge → Hub | Client closed the connection |
| `DATA_BACK` | `0x04` | Hub → Edge | Response data flowing downstream |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream (SmartProxy) closed the connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime configuration update (e.g. port changes); payload is JSON |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (sent every 15s) |
| `OPEN` | `0x01` | Edge → Hub | Open TCP stream; payload is PROXY v1 header |
| `DATA` | `0x02` | Edge → Hub | Client data (upload) |
| `CLOSE` | `0x03` | Edge → Hub | Client closed connection |
| `DATA_BACK` | `0x04` | Hub → Edge | Response data (download) |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream closed connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime config update (JSON payload) |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (every 15s) |
| `PONG` | `0x08` | Edge → Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Per-stream flow control: edge consumed N bytes, hub can send more |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Per-stream flow control: hub consumed N bytes, edge can send more |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Flow control: edge consumed N bytes |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Flow control: hub consumed N bytes |
| `UDP_OPEN` | `0x0B` | Edge → Hub | Open UDP session; payload is PROXY v2 header |
| `UDP_DATA` | `0x0C` | Edge → Hub | UDP datagram (upload) |
| `UDP_DATA_BACK` | `0x0D` | Hub → Edge | UDP datagram (download) |
| `UDP_CLOSE` | `0x0E` | Either | Close UDP session |
Max payload size per frame: **16 MB**. Stream IDs are 32-bit unsigned integers.
### QUIC Transport
When using QUIC, the frame protocol is replaced by native QUIC primitives:
- **TCP connections:** Each tunneled TCP connection gets its own QUIC bidirectional stream. No framing overhead.
- **UDP datagrams:** Forwarded via QUIC unreliable datagrams (RFC 9221). Format: `[session_id: 4 bytes][payload]`. Session open uses magic byte `0xFF`: `[session_id: 4][0xFF][PROXY v2 header]`.
- **Control channel:** First QUIC bidirectional stream carries auth handshake + config updates using `[type: 1][length: 4][payload]` format.
### Handshake Sequence
1. Edge opens a TLS connection to the hub
1. Edge opens a TLS or QUIC connection to the hub
2. Edge sends: `EDGE <edgeId> <secret>\n`
3. Hub verifies credentials (constant-time comparison) and responds with JSON: `{"listenPorts":[...],"stunIntervalSecs":300}\n`
4. Edge starts TCP listeners on the assigned ports
5. Frame protocol begins — `OPEN`/`DATA`/`CLOSE` frames flow in both directions
6. Hub can push `CONFIG` frames at any time to update the edge's listen ports
3. Hub verifies credentials (constant-time comparison) and responds with JSON:
`{"listenPorts":[...],"listenPortsUdp":[...],"stunIntervalSecs":300}\n`
4. Edge starts TCP and UDP listeners on the assigned ports
5. Data flows — TCP frames/QUIC streams for TCP traffic, UDP frames/QUIC datagrams for UDP traffic
## 🎚️ QoS & Flow Control
The tunnel multiplexer uses a **3-tier priority system** and **per-stream flow control** to ensure fair bandwidth sharing across thousands of concurrent streams.
### Priority Tiers (TCP+TLS Transport)
### Priority Tiers
All outbound frames are queued into one of three priority levels:
| Tier | Queue | Frames | Behavior |
|------|-------|--------|----------|
| 🔴 **Control** (highest) | `ctrl_queue` | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** (normal) | `data_queue` | DATA, DATA_BACK from normal streams | Drained when ctrl is empty. Gated at 64 buffered items for backpressure. |
| 🟢 **Sustained** (lowest) | `sustained_queue` | DATA, DATA_BACK from elephant flows | Drained freely when ctrl+data are empty. Otherwise guaranteed **1 MB/s** via forced drain every second. |
This prevents large bulk transfers (e.g. git clones, file downloads) from starving interactive traffic and ensures `WINDOW_UPDATE` frames are never delayed — which would cause flow control deadlocks.
| Tier | Frames | Behavior |
|------|--------|----------|
| 🔴 **Control** | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** | DATA/DATA_BACK from normal streams, UDP frames | Drained when control queue is empty. |
| 🟢 **Sustained** | DATA/DATA_BACK from elephant flows | Lowest priority with guaranteed **1 MB/s** drain rate. |
### Sustained Stream Classification
A stream is automatically classified as **sustained** (elephant flow) when:
- It has been active for **>10 seconds**, AND
- Its average throughput exceeds **20 Mbit/s** (2.5 MB/s)
A TCP stream is classified as **sustained** (elephant flow) when:
- Active for **>10 seconds**, AND
- Average throughput exceeds **20 Mbit/s** (2.5 MB/s)
Once classified, the stream's flow control window is locked to the **1 MB floor** and its data frames move to the lowest-priority queue. Classification is one-way — a stream never gets promoted back to normal.
Once classified, its flow control window locks to 1 MB and data frames move to the lowest-priority queue.
### Adaptive Per-Stream Windows
Each stream has a send window that limits bytes-in-flight. The window size adapts to the number of active streams using a shared **200 MB memory budget**:
Each TCP stream has a send window from a shared **200 MB budget**:
| Active Streams | Window per Stream |
|---|---|
| 150 | 4 MB (maximum) |
| 51100 | Scales down (4 MB → 2 MB) |
| 51200 | Scales down (4 MB → 1 MB) |
| 200+ | 1 MB (floor) |
The consumer sends `WINDOW_UPDATE` frames after processing data, allowing the producer to send more. This prevents any single stream from consuming unbounded memory and provides natural backpressure.
UDP traffic uses no flow control — datagrams are fire-and-forget, matching UDP semantics.
## 💡 Example Scenarios
### 1. Expose a Private Kubernetes Cluster to the Internet
### 1. Expose a Private Cluster to the Internet
Deploy an Edge on a public VPS, point your DNS to the VPS IP. The Edge tunnels all traffic to the Hub running inside the cluster, which hands it off to SmartProxy/DcRouter. Your cluster stays fully private — no public-facing ports needed.
Deploy an Edge on a public VPS, point DNS to its IP. The Edge tunnels all TCP and UDP traffic to the Hub running inside your private cluster. No public ports needed on the cluster.
### 2. Multi-Region Edge Ingress
Run multiple Edges in different geographic regions (NYC, Frankfurt, Tokyo) all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. The Hub sees the real client IPs via PROXY protocol regardless of which edge they connected through.
Run Edges in NYC, Frankfurt, and Tokyo all connecting to a single Hub. Use GeoDNS to route users to their nearest Edge. PROXY protocol ensures the Hub sees real client IPs regardless of which Edge they entered through.
### 3. Secure API Exposure
### 3. UDP Forwarding (DNS, Gaming, VoIP)
Your backend runs on a private network with no direct internet access. An Edge on a minimal cloud instance acts as the only public entry point. TLS tunnel + shared-secret auth ensure only your authorized Edge can forward traffic.
### 4. Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators. Each edge only needs a single token string to connect — no manual configuration of host, port, ID, and secret.
Configure UDP listen ports alongside TCP ports. DNS queries, game server traffic, or VoIP packets are tunneled through the same edge/hub connection and forwarded to SmartProxy with a PROXY v2 binary header preserving the client's real IP.
```typescript
await hub.updateAllowedEdges([
{
id: 'edge-nyc-01',
secret: 'secret',
listenPorts: [80, 443], // TCP
listenPortsUdp: [53, 27015], // DNS + game server
},
]);
```
### 4. QUIC Transport for Low-Latency
Use QUIC transport to eliminate head-of-line blocking — a lost packet on one stream doesn't stall others. QUIC also enables 0-RTT reconnection and connection migration.
```typescript
await edge.start({
hubHost: 'hub.example.com',
hubPort: 8443,
edgeId: 'edge-01',
secret: 'secret',
transportMode: 'quicWithFallback', // try QUIC, fall back to TLS if UDP blocked
});
```
### 5. Token-Based Edge Provisioning
Generate connection tokens on the hub side and distribute them to edge operators:
```typescript
// Hub operator generates token
const token = encodeConnectionToken({
hubHost: 'hub.prod.example.com',
hubPort: 8443,
edgeId: 'edge-tokyo-01',
secret: 'generated-secret-abc123',
});
// Send `token` to the edge operator via secure channel
// Send `token` to the edge operator a single string is all they need
// Edge operator starts with just the token
const edge = new RemoteIngressEdge();
await edge.start({ token });
```
### 5. Dynamic Port Management
The hub controls which ports each edge listens on. Ports can be changed at runtime without restarting the edge — the hub pushes a `CONFIG` frame and the edge hot-reloads its TCP listeners.
```typescript
// Initially assign ports 80 and 443
await hub.updateAllowedEdges([
{ id: 'edge-nyc-01', secret: 'secret', listenPorts: [80, 443] },
]);
// Later, add port 8080 — the connected edge picks it up instantly
await hub.updateAllowedEdges([
{ id: 'edge-nyc-01', secret: 'secret', listenPorts: [80, 443, 8080] },
]);
```
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.

View File

@@ -220,7 +220,7 @@ async fn edge_main_loop(
let mut backoff_ms: u64 = 1000;
let max_backoff_ms: u64 = 30000;
let transport_mode = config.transport_mode.unwrap_or(TransportMode::TcpTls);
let transport_mode = config.transport_mode.unwrap_or(TransportMode::QuicWithFallback);
// Build TLS config ONCE outside the reconnect loop — preserves session
// cache across reconnections for TLS session resumption (saves 1 RTT).
@@ -642,8 +642,23 @@ async fn connect_to_hub_and_run(
let liveness_timeout_dur = Duration::from_secs(45);
let mut last_activity = Instant::now();
let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur));
let mut next_udp_expiry = Instant::now() + Duration::from_secs(30);
let result = 'io_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry {
let mut sessions = udp_sessions.lock().await;
let expired = sessions.expire_idle();
for sid in &expired {
let close_frame = encode_frame(*sid, FRAME_UDP_CLOSE, &[]);
let _ = tunnel_data_tx.try_send(close_frame);
}
if !expired.is_empty() {
log::debug!("Expired {} idle UDP sessions", expired.len());
}
next_udp_expiry = Instant::now() + Duration::from_secs(30);
}
// Drain any buffered frames
loop {
let frame = match tunnel_io.try_parse_frame() {
@@ -1328,9 +1343,36 @@ async fn connect_to_hub_and_run_quic_with_connection(
bind_address,
);
// Monitor control stream for config updates, and connection health.
// Also handle shutdown signals.
// UDP listeners for QUIC transport — uses QUIC datagrams for low-latency forwarding.
let udp_sessions_quic: Arc<Mutex<UdpSessionManager>> =
Arc::new(Mutex::new(UdpSessionManager::new(Duration::from_secs(60))));
let udp_sockets_quic: Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>> =
Arc::new(Mutex::new(HashMap::new()));
let mut udp_listeners_quic: HashMap<u16, JoinHandle<()>> = HashMap::new();
apply_udp_port_config_quic(
&handshake.listen_ports_udp,
&mut udp_listeners_quic,
&quic_conn,
&udp_sessions_quic,
&udp_sockets_quic,
next_stream_id,
connection_token,
bind_address,
);
// Monitor control stream for config updates, connection health, and QUIC datagrams.
let mut next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
let result = 'quic_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry_quic {
let mut sessions = udp_sessions_quic.lock().await;
let expired = sessions.expire_idle();
if !expired.is_empty() {
log::debug!("Expired {} idle QUIC UDP sessions", expired.len());
}
next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
}
tokio::select! {
// Read control messages from hub
ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => {
@@ -1354,6 +1396,16 @@ async fn connect_to_hub_and_run_quic_with_connection(
connection_token,
bind_address,
);
apply_udp_port_config_quic(
&update.listen_ports_udp,
&mut udp_listeners_quic,
&quic_conn,
&udp_sessions_quic,
&udp_sockets_quic,
next_stream_id,
connection_token,
bind_address,
);
}
}
quic_transport::CTRL_PING => {
@@ -1384,6 +1436,30 @@ async fn connect_to_hub_and_run_quic_with_connection(
}
}
}
// Receive QUIC datagrams (UDP return traffic from hub)
datagram = quic_conn.read_datagram() => {
match datagram {
Ok(data) => {
// Format: [session_id:4][payload:N]
if data.len() >= 4 {
let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
let payload = &data[4..];
let mut sessions = udp_sessions_quic.lock().await;
if let Some(session) = sessions.get_by_stream_id(session_id) {
let client_addr = session.client_addr;
let dest_port = session.dest_port;
let sockets = udp_sockets_quic.lock().await;
if let Some(socket) = sockets.get(&dest_port) {
let _ = socket.send_to(payload, client_addr).await;
}
}
}
}
Err(e) => {
log::debug!("QUIC datagram recv error: {}", e);
}
}
}
// QUIC connection closed
reason = quic_conn.closed() => {
log::info!("QUIC connection closed: {}", reason);
@@ -1405,6 +1481,9 @@ async fn connect_to_hub_and_run_quic_with_connection(
for (_, h) in port_listeners.drain() {
h.abort();
}
for (_, h) in udp_listeners_quic.drain() {
h.abort();
}
// Graceful QUIC close
quic_conn.close(quinn::VarInt::from_u32(0), b"shutdown");
@@ -1513,6 +1592,104 @@ fn apply_port_config_quic(
/// Handle a single client connection via QUIC transport.
/// Opens a new QUIC bidirectional stream, sends the PROXY header,
/// then bidirectionally copies data between the client TCP socket and the QUIC stream.
/// Apply UDP port config for QUIC transport: bind UdpSockets that send via QUIC datagrams.
fn apply_udp_port_config_quic(
new_ports: &[u16],
udp_listeners: &mut HashMap<u16, JoinHandle<()>>,
quic_conn: &quinn::Connection,
udp_sessions: &Arc<Mutex<UdpSessionManager>>,
udp_sockets: &Arc<Mutex<HashMap<u16, Arc<UdpSocket>>>>,
next_stream_id: &Arc<AtomicU32>,
connection_token: &CancellationToken,
bind_address: &str,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = udp_listeners.keys().copied().collect();
for &port in old_set.difference(&new_set) {
if let Some(handle) = udp_listeners.remove(&port) {
log::info!("Stopping QUIC UDP listener on port {}", port);
handle.abort();
}
let sockets = udp_sockets.clone();
tokio::spawn(async move { sockets.lock().await.remove(&port); });
}
for &port in new_set.difference(&old_set) {
let quic_conn = quic_conn.clone();
let udp_sessions = udp_sessions.clone();
let udp_sockets = udp_sockets.clone();
let next_stream_id = next_stream_id.clone();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
let socket = match UdpSocket::bind((bind_addr.as_str(), port)).await {
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("Failed to bind QUIC UDP port {}: {}", port, e);
return;
}
};
log::info!("Listening on UDP port {} (QUIC datagram transport)", port);
udp_sockets.lock().await.insert(port, socket.clone());
let mut buf = vec![0u8; 65536];
loop {
tokio::select! {
recv_result = socket.recv_from(&mut buf) => {
match recv_result {
Ok((len, client_addr)) => {
let key = UdpSessionKey { client_addr, dest_port: port };
let mut sessions = udp_sessions.lock().await;
let stream_id = if let Some(session) = sessions.get_mut(&key) {
session.stream_id
} else {
// New session — send PROXY v2 header via control-style datagram
let sid = next_stream_id.fetch_add(1, Ordering::Relaxed);
sessions.insert(key, sid);
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
let proxy_header = build_proxy_v2_header_from_str(
&client_ip, "0.0.0.0", client_port, port,
ProxyV2Transport::Udp,
);
// Send OPEN as a QUIC datagram: [session_id:4][0xFF magic:1][proxy_header:28]
let mut open_buf = Vec::with_capacity(4 + 1 + proxy_header.len());
open_buf.extend_from_slice(&sid.to_be_bytes());
open_buf.push(0xFF); // magic byte to distinguish OPEN from DATA
open_buf.extend_from_slice(&proxy_header);
let _ = quic_conn.send_datagram(open_buf.into());
log::debug!("New QUIC UDP session {} from {} -> port {}", sid, client_addr, port);
sid
};
drop(sessions);
// Send datagram: [session_id:4][payload:N]
let mut dgram = Vec::with_capacity(4 + len);
dgram.extend_from_slice(&stream_id.to_be_bytes());
dgram.extend_from_slice(&buf[..len]);
let _ = quic_conn.send_datagram(dgram.into());
}
Err(e) => {
log::error!("QUIC UDP recv error on port {}: {}", port, e);
}
}
}
_ = port_token.cancelled() => {
log::info!("QUIC UDP port {} listener cancelled", port);
break;
}
}
}
});
udp_listeners.insert(port, handle);
}
}
async fn handle_client_connection_quic(
client_stream: TcpStream,
client_addr: std::net::SocketAddr,

View File

@@ -706,6 +706,7 @@ async fn handle_hub_frame(
let data_writer_tx = data_tx.clone();
let session_token = edge_token.child_token();
let edge_id_str = edge_id.to_string();
let proxy_v2_header = frame.payload.clone();
// Channel for forwarding datagrams from edge to upstream
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
@@ -728,6 +729,12 @@ async fn handle_hub_frame(
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_header).await {
log::error!("UDP session {} failed to send PROXY v2 header: {}", stream_id, e);
return;
}
// Task: upstream -> edge (return datagrams)
let upstream_recv = Arc::new(upstream);
let upstream_send = upstream_recv.clone();
@@ -1331,6 +1338,129 @@ async fn handle_edge_connection_quic(
}
});
// UDP sessions for QUIC datagram transport
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
Arc::new(Mutex::new(HashMap::new()));
// Spawn QUIC datagram receiver task
let dgram_conn = quic_conn.clone();
let dgram_sessions = quic_udp_sessions.clone();
let dgram_target = target_host.clone();
let dgram_edge_id = edge_id.clone();
let dgram_token = edge_token.clone();
let dgram_handle = tokio::spawn(async move {
loop {
tokio::select! {
datagram = dgram_conn.read_datagram() => {
match datagram {
Ok(data) => {
if data.len() < 4 { continue; }
let session_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
let payload = &data[4..];
// Check for OPEN magic byte (0xFF)
if !payload.is_empty() && payload[0] == 0xFF {
// This is a session OPEN: [0xFF][proxy_v2_header:28]
let proxy_data = &payload[1..];
let dest_port = if proxy_data.len() >= 28 {
u16::from_be_bytes([proxy_data[26], proxy_data[27]])
} else {
53 // fallback
};
// Create upstream UDP socket
let target = dgram_target.clone();
let conn = dgram_conn.clone();
let sessions = dgram_sessions.clone();
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
{
let mut s = sessions.lock().await;
s.insert(session_id, tx);
}
tokio::spawn(async move {
let upstream = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
return;
}
// Upstream recv → QUIC datagram back to edge
let upstream_recv = upstream.clone();
let recv_conn = conn.clone();
let recv_token = session_token.clone();
let recv_handle = tokio::spawn(async move {
let mut buf = vec![0u8; 65536];
loop {
tokio::select! {
result = upstream_recv.recv(&mut buf) => {
match result {
Ok(len) => {
let mut dgram = Vec::with_capacity(4 + len);
dgram.extend_from_slice(&session_id.to_be_bytes());
dgram.extend_from_slice(&buf[..len]);
let _ = recv_conn.send_datagram(dgram.into());
}
Err(_) => break,
}
}
_ = recv_token.cancelled() => break,
}
}
});
// Edge datagrams → upstream
loop {
tokio::select! {
data = rx.recv() => {
match data {
Some(datagram) => {
let _ = upstream.send(&datagram).await;
}
None => break,
}
}
_ = session_token.cancelled() => break,
}
}
recv_handle.abort();
});
continue;
}
// Regular data datagram — forward to upstream
let sessions = dgram_sessions.lock().await;
if let Some(tx) = sessions.get(&session_id) {
let _ = tx.try_send(Bytes::copy_from_slice(payload));
}
}
Err(e) => {
log::debug!("QUIC datagram recv error from edge {}: {}", dgram_edge_id, e);
break;
}
}
}
_ = dgram_token.cancelled() => break,
}
}
});
// Control stream loop: forward config updates and handle PONG
let disconnect_reason;
loop {
@@ -1399,6 +1529,7 @@ async fn handle_edge_connection_quic(
// Cleanup
edge_token.cancel();
data_handle.abort();
dgram_handle.abort();
quic_conn.close(quinn::VarInt::from_u32(0), b"hub_shutdown");
{

View File

@@ -31,6 +31,8 @@ pub fn build_quic_client_config() -> quinn::ClientConfig {
// Match MAX_STREAMS_PER_EDGE (1024) from hub.rs.
// Default is 100 which is too low for high-concurrency tunneling.
transport.max_concurrent_bidi_streams(1024u32.into());
// Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling.
transport.datagram_receive_buffer_size(Some(65536));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));
client_config.transport_config(Arc::new(transport));
@@ -49,6 +51,7 @@ pub fn build_quic_server_config(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
transport.max_concurrent_bidi_streams(1024u32.into());
transport.datagram_receive_buffer_size(Some(65536));
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));
server_config.transport_config(Arc::new(transport));

View File

@@ -315,7 +315,7 @@ let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
tap.test('setup: start echo server and tunnel', async () => {
tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
@@ -324,7 +324,7 @@ tap.test('setup: start echo server and tunnel', async () => {
expect(tunnel.hub.running).toBeTrue();
});
tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple refills)', async () => {
tap.test('TCP/TLS: single TCP stream 32MB transfer exceeding initial 4MB window', async () => {
const size = 32 * 1024 * 1024;
const data = crypto.randomBytes(size);
const expectedHash = sha256(data);
@@ -335,7 +335,7 @@ tap.test('single stream: 32MB transfer exceeding initial 4MB window (multiple re
expect(sha256(received)).toEqual(expectedHash);
});
tap.test('200 concurrent streams with 64KB each', async () => {
tap.test('TCP/TLS: 200 concurrent TCP streams x 64KB each', async () => {
const streamCount = 200;
const payloadSize = 64 * 1024;
@@ -355,7 +355,7 @@ tap.test('200 concurrent streams with 64KB each', async () => {
expect(failures.length).toEqual(0);
});
tap.test('512 concurrent streams at minimum window boundary (16KB each)', async () => {
tap.test('TCP/TLS: 512 concurrent TCP streams at minimum window boundary (16KB each)', async () => {
const streamCount = 512;
const payloadSize = 16 * 1024;
@@ -375,7 +375,7 @@ tap.test('512 concurrent streams at minimum window boundary (16KB each)', async
expect(failures.length).toEqual(0);
});
tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
tap.test('TCP/TLS: asymmetric TCP transfer 4KB request -> 4MB response', async () => {
// Swap to large-response server
await forceCloseServer(echoServer);
const responseSize = 4 * 1024 * 1024; // 4 MB
@@ -392,7 +392,7 @@ tap.test('asymmetric transfer: 4KB request -> 4MB response', async () => {
}
});
tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
tap.test('TCP/TLS: 100 TCP streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
const streamCount = 100;
const payloadSize = 1 * 1024 * 1024;
@@ -412,7 +412,7 @@ tap.test('100 streams x 1MB each (100MB total exceeding 200MB budget)', async ()
expect(failures.length).toEqual(0);
});
tap.test('active stream counter tracks concurrent connections', async () => {
tap.test('TCP/TLS: active TCP stream counter tracks concurrent connections', async () => {
const N = 50;
// Open N connections and keep them alive (send data but don't close)
@@ -445,7 +445,7 @@ tap.test('active stream counter tracks concurrent connections', async () => {
}
});
tap.test('50 streams x 2MB each (forces multiple window refills per stream)', async () => {
tap.test('TCP/TLS: 50 TCP streams x 2MB each (forces multiple window refills)', async () => {
// At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
const streamCount = 50;
@@ -467,7 +467,7 @@ tap.test('50 streams x 2MB each (forces multiple window refills per stream)', as
expect(failures.length).toEqual(0);
});
tap.test('teardown: stop tunnel and echo server', async () => {
tap.test('TCP/TLS teardown: stop tunnel and TCP echo server', async () => {
await tunnel.cleanup();
await forceCloseServer(echoServer);
});

View File

@@ -231,7 +231,7 @@ let edgePort: number;
// Tests
// ---------------------------------------------------------------------------
tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => {
tap.test('TCP/TLS setup: start throttled TCP+TLS tunnel (100 Mbit/s)', async () => {
[hubPort, proxyPort, edgePort] = await findFreePorts(3);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
@@ -271,7 +271,7 @@ tap.test('setup: start throttled tunnel (100 Mbit/s)', async () => {
expect(status.connected).toBeTrue();
});
tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () => {
tap.test('TCP/TLS throttled: 5 TCP streams x 20MB each through 100Mbit tunnel', async () => {
const streamCount = 5;
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB total round-trip
@@ -293,7 +293,7 @@ tap.test('throttled: 5 streams x 20MB each through 100Mbit tunnel', async () =>
expect(status.connected).toBeTrue();
});
tap.test('throttled: slow consumer with 20MB does not kill other streams', async () => {
tap.test('TCP/TLS throttled: slow TCP consumer with 20MB does not kill other streams', async () => {
// Open a connection that creates download-direction backpressure:
// send 20MB but DON'T read the response — client TCP receive buffer fills
const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort });
@@ -326,7 +326,7 @@ tap.test('throttled: slow consumer with 20MB does not kill other streams', async
slowSock.destroy();
});
tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', async () => {
tap.test('TCP/TLS throttled: rapid churn — 3 x 20MB long + 50 x 1MB short TCP streams', async () => {
// 3 long streams (20MB each) running alongside 50 short streams (1MB each)
const longPayload = crypto.randomBytes(20 * 1024 * 1024);
const longHash = sha256(longPayload);
@@ -360,7 +360,7 @@ tap.test('throttled: rapid churn — 3 x 20MB long + 50 x 1MB short streams', as
expect(status.connected).toBeTrue();
});
tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => {
tap.test('TCP/TLS throttled: 3 burst waves of 5 TCP streams x 20MB each', async () => {
for (let wave = 0; wave < 3; wave++) {
const streamCount = 5;
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB per wave
@@ -382,7 +382,7 @@ tap.test('throttled: 3 burst waves of 5 streams x 20MB each', async () => {
}
});
tap.test('throttled: tunnel still works after all load tests', async () => {
tap.test('TCP/TLS throttled: TCP tunnel still works after all load tests', async () => {
const data = crypto.randomBytes(1024);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 30000);
@@ -392,7 +392,7 @@ tap.test('throttled: tunnel still works after all load tests', async () => {
expect(status.connected).toBeTrue();
});
tap.test('teardown: stop tunnel', async () => {
tap.test('TCP/TLS teardown: stop throttled tunnel', async () => {
await edge.stop();
await hub.stop();
if (throttle) await throttle.close();

View File

@@ -176,7 +176,7 @@ let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
tap.test('QUIC setup: start echo server and QUIC tunnel', async () => {
tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
@@ -187,7 +187,7 @@ tap.test('QUIC setup: start echo server and QUIC tunnel', async () => {
expect(status.connected).toBeTrue();
});
tap.test('QUIC: single stream echo — 1KB', async () => {
tap.test('QUIC: single TCP stream echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 10000);
@@ -195,7 +195,7 @@ tap.test('QUIC: single stream echo — 1KB', async () => {
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: single stream echo — 1MB', async () => {
tap.test('QUIC: single TCP stream echo — 1MB', async () => {
const size = 1024 * 1024;
const data = crypto.randomBytes(size);
const hash = sha256(data);
@@ -204,7 +204,7 @@ tap.test('QUIC: single stream echo — 1MB', async () => {
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: single stream echo — 16MB', async () => {
tap.test('QUIC: single TCP stream echo — 16MB', async () => {
const size = 16 * 1024 * 1024;
const data = crypto.randomBytes(size);
const hash = sha256(data);
@@ -213,7 +213,7 @@ tap.test('QUIC: single stream echo — 16MB', async () => {
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: 10 concurrent streams x 1MB each', async () => {
tap.test('QUIC: 10 concurrent TCP streams x 1MB each', async () => {
const streamCount = 10;
const payloadSize = 1024 * 1024;
@@ -232,7 +232,7 @@ tap.test('QUIC: 10 concurrent streams x 1MB each', async () => {
expect(failures.length).toEqual(0);
});
tap.test('QUIC: 50 concurrent streams x 64KB each', async () => {
tap.test('QUIC: 50 concurrent TCP streams x 64KB each', async () => {
const streamCount = 50;
const payloadSize = 64 * 1024;
@@ -251,7 +251,7 @@ tap.test('QUIC: 50 concurrent streams x 64KB each', async () => {
expect(failures.length).toEqual(0);
});
tap.test('QUIC: 200 concurrent streams x 16KB each', async () => {
tap.test('QUIC: 200 concurrent TCP streams x 16KB each', async () => {
const streamCount = 200;
const payloadSize = 16 * 1024;
@@ -270,12 +270,12 @@ tap.test('QUIC: 200 concurrent streams x 16KB each', async () => {
expect(failures.length).toEqual(0);
});
tap.test('QUIC: tunnel still connected after all tests', async () => {
tap.test('QUIC: TCP tunnel still connected after all tests', async () => {
const status = await tunnel.edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC teardown: stop tunnel and echo server', async () => {
tap.test('QUIC teardown: stop TCP tunnel and echo server', async () => {
await tunnel.cleanup();
await forceCloseServer(echoServer);
});

View File

@@ -29,15 +29,16 @@ async function findFreePorts(count: number): Promise<number[]> {
function startUdpEchoServer(port: number, host: string): Promise<dgram.Socket> {
return new Promise((resolve, reject) => {
const server = dgram.createSocket('udp4');
let proxyHeaderReceived = false;
// Track which source endpoints have sent their PROXY v2 header.
// The hub sends a 28-byte PROXY v2 header as the first datagram per session.
const seenSources = new Set<string>();
server.on('message', (msg, rinfo) => {
if (!proxyHeaderReceived) {
// First datagram is the PROXY v2 header (28 bytes for IPv4)
// In the current implementation, the hub connects directly via UDP
// so the first real datagram is the actual data (no PROXY header yet)
// For now, just echo everything back
proxyHeaderReceived = true;
const sourceKey = `${rinfo.address}:${rinfo.port}`;
if (!seenSources.has(sourceKey)) {
seenSources.add(sourceKey);
// First datagram from this source is the PROXY v2 header — skip it
return;
}
// Echo back
server.send(msg, rinfo.port, rinfo.address);
@@ -104,7 +105,7 @@ let edgeUdpPort: number;
// Tests
// ---------------------------------------------------------------------------
tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => {
tap.test('UDP/TLS setup: start UDP echo server and TCP+TLS tunnel with UDP ports', async () => {
[hubPort, edgeUdpPort] = await findFreePorts(2);
// Start UDP echo server on upstream (127.0.0.2)
@@ -142,21 +143,21 @@ tap.test('UDP setup: start echo server and tunnel with UDP ports', async () => {
expect(status.connected).toBeTrue();
});
tap.test('UDP: single datagram echo — 64 bytes', async () => {
tap.test('UDP/TLS: single UDP datagram echo — 64 bytes', async () => {
const data = crypto.randomBytes(64);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(64);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP: single datagram echo — 1KB', async () => {
tap.test('UDP/TLS: single UDP datagram echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(1024);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP: 10 sequential datagrams', async () => {
tap.test('UDP/TLS: 10 sequential UDP datagrams', async () => {
for (let i = 0; i < 10; i++) {
const data = crypto.randomBytes(128);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
@@ -165,7 +166,7 @@ tap.test('UDP: 10 sequential datagrams', async () => {
}
});
tap.test('UDP: 10 concurrent datagrams from different source ports', async () => {
tap.test('UDP/TLS: 10 concurrent UDP datagrams from different source ports', async () => {
const promises = Array.from({ length: 10 }, () => {
const data = crypto.randomBytes(256);
return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({
@@ -179,15 +180,105 @@ tap.test('UDP: 10 concurrent datagrams from different source ports', async () =>
expect(failures.length).toEqual(0);
});
tap.test('UDP: tunnel still connected after tests', async () => {
tap.test('UDP/TLS: tunnel still connected after UDP tests', async () => {
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP teardown: stop tunnel and echo server', async () => {
tap.test('UDP/TLS teardown: stop tunnel and UDP echo server', async () => {
await edge.stop();
await hub.stop();
await new Promise<void>((resolve) => echoServer.close(() => resolve()));
});
// ---------------------------------------------------------------------------
// QUIC transport UDP tests
// ---------------------------------------------------------------------------
let quicHub: RemoteIngressHub;
let quicEdge: RemoteIngressEdge;
let quicEchoServer: dgram.Socket;
let quicHubPort: number;
let quicEdgeUdpPort: number;
tap.test('UDP/QUIC setup: start UDP echo server and QUIC tunnel with UDP ports', async () => {
[quicHubPort, quicEdgeUdpPort] = await findFreePorts(2);
quicEchoServer = await startUdpEchoServer(quicEdgeUdpPort, '127.0.0.2');
quicHub = new RemoteIngressHub();
quicEdge = new RemoteIngressEdge();
await quicHub.start({ tunnelPort: quicHubPort, targetHost: '127.0.0.2' });
await quicHub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [quicEdgeUdpPort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
quicEdge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await quicEdge.start({
hubHost: '127.0.0.1',
hubPort: quicHubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await quicEdge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP/QUIC: single UDP datagram echo — 64 bytes', async () => {
const data = crypto.randomBytes(64);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(64);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/QUIC: single UDP datagram echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(1024);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/QUIC: 10 sequential UDP datagrams', async () => {
for (let i = 0; i < 10; i++) {
const data = crypto.randomBytes(128);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(128);
expect(Buffer.compare(received, data)).toEqual(0);
}
});
tap.test('UDP/QUIC: 10 concurrent UDP datagrams', async () => {
const promises = Array.from({ length: 10 }, () => {
const data = crypto.randomBytes(256);
return udpSendAndReceive(quicEdgeUdpPort, data, 5000).then((received) => ({
sizeOk: received.length === 256,
dataOk: Buffer.compare(received, data) === 0,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || !r.dataOk);
expect(failures.length).toEqual(0);
});
tap.test('UDP/QUIC teardown: stop QUIC tunnel and UDP echo server', async () => {
await quicEdge.stop();
await quicHub.stop();
await new Promise<void>((resolve) => quicEchoServer.close(() => resolve()));
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.11.0',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
version: '4.13.1',
description: 'Edge ingress tunnel for DcRouter - tunnels TCP and UDP traffic from the network edge to SmartProxy over TLS or QUIC, preserving client IP via PROXY protocol.'
}