Compare commits

...

65 Commits

Author SHA1 Message Date
948032fc9e v4.12.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 14:09:32 +00:00
a400945371 fix(remoteingress-core): send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions 2026-03-19 14:09:32 +00:00
bc89e49f39 v4.12.0
Some checks failed
Default (tags) / security (push) Failing after 4s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 12:19:58 +00:00
2087567f15 feat(remoteingress-core): add UDP tunneling over QUIC datagrams and expand transport-specific test coverage 2026-03-19 12:19:58 +00:00
bfa88f8d76 v4.11.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 12:02:41 +00:00
a96b4ba84a feat(remoteingress-core): add UDP tunneling support between edge and hub 2026-03-19 12:02:41 +00:00
61fa69f108 v4.10.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 10:44:22 +00:00
6abfd2ff2a feat(core,edge,hub,transport): add QUIC tunnel transport support with optional edge transport selection 2026-03-19 10:44:22 +00:00
e4807be00b v4.9.1
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-18 00:30:03 +00:00
b649322e65 fix(readme): document QoS tiers, heartbeat frames, and adaptive flow control in the protocol overview 2026-03-18 00:30:03 +00:00
d89d1cfbbf v4.9.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-18 00:13:14 +00:00
6cbe8bee5e feat(protocol): add sustained-stream tunnel scheduling to isolate high-throughput traffic 2026-03-18 00:13:14 +00:00
a63247af3e v4.8.19
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-18 00:02:20 +00:00
28a0c769d9 fix(remoteingress-protocol): reduce per-stream flow control windows and increase control channel buffering 2026-03-18 00:02:20 +00:00
ce7ccd83dc v4.8.18
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 23:29:02 +00:00
93578d7034 fix(rust-protocol): switch tunnel frame buffers from Vec<u8> to Bytes to reduce copying and memory overhead 2026-03-17 23:29:02 +00:00
4cfc518301 v4.8.17
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 22:46:55 +00:00
124df129ec fix(protocol): increase per-stream flow control windows and remove adaptive read caps 2026-03-17 22:46:55 +00:00
0b8420aac9 v4.8.16
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 19:13:30 +00:00
afd193336a fix(release): bump package version to 4.8.15 2026-03-17 19:13:30 +00:00
e8d429f117 v4.8.13
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 15:50:47 +00:00
3c2299430a fix(remoteingress-protocol): require a flush after each written frame to bound TLS buffer growth 2026-03-17 15:50:47 +00:00
8b5df9a0b7 update 2026-03-17 15:36:23 +00:00
236d6d16ee v4.8.12
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 13:27:26 +00:00
81bbb33016 fix(tunnel): prevent tunnel backpressure buffering from exhausting memory and cancel stream handlers before TLS shutdown 2026-03-17 13:27:26 +00:00
79af6fd425 v4.8.11
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 12:57:04 +00:00
f71b2f1876 fix(remoteingress-core): stop data frame send loops promptly when stream cancellation is triggered 2026-03-17 12:57:04 +00:00
0161a2589c v4.8.10
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 12:47:03 +00:00
bfd9e58b4f fix(remoteingress-core): guard tunnel frame sends with cancellation to prevent async send deadlocks 2026-03-17 12:47:03 +00:00
9a8760c18d v4.8.9
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 12:35:15 +00:00
c77caa89fc fix(repo): no changes to commit 2026-03-17 12:35:15 +00:00
04586aab39 v4.8.8
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-17 12:33:28 +00:00
f9a739858d fix(remoteingress-core): cancel stale edge connections when an edge reconnects 2026-03-17 12:33:28 +00:00
da01fbeecd v4.8.7 2026-03-17 12:04:20 +00:00
264e8eeb97 fix(remoteingress-core): perform graceful TLS shutdown on edge and hub tunnel streams 2026-03-17 12:04:20 +00:00
9922c3b020 v4.8.6 2026-03-17 11:50:22 +00:00
38cde37cff fix(remoteingress-core): initialize disconnect reason only when set in hub loop break paths 2026-03-17 11:50:22 +00:00
64572827e5 v4.8.5 2026-03-17 11:48:44 +00:00
c4e26198b9 fix(repo): no changes to commit 2026-03-17 11:48:44 +00:00
0b5d72de28 v4.8.4 2026-03-17 11:47:33 +00:00
e8431c0174 fix(remoteingress-core): prevent stream stalls by guaranteeing flow-control updates and avoiding bounded per-stream channel overflows 2026-03-17 11:47:33 +00:00
d57d6395dd v4.8.3 2026-03-17 11:15:18 +00:00
2e5ceeaf5c fix(protocol,edge): optimize tunnel frame handling and zero-copy uploads in edge I/O 2026-03-17 11:15:18 +00:00
1979910f6f v4.8.2 2026-03-17 10:33:21 +00:00
edfad2dffe fix(rust-edge): refactor tunnel I/O to preserve TLS state and prioritize control frames 2026-03-17 10:33:21 +00:00
d907943ae5 v4.8.1 2026-03-17 01:48:06 +00:00
4bfb1244fc fix(remoteingress-core): remove tunnel writer timeouts from edge and hub buffered writes 2026-03-17 01:48:06 +00:00
e31c3421a6 v4.8.0 2026-03-17 00:58:08 +00:00
de8422966a feat(events): include disconnect reasons in edge and hub management events 2026-03-17 00:58:08 +00:00
a87e9578eb v4.7.2 2026-03-17 00:39:57 +00:00
b851bc7994 fix(remoteingress-core): add tunnel write timeouts and scale initial stream windows by active stream count 2026-03-17 00:39:57 +00:00
1284bb5b73 v4.7.1 2026-03-17 00:15:10 +00:00
1afd0e5347 fix(remoteingress-core): improve tunnel failure detection and reconnect handling 2026-03-17 00:15:10 +00:00
96e7ab00cf v4.7.0 2026-03-16 23:35:02 +00:00
17d1a795cd feat(edge,protocol,test): add configurable edge bind address and expand flow-control test coverage 2026-03-16 23:35:02 +00:00
982f648928 v4.6.1 2026-03-16 22:46:51 +00:00
3a2a060a85 fix(remoteingress-core): avoid spurious tunnel disconnect events and increase control channel capacity 2026-03-16 22:46:51 +00:00
e0c469147e v4.6.0 2026-03-16 19:37:06 +00:00
0fdcdf566e feat(remoteingress-core): add adaptive per-stream flow control based on active stream counts 2026-03-16 19:37:06 +00:00
a808d4c9de v4.5.12 2026-03-16 17:39:25 +00:00
f8a0171ef3 fix(remoteingress-core): improve tunnel liveness handling and enable TCP keepalive for accepted client sockets 2026-03-16 17:39:25 +00:00
1d59a48648 v4.5.11 2026-03-16 13:55:02 +00:00
af2ec11a2d fix(repo): no changes to commit 2026-03-16 13:55:02 +00:00
b6e66a7fa6 v4.5.10 2026-03-16 13:48:35 +00:00
1391b39601 fix(remoteingress-core): guard zero-window reads to avoid false EOF handling on stalled streams 2026-03-16 13:48:35 +00:00
21 changed files with 5828 additions and 573 deletions

View File

@@ -1,5 +1,214 @@
# Changelog
## 2026-03-19 - 4.12.1 - fix(remoteingress-core)
send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions
- Adds periodic idle UDP session expiry in edge tunnel and QUIC loops, including UDP close signaling for expired tunnel sessions.
- Sends the PROXY v2 header as the first datagram for UDP upstream connections in both standard and QUIC hub paths.
- Updates the UDP node test server to ignore the initial PROXY v2 datagram per source before echoing payload traffic.
## 2026-03-19 - 4.12.0 - feat(remoteingress-core)
add UDP tunneling over QUIC datagrams and expand transport-specific test coverage
- Implement QUIC datagram-based UDP forwarding on both edge and hub, including session setup, payload routing, and listener cleanup
- Enable QUIC datagram receive buffers in client and server transport configuration
- Add UDP-over-QUIC tests and clarify existing test names to distinguish TCP/TLS, UDP/TLS, and QUIC scenarios
## 2026-03-19 - 4.11.0 - feat(remoteingress-core)
add UDP tunneling support between edge and hub
- extend edge and hub handshake/config updates with UDP listen ports
- add UDP tunnel frame types and PROXY protocol v2 header helpers in the protocol crate
- introduce UDP session management on the edge and upstream UDP forwarding on the hub
- add Node.js integration tests covering UDP echo and concurrent datagrams
- expose UDP listen port configuration in the TypeScript hub API
## 2026-03-19 - 4.10.0 - feat(core,edge,hub,transport)
add QUIC tunnel transport support with optional edge transport selection
- adds a shared transport module with QUIC configuration helpers, control message framing, and PROXY header handling
- enables the hub to accept QUIC connections on the tunnel port alongside existing TCP/TLS support
- adds edge transportMode configuration with quic and quicWithFallback options and propagates it through restarts
- includes end-to-end QUIC transport tests covering large payloads and concurrent streams
## 2026-03-18 - 4.9.1 - fix(readme)
document QoS tiers, heartbeat frames, and adaptive flow control in the protocol overview
- Adds PING, PONG, WINDOW_UPDATE, and WINDOW_UPDATE_BACK frame types to the protocol documentation
- Describes the 3-tier priority queues for control, normal data, and sustained traffic
- Explains sustained stream classification and adaptive per-stream window sizing
## 2026-03-18 - 4.9.0 - feat(protocol)
add sustained-stream tunnel scheduling to isolate high-throughput traffic
- Introduce a third low-priority sustained queue in TunnelIo with a forced drain budget to prevent long-lived high-bandwidth streams from starving control and normal data frames.
- Classify upload and download streams as sustained after exceeding the throughput threshold for the minimum duration, and route their DATA and CLOSE frames through the sustained channel.
- Wire the new sustained channel through edge and hub stream handling so sustained traffic is scheduled consistently on both sides of the tunnel.
## 2026-03-18 - 4.8.19 - fix(remoteingress-protocol)
reduce per-stream flow control windows and increase control channel buffering
- Lower the initial and maximum per-stream window from 16MB to 4MB and scale adaptive windows against a 200MB total budget with a 1MB minimum.
- Increase edge and hub control frame channel capacity from 256 to 512 to better handle prioritized control traffic.
- Update flow-control tests and comments to reflect the new window sizing and budget behavior.
## 2026-03-17 - 4.8.18 - fix(rust-protocol)
switch tunnel frame buffers from Vec<u8> to Bytes to reduce copying and memory overhead
- Add the bytes crate to core and protocol crates
- Update frame encoding, reader payloads, channel queues, and stream backchannels to use Bytes
- Adjust edge and hub data/control paths to send framed payloads as Bytes
## 2026-03-17 - 4.8.17 - fix(protocol)
increase per-stream flow control windows and remove adaptive read caps
- Raise the initial per-stream window from 4MB to 16MB and expand the adaptive window budget to 800MB with a 4MB floor
- Stop limiting edge and hub reads by the adaptive per-stream target window, keeping reads capped only by the current window and 32KB chunk size
- Update protocol tests to match the new adaptive window scaling and budget boundaries
## 2026-03-17 - 4.8.16 - fix(release)
bump package version to 4.8.15
- Updates the package.json version field from 4.8.13 to 4.8.15.
## 2026-03-17 - 4.8.13 - fix(remoteingress-protocol)
require a flush after each written frame to bound TLS buffer growth
- Remove the unflushed byte threshold and stop queueing additional writes while a flush is pending
- Simplify write and flush error logging after dropping unflushed byte tracking
- Update tunnel I/O comments to reflect the stricter flush behavior that avoids OOM and connection resets
## 2026-03-17 - 4.8.12 - fix(tunnel)
prevent tunnel backpressure buffering from exhausting memory and cancel stream handlers before TLS shutdown
- stop self-waking and writing new frames while a flush is pending to avoid unbounded TLS session buffer growth under load
- reorder edge and hub shutdown cleanup so stream cancellation happens before TLS close_notify, preventing handlers from blocking on dead channels
- add load tests covering sustained large transfers, burst traffic, and rapid stream churn to verify tunnel stability
## 2026-03-17 - 4.8.11 - fix(remoteingress-core)
stop data frame send loops promptly when stream cancellation is triggered
- Use cancellation-aware tokio::select! around data channel sends in both edge and hub stream forwarding paths
- Prevent stalled or noisy shutdown behavior when stream or client cancellation happens while awaiting frame delivery
## 2026-03-17 - 4.8.10 - fix(remoteingress-core)
guard tunnel frame sends with cancellation to prevent async send deadlocks
- Wrap OPEN, CLOSE, CLOSE_BACK, WINDOW_UPDATE, and cleanup channel sends in cancellation-aware tokio::select! blocks.
- Avoid indefinite blocking when tunnel, stream, or writer tasks are cancelled while awaiting channel capacity.
- Improve shutdown reliability for edge and hub stream handling under tunnel failure conditions.
## 2026-03-17 - 4.8.9 - fix(repo)
no changes to commit
## 2026-03-17 - 4.8.8 - fix(remoteingress-core)
cancel stale edge connections when an edge reconnects
- Remove any existing edge entry before registering a reconnected edge
- Trigger the previous connection's cancellation token so stale sessions shut down immediately instead of waiting for TCP keepalive
## 2026-03-17 - 4.8.7 - fix(remoteingress-core)
perform graceful TLS shutdown on edge and hub tunnel streams
- Send TLS close_notify before cleanup to avoid peer disconnect warnings on both tunnel endpoints
- Wrap stream shutdown in a 2 second timeout so connection teardown does not block cleanup
## 2026-03-17 - 4.8.6 - fix(remoteingress-core)
initialize disconnect reason only when set in hub loop break paths
- Replace the default "unknown" disconnect reason with an explicitly assigned string and document that all hub loop exits set it before use
- Add an allow attribute for unused assignments to avoid warnings around the deferred initialization pattern
## 2026-03-17 - 4.8.5 - fix(repo)
no changes to commit
## 2026-03-17 - 4.8.4 - fix(remoteingress-core)
prevent stream stalls by guaranteeing flow-control updates and avoiding bounded per-stream channel overflows
- Replace bounded per-stream data channels with unbounded channels on edge and hub, relying on existing WINDOW_UPDATE flow control to limit bytes in flight
- Use awaited sends for FRAME_WINDOW_UPDATE and FRAME_WINDOW_UPDATE_BACK so updates are not dropped and streams do not deadlock under backpressure
- Clean up stream state when channel receivers have already exited instead of closing active streams because a bounded queue filled
## 2026-03-17 - 4.8.3 - fix(protocol,edge)
optimize tunnel frame handling and zero-copy uploads in edge I/O
- extract hub frame processing into a shared edge handler to remove duplicated tunnel logic
- add zero-copy frame header encoding and read payloads directly into framed buffers for client-to-hub uploads
- refactor TunnelIo read/write state to avoid unsafe queue access and reduce buffer churn with incremental parsing
## 2026-03-17 - 4.8.2 - fix(rust-edge)
refactor tunnel I/O to preserve TLS state and prioritize control frames
- replace split TLS handling with a single-owner TunnelIo to avoid handshake and buffered read corruption
- prioritize control frames over data frames to prevent WINDOW_UPDATE starvation and flow-control deadlocks
- improve tunnel reliability with incremental frame parsing, liveness/error events, and corrupt frame header logging
## 2026-03-17 - 4.8.1 - fix(remoteingress-core)
remove tunnel writer timeouts from edge and hub buffered writes
- Drops the 30 second timeout wrapper around writer.write_all and writer.flush in both edge and hub tunnel writers.
- Updates error logging to report write failures without referring to stalled writes.
## 2026-03-17 - 4.8.0 - feat(events)
include disconnect reasons in edge and hub management events
- Add reason fields to tunnelDisconnected and edgeDisconnected events emitted from the Rust core and binary bridge
- Propagate specific disconnect causes such as EOF, liveness timeout, writer failure, handshake failure, and hub cancellation
- Update TypeScript edge and hub classes to log and forward disconnect reason data
- Extend serialization tests to cover the new reason fields
## 2026-03-17 - 4.7.2 - fix(remoteingress-core)
add tunnel write timeouts and scale initial stream windows by active stream count
- Wrap tunnel frame writes and flushes in a 30-second timeout on both edge and hub to detect stalled writers and trigger faster reconnect or cleanup.
- Compute each stream's initial send window from the current active stream count instead of using a fixed window to keep total in-flight data within the 32MB budget.
## 2026-03-17 - 4.7.1 - fix(remoteingress-core)
improve tunnel failure detection and reconnect handling
- Enable TCP keepalive on edge and hub connections to detect silent network failures sooner
- Trigger immediate reconnect or disconnect when tunnel writer tasks fail instead of waiting for liveness timeouts
- Prevent active stream counter underflow during concurrent connection cleanup
## 2026-03-16 - 4.7.0 - feat(edge,protocol,test)
add configurable edge bind address and expand flow-control test coverage
- adds an optional bindAddress configuration for edge TCP listeners, defaulting to 0.0.0.0 when not provided
- passes bindAddress through the TypeScript edge client and Rust edge runtime so local test setups can bind to localhost
- adds protocol unit tests for adaptive stream window sizing and window update frame encoding/decoding
- introduces end-to-end flow-control tests and updates the test script to build before running tests
## 2026-03-16 - 4.6.1 - fix(remoteingress-core)
avoid spurious tunnel disconnect events and increase control channel capacity
- Emit TunnelDisconnected only after an established connection is actually lost, preventing false disconnect events during failed reconnect attempts.
- Increase edge and hub control-channel buffer sizes from 64 to 256 to better prioritize control frames under load.
## 2026-03-16 - 4.6.0 - feat(remoteingress-core)
add adaptive per-stream flow control based on active stream counts
- Track active stream counts on edge and hub connections to size per-stream flow control windows dynamically.
- Cap WINDOW_UPDATE increments and read sizes to the adaptive window so bandwidth is shared more evenly across concurrent streams.
- Apply the adaptive logic to both upload and download paths on edge and hub stream handlers.
## 2026-03-16 - 4.5.12 - fix(remoteingress-core)
improve tunnel liveness handling and enable TCP keepalive for accepted client sockets
- Avoid disconnecting edges when PING or PONG frames cannot be queued because the control channel is temporarily full.
- Enable TCP_NODELAY and TCP keepalive on accepted client connections to help detect stale or dropped clients.
## 2026-03-16 - 4.5.11 - fix(repo)
no changes to commit
## 2026-03-16 - 4.5.10 - fix(remoteingress-core)
guard zero-window reads to avoid false EOF handling on stalled streams
- Prevent upload and download loops from calling read on an empty buffer when flow-control window remains at 0 after stall timeout
- Log a warning and close the affected stream instead of misinterpreting Ok(0) as end-of-file
## 2026-03-16 - 4.5.9 - fix(remoteingress-core)
delay stream close until downstream response draining finishes to prevent truncated transfers

View File

@@ -1,6 +1,6 @@
{
"name": "@serve.zone/remoteingress",
"version": "4.5.9",
"version": "4.12.1",
"private": false,
"description": "Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.",
"main": "dist_ts/index.js",
@@ -9,7 +9,7 @@
"author": "Task Venture Capital GmbH",
"license": "MIT",
"scripts": {
"test": "(tstest test/ --verbose --logfile --timeout 60)",
"test": "(pnpm run build && tstest test/ --verbose --logfile --timeout 60)",
"build": "(tsbuild tsfolders --allowimplicitany && tsrust)",
"buildDocs": "(tsdoc)"
},

View File

@@ -17,7 +17,7 @@ pnpm install @serve.zone/remoteingress
`@serve.zone/remoteingress` uses a **Hub/Edge** topology with a high-performance Rust core and a TypeScript API surface:
```
┌─────────────────────┐ TLS Tunnel ┌─────────────────────┐
┌─────────────────────┐ TLS Tunnel ┌─────────────────────┐
│ Network Edge │ ◄══════════════════════════► │ Private Cluster │
│ │ (multiplexed frames + │ │
│ RemoteIngressEdge │ shared-secret auth) │ RemoteIngressHub │
@@ -48,6 +48,8 @@ pnpm install @serve.zone/remoteingress
- 🎛️ **Dynamic port configuration** — the hub assigns listen ports per edge and can hot-reload them at runtime via `FRAME_CONFIG` frames
- 📣 **Event-driven** — both Hub and Edge extend `EventEmitter` for real-time monitoring
-**Rust core** — all frame encoding, TLS, and TCP proxying happen in native code for maximum throughput
- 🎚️ **3-tier QoS** — control frames, normal data, and sustained (elephant flow) traffic each get their own priority queue
- 📊 **Adaptive flow control** — per-stream windows scale with active stream count to prevent memory overuse
## 🚀 Usage
@@ -280,6 +282,10 @@ The tunnel uses a custom binary frame protocol over TLS:
| `DATA_BACK` | `0x04` | Hub → Edge | Response data flowing downstream |
| `CLOSE_BACK` | `0x05` | Hub → Edge | Upstream (SmartProxy) closed the connection |
| `CONFIG` | `0x06` | Hub → Edge | Runtime configuration update (e.g. port changes); payload is JSON |
| `PING` | `0x07` | Hub → Edge | Heartbeat probe (sent every 15s) |
| `PONG` | `0x08` | Edge → Hub | Heartbeat response |
| `WINDOW_UPDATE` | `0x09` | Edge → Hub | Per-stream flow control: edge consumed N bytes, hub can send more |
| `WINDOW_UPDATE_BACK` | `0x0A` | Hub → Edge | Per-stream flow control: hub consumed N bytes, edge can send more |
Max payload size per frame: **16 MB**. Stream IDs are 32-bit unsigned integers.
@@ -292,6 +298,42 @@ Max payload size per frame: **16 MB**. Stream IDs are 32-bit unsigned integers.
5. Frame protocol begins — `OPEN`/`DATA`/`CLOSE` frames flow in both directions
6. Hub can push `CONFIG` frames at any time to update the edge's listen ports
## 🎚️ QoS & Flow Control
The tunnel multiplexer uses a **3-tier priority system** and **per-stream flow control** to ensure fair bandwidth sharing across thousands of concurrent streams.
### Priority Tiers
All outbound frames are queued into one of three priority levels:
| Tier | Queue | Frames | Behavior |
|------|-------|--------|----------|
| 🔴 **Control** (highest) | `ctrl_queue` | PING, PONG, WINDOW_UPDATE, OPEN, CLOSE, CONFIG | Always drained first. Never delayed. |
| 🟡 **Data** (normal) | `data_queue` | DATA, DATA_BACK from normal streams | Drained when ctrl is empty. Gated at 64 buffered items for backpressure. |
| 🟢 **Sustained** (lowest) | `sustained_queue` | DATA, DATA_BACK from elephant flows | Drained freely when ctrl+data are empty. Otherwise guaranteed **1 MB/s** via forced drain every second. |
This prevents large bulk transfers (e.g. git clones, file downloads) from starving interactive traffic and ensures `WINDOW_UPDATE` frames are never delayed — which would cause flow control deadlocks.
### Sustained Stream Classification
A stream is automatically classified as **sustained** (elephant flow) when:
- It has been active for **>10 seconds**, AND
- Its average throughput exceeds **20 Mbit/s** (2.5 MB/s)
Once classified, the stream's flow control window is locked to the **1 MB floor** and its data frames move to the lowest-priority queue. Classification is one-way — a stream never gets promoted back to normal.
### Adaptive Per-Stream Windows
Each stream has a send window that limits bytes-in-flight. The window size adapts to the number of active streams using a shared **200 MB memory budget**:
| Active Streams | Window per Stream |
|---|---|
| 150 | 4 MB (maximum) |
| 51100 | Scales down (4 MB → 2 MB) |
| 200+ | 1 MB (floor) |
The consumer sends `WINDOW_UPDATE` frames after processing data, allowing the producer to send more. This prevents any single stream from consuming unbounded memory and provides natural backpressure.
## 💡 Example Scenarios
### 1. Expose a Private Kubernetes Cluster to the Internet

572
rust/Cargo.lock generated
View File

@@ -95,6 +95,12 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af"
[[package]]
name = "bumpalo"
version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "bytes"
version = "1.11.1"
@@ -113,12 +119,24 @@ dependencies = [
"shlex",
]
[[package]]
name = "cesu8"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
[[package]]
name = "cfg-if"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "clap"
version = "4.5.58"
@@ -174,6 +192,32 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"memchr",
]
[[package]]
name = "core-foundation"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "deranged"
version = "0.5.6"
@@ -222,6 +266,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fastbloom"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7f34442dbe69c60fe8eaf58a8cafff81a1f278816d8ab4db255b3bef4ac3c4"
dependencies = [
"getrandom 0.3.4",
"libm",
"rand",
"siphasher",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
@@ -253,8 +309,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@@ -264,9 +322,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
"wasip2",
"wasm-bindgen",
]
[[package]]
@@ -311,6 +371,28 @@ dependencies = [
"syn",
]
[[package]]
name = "jni"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97"
dependencies = [
"cesu8",
"cfg-if",
"combine",
"jni-sys",
"log",
"thiserror 1.0.69",
"walkdir",
"windows-sys 0.45.0",
]
[[package]]
name = "jni-sys"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "jobserver"
version = "0.1.34"
@@ -321,12 +403,28 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "libc"
version = "0.2.182"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112"
[[package]]
name = "libm"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
[[package]]
name = "libmimalloc-sys"
version = "0.1.44"
@@ -352,6 +450,12 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "memchr"
version = "2.8.0"
@@ -396,6 +500,12 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "openssl-probe"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "parking_lot"
version = "0.12.5"
@@ -456,6 +566,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.106"
@@ -465,6 +584,63 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quinn"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
dependencies = [
"bytes",
"cfg_aliases",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.2",
"thiserror 2.0.18",
"tokio",
"tracing",
"web-time",
]
[[package]]
name = "quinn-proto"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"bytes",
"fastbloom",
"getrandom 0.3.4",
"lru-slab",
"rand",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"rustls-platform-verifier",
"slab",
"thiserror 2.0.18",
"tinyvec",
"tracing",
"web-time",
]
[[package]]
name = "quinn-udp"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.6.2",
"tracing",
"windows-sys 0.60.2",
]
[[package]]
name = "quote"
version = "1.0.44"
@@ -480,6 +656,35 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c"
dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rcgen"
version = "0.13.2"
@@ -551,13 +756,16 @@ dependencies = [
name = "remoteingress-core"
version = "2.0.0"
dependencies = [
"bytes",
"log",
"quinn",
"rcgen",
"remoteingress-protocol",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"socket2 0.5.10",
"tokio",
"tokio-rustls",
"tokio-util",
@@ -567,7 +775,10 @@ dependencies = [
name = "remoteingress-protocol"
version = "2.0.0"
dependencies = [
"bytes",
"log",
"tokio",
"tokio-util",
]
[[package]]
@@ -584,6 +795,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustls"
version = "0.23.36"
@@ -600,6 +817,18 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63"
dependencies = [
"openssl-probe",
"rustls-pki-types",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "2.2.0"
@@ -615,9 +844,37 @@ version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd"
dependencies = [
"web-time",
"zeroize",
]
[[package]]
name = "rustls-platform-verifier"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784"
dependencies = [
"core-foundation",
"core-foundation-sys",
"jni",
"log",
"once_cell",
"rustls",
"rustls-native-certs",
"rustls-platform-verifier-android",
"rustls-webpki",
"security-framework",
"security-framework-sys",
"webpki-root-certs",
"windows-sys 0.61.2",
]
[[package]]
name = "rustls-platform-verifier-android"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
[[package]]
name = "rustls-webpki"
version = "0.103.9"
@@ -630,12 +887,59 @@ dependencies = [
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "security-framework"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.228"
@@ -695,12 +999,34 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e"
[[package]]
name = "slab"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5"
[[package]]
name = "smallvec"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.2"
@@ -734,6 +1060,46 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [
"thiserror-impl 2.0.18",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thiserror-impl"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.47"
@@ -753,6 +1119,21 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "tinyvec"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.49.0"
@@ -765,7 +1146,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.2",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -804,6 +1185,26 @@ dependencies = [
"tokio",
]
[[package]]
name = "tracing"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
]
[[package]]
name = "unicode-ident"
version = "1.0.24"
@@ -822,6 +1223,16 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"
@@ -837,12 +1248,94 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
dependencies = [
"unicode-ident",
]
[[package]]
name = "web-time"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki-root-certs"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
@@ -870,6 +1363,21 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
@@ -903,6 +1411,12 @@ dependencies = [
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
@@ -915,6 +1429,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
@@ -927,6 +1447,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
@@ -951,6 +1477,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
@@ -963,6 +1495,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
@@ -975,6 +1513,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
@@ -987,6 +1531,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
@@ -1014,6 +1564,26 @@ dependencies = [
"time",
]
[[package]]
name = "zerocopy"
version = "0.8.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.8.2"

View File

@@ -173,10 +173,10 @@ async fn handle_request(
serde_json::json!({ "edgeId": edge_id, "peerAddr": peer_addr }),
);
}
HubEvent::EdgeDisconnected { edge_id } => {
HubEvent::EdgeDisconnected { edge_id, reason } => {
send_event(
"edgeDisconnected",
serde_json::json!({ "edgeId": edge_id }),
serde_json::json!({ "edgeId": edge_id, "reason": reason }),
);
}
HubEvent::StreamOpened {
@@ -295,8 +295,8 @@ async fn handle_request(
EdgeEvent::TunnelConnected => {
send_event("tunnelConnected", serde_json::json!({}));
}
EdgeEvent::TunnelDisconnected => {
send_event("tunnelDisconnected", serde_json::json!({}));
EdgeEvent::TunnelDisconnected { reason } => {
send_event("tunnelDisconnected", serde_json::json!({ "reason": reason }));
}
EdgeEvent::PublicIpDiscovered { ip } => {
send_event(

View File

@@ -7,6 +7,7 @@ edition = "2021"
remoteingress-protocol = { path = "../remoteingress-protocol" }
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.26"
bytes = "1"
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rcgen = "0.13"
serde = { version = "1", features = ["derive"] }
@@ -14,3 +15,5 @@ serde_json = "1"
log = "0.4"
rustls-pemfile = "2"
tokio-util = "0.7"
socket2 = "0.5"
quinn = "0.11"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,7 @@
pub mod hub;
pub mod edge;
pub mod stun;
pub mod transport;
pub mod udp_session;
pub use remoteingress_protocol as protocol;

View File

@@ -0,0 +1,22 @@
pub mod quic;
use serde::{Deserialize, Serialize};
/// Transport mode for the tunnel connection between edge and hub.
///
/// - `TcpTls`: TCP + TLS with frame-based multiplexing via TunnelIo (default).
/// - `Quic`: QUIC with native stream multiplexing (one QUIC stream per tunneled connection).
/// - `QuicWithFallback`: Try QUIC first, fall back to TCP+TLS if UDP is blocked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum TransportMode {
TcpTls,
Quic,
QuicWithFallback,
}
impl Default for TransportMode {
fn default() -> Self {
TransportMode::TcpTls
}
}

View File

@@ -0,0 +1,194 @@
use std::sync::Arc;
/// QUIC control stream message types (reuses frame type constants for consistency).
pub const CTRL_CONFIG: u8 = 0x06;
pub const CTRL_PING: u8 = 0x07;
pub const CTRL_PONG: u8 = 0x08;
/// Header size for control stream messages: [type:1][length:4] = 5 bytes.
pub const CTRL_HEADER_SIZE: usize = 5;
/// Build a quinn ClientConfig that skips server certificate verification
/// (auth is via shared secret, same as the TCP+TLS path).
pub fn build_quic_client_config() -> quinn::ClientConfig {
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(NoCertVerifier))
.with_no_client_auth();
// QUIC mandates ALPN negotiation (RFC 9001 §8.1).
// Must match the server's ALPN protocol.
tls_config.alpn_protocols = vec![b"remoteingress".to_vec()];
let quic_config = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("failed to build QUIC client config from rustls config");
let mut transport = quinn::TransportConfig::default();
transport.keep_alive_interval(Some(std::time::Duration::from_secs(15)));
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
// Match MAX_STREAMS_PER_EDGE (1024) from hub.rs.
// Default is 100 which is too low for high-concurrency tunneling.
transport.max_concurrent_bidi_streams(1024u32.into());
// Enable QUIC datagrams (RFC 9221) for low-latency UDP tunneling.
transport.datagram_receive_buffer_size(Some(65536));
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_config));
client_config.transport_config(Arc::new(transport));
client_config
}
/// Build a quinn ServerConfig from the same TLS server config used for TCP+TLS.
pub fn build_quic_server_config(
tls_server_config: rustls::ServerConfig,
) -> Result<quinn::ServerConfig, Box<dyn std::error::Error + Send + Sync>> {
let quic_config = quinn::crypto::rustls::QuicServerConfig::try_from(tls_server_config)?;
let mut transport = quinn::TransportConfig::default();
transport.keep_alive_interval(Some(std::time::Duration::from_secs(15)));
transport.max_idle_timeout(Some(
quinn::IdleTimeout::try_from(std::time::Duration::from_secs(45)).unwrap(),
));
transport.max_concurrent_bidi_streams(1024u32.into());
transport.datagram_receive_buffer_size(Some(65536));
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config));
server_config.transport_config(Arc::new(transport));
Ok(server_config)
}
/// Write a control message to a QUIC send stream.
/// Format: [type:1][length:4][payload:N]
pub async fn write_ctrl_message(
send: &mut quinn::SendStream,
msg_type: u8,
payload: &[u8],
) -> Result<(), std::io::Error> {
let len = payload.len() as u32;
let mut header = [0u8; CTRL_HEADER_SIZE];
header[0] = msg_type;
header[1..5].copy_from_slice(&len.to_be_bytes());
send.write_all(&header).await?;
if !payload.is_empty() {
send.write_all(payload).await?;
}
Ok(())
}
/// Read a control message from a QUIC recv stream.
/// Returns (msg_type, payload). Returns None on EOF.
pub async fn read_ctrl_message(
recv: &mut quinn::RecvStream,
) -> Result<Option<(u8, Vec<u8>)>, std::io::Error> {
let mut header = [0u8; CTRL_HEADER_SIZE];
match recv.read_exact(&mut header).await {
Ok(()) => {}
Err(e) => {
if let quinn::ReadExactError::FinishedEarly(_) = e {
return Ok(None);
}
return Err(std::io::Error::new(std::io::ErrorKind::Other, e));
}
}
let msg_type = header[0];
let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
let mut payload = vec![0u8; len];
if len > 0 {
recv.read_exact(&mut payload).await.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
}
Ok(Some((msg_type, payload)))
}
/// Write the PROXY v1 header as the first bytes on a QUIC data stream.
/// The header is length-prefixed so the receiver knows where it ends and data begins.
/// Format: [header_len:4][proxy_header:N]
pub async fn write_proxy_header(
send: &mut quinn::SendStream,
proxy_header: &str,
) -> Result<(), std::io::Error> {
let header_bytes = proxy_header.as_bytes();
let len = header_bytes.len() as u32;
send.write_all(&len.to_be_bytes()).await?;
send.write_all(header_bytes).await?;
Ok(())
}
/// Read the PROXY v1 header from the first bytes of a QUIC data stream.
/// Returns the header string.
pub async fn read_proxy_header(
recv: &mut quinn::RecvStream,
) -> Result<String, std::io::Error> {
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
let len = u32::from_be_bytes(len_buf) as usize;
if len > 8192 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"proxy header too long",
));
}
let mut header = vec![0u8; len];
recv.read_exact(&mut header).await.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
String::from_utf8(header).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "proxy header not UTF-8")
})
}
/// TLS certificate verifier that accepts any certificate (auth is via shared secret).
/// Same as the one in edge.rs but placed here so the QUIC module is self-contained.
#[derive(Debug)]
struct NoCertVerifier;
impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::ED448,
]
}
}

View File

@@ -0,0 +1,210 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::time::Instant;
/// Key identifying a unique UDP "session" (one client endpoint talking to one destination port).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UdpSessionKey {
pub client_addr: SocketAddr,
pub dest_port: u16,
}
/// A single UDP session tracked by the edge.
pub struct UdpSession {
pub stream_id: u32,
pub client_addr: SocketAddr,
pub dest_port: u16,
pub last_activity: Instant,
}
/// Manages UDP sessions with idle timeout expiry.
pub struct UdpSessionManager {
/// Forward map: session key → session data.
sessions: HashMap<UdpSessionKey, UdpSession>,
/// Reverse map: stream_id → session key (for dispatching return traffic).
by_stream_id: HashMap<u32, UdpSessionKey>,
/// Idle timeout duration.
idle_timeout: std::time::Duration,
}
impl UdpSessionManager {
pub fn new(idle_timeout: std::time::Duration) -> Self {
Self {
sessions: HashMap::new(),
by_stream_id: HashMap::new(),
idle_timeout,
}
}
/// Look up an existing session by key. Updates last_activity on hit.
pub fn get_mut(&mut self, key: &UdpSessionKey) -> Option<&mut UdpSession> {
let session = self.sessions.get_mut(key)?;
session.last_activity = Instant::now();
Some(session)
}
/// Look up a session's client address by stream_id (for return traffic).
pub fn client_addr_for_stream(&self, stream_id: u32) -> Option<SocketAddr> {
let key = self.by_stream_id.get(&stream_id)?;
self.sessions.get(key).map(|s| s.client_addr)
}
/// Look up a session by stream_id. Updates last_activity on hit.
pub fn get_by_stream_id(&mut self, stream_id: u32) -> Option<&mut UdpSession> {
let key = self.by_stream_id.get(&stream_id)?;
let session = self.sessions.get_mut(key)?;
session.last_activity = Instant::now();
Some(session)
}
/// Insert a new session. Returns a mutable reference to it.
pub fn insert(&mut self, key: UdpSessionKey, stream_id: u32) -> &mut UdpSession {
let session = UdpSession {
stream_id,
client_addr: key.client_addr,
dest_port: key.dest_port,
last_activity: Instant::now(),
};
self.by_stream_id.insert(stream_id, key);
self.sessions.entry(key).or_insert(session)
}
/// Remove a session by stream_id.
pub fn remove_by_stream_id(&mut self, stream_id: u32) -> Option<UdpSession> {
if let Some(key) = self.by_stream_id.remove(&stream_id) {
self.sessions.remove(&key)
} else {
None
}
}
/// Expire idle sessions. Returns the stream_ids of expired sessions.
pub fn expire_idle(&mut self) -> Vec<u32> {
let now = Instant::now();
let timeout = self.idle_timeout;
let expired_keys: Vec<UdpSessionKey> = self
.sessions
.iter()
.filter(|(_, s)| now.duration_since(s.last_activity) >= timeout)
.map(|(k, _)| *k)
.collect();
let mut expired_ids = Vec::with_capacity(expired_keys.len());
for key in expired_keys {
if let Some(session) = self.sessions.remove(&key) {
self.by_stream_id.remove(&session.stream_id);
expired_ids.push(session.stream_id);
}
}
expired_ids
}
/// Number of active sessions.
pub fn len(&self) -> usize {
self.sessions.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
#[test]
fn test_insert_and_lookup() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
assert_eq!(mgr.len(), 1);
assert!(mgr.get_mut(&key).is_some());
assert_eq!(mgr.get_mut(&key).unwrap().stream_id, 1);
}
#[test]
fn test_client_addr_for_stream() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 42);
assert_eq!(mgr.client_addr_for_stream(42), Some(addr(5000)));
assert_eq!(mgr.client_addr_for_stream(99), None);
}
#[test]
fn test_remove_by_stream_id() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
let removed = mgr.remove_by_stream_id(1);
assert!(removed.is_some());
assert_eq!(mgr.len(), 0);
assert!(mgr.get_mut(&key).is_none());
assert_eq!(mgr.client_addr_for_stream(1), None);
}
#[test]
fn test_remove_nonexistent() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
assert!(mgr.remove_by_stream_id(999).is_none());
}
#[tokio::test]
async fn test_expire_idle() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(50));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5001), dest_port: 53 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
// Nothing expired yet
assert!(mgr.expire_idle().is_empty());
assert_eq!(mgr.len(), 2);
// Wait for timeout
tokio::time::sleep(Duration::from_millis(60)).await;
let expired = mgr.expire_idle();
assert_eq!(expired.len(), 2);
assert_eq!(mgr.len(), 0);
}
#[tokio::test]
async fn test_activity_prevents_expiry() {
let mut mgr = UdpSessionManager::new(Duration::from_millis(100));
let key = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
mgr.insert(key, 1);
// Touch session at 50ms (before 100ms timeout)
tokio::time::sleep(Duration::from_millis(50)).await;
mgr.get_mut(&key); // refreshes last_activity
// At 80ms from last touch, should still be alive
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(mgr.expire_idle().is_empty());
assert_eq!(mgr.len(), 1);
// Wait for full timeout from last activity
tokio::time::sleep(Duration::from_millis(30)).await;
let expired = mgr.expire_idle();
assert_eq!(expired.len(), 1);
}
#[test]
fn test_multiple_sessions_same_client_different_ports() {
let mut mgr = UdpSessionManager::new(Duration::from_secs(60));
let key1 = UdpSessionKey { client_addr: addr(5000), dest_port: 53 };
let key2 = UdpSessionKey { client_addr: addr(5000), dest_port: 443 };
mgr.insert(key1, 1);
mgr.insert(key2, 2);
assert_eq!(mgr.len(), 2);
assert_eq!(mgr.get_mut(&key1).unwrap().stream_id, 1);
assert_eq!(mgr.get_mut(&key2).unwrap().stream_id, 2);
}
}

View File

@@ -4,4 +4,10 @@ version = "2.0.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["io-util"] }
tokio = { version = "1", features = ["io-util", "sync", "time"] }
tokio-util = "0.7"
bytes = "1"
log = "0.4"
[dev-dependencies]
tokio = { version = "1", features = ["io-util", "macros", "rt"] }

View File

@@ -1,4 +1,11 @@
use tokio::io::{AsyncRead, AsyncReadExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use bytes::{Bytes, BytesMut, BufMut};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use tokio::time::Instant;
// Frame type constants
pub const FRAME_OPEN: u8 = 0x01;
@@ -12,6 +19,12 @@ pub const FRAME_PONG: u8 = 0x08; // Edge -> Hub: heartbeat response
pub const FRAME_WINDOW_UPDATE: u8 = 0x09; // Edge -> Hub: per-stream flow control
pub const FRAME_WINDOW_UPDATE_BACK: u8 = 0x0A; // Hub -> Edge: per-stream flow control
// UDP tunnel frame types
pub const FRAME_UDP_OPEN: u8 = 0x0B; // Edge -> Hub: open UDP session (payload: PROXY v2 header)
pub const FRAME_UDP_DATA: u8 = 0x0C; // Edge -> Hub: UDP datagram
pub const FRAME_UDP_DATA_BACK: u8 = 0x0D; // Hub -> Edge: UDP datagram
pub const FRAME_UDP_CLOSE: u8 = 0x0E; // Either direction: close UDP session
// Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes
pub const FRAME_HEADER_SIZE: usize = 9;
@@ -19,19 +32,36 @@ pub const FRAME_HEADER_SIZE: usize = 9;
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
// Per-stream flow control constants
/// Initial per-stream window size (4 MB). Sized for full throughput at high RTT:
/// at 100ms RTT, this sustains ~40 MB/s per stream.
/// Initial (and maximum) per-stream window size (4 MB).
pub const INITIAL_STREAM_WINDOW: u32 = 4 * 1024 * 1024;
/// Send WINDOW_UPDATE after consuming this many bytes (half the initial window).
pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
/// Maximum window size to prevent overflow.
pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024;
pub const MAX_WINDOW_SIZE: u32 = 4 * 1024 * 1024;
// Sustained stream classification constants
/// Throughput threshold for sustained classification (2.5 MB/s = 20 Mbit/s).
pub const SUSTAINED_THRESHOLD_BPS: u64 = 2_500_000;
/// Minimum duration before a stream can be classified as sustained.
pub const SUSTAINED_MIN_DURATION_SECS: u64 = 10;
/// Fixed window for sustained streams (1 MB — the floor).
pub const SUSTAINED_WINDOW: u32 = 1 * 1024 * 1024;
/// Maximum bytes written from sustained queue per forced drain (1 MB/s guarantee).
pub const SUSTAINED_FORCED_DRAIN_CAP: usize = 1_048_576;
/// Encode a WINDOW_UPDATE frame for a specific stream.
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Vec<u8> {
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes {
encode_frame(stream_id, frame_type, &increment.to_be_bytes())
}
/// Compute the target per-stream window size based on the number of active streams.
/// Total memory budget is ~200MB shared across all streams. Up to 50 streams get the
/// full 4MB window; above that the window scales down to a 1MB floor at 200+ streams.
pub fn compute_window_for_stream_count(active: u32) -> u32 {
let per_stream = (200 * 1024 * 1024u64) / (active.max(1) as u64);
per_stream.clamp(1 * 1024 * 1024, INITIAL_STREAM_WINDOW as u64) as u32
}
/// Decode a WINDOW_UPDATE payload into a byte increment. Returns None if payload is malformed.
pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
if payload.len() != 4 {
@@ -45,18 +75,28 @@ pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
pub struct Frame {
pub stream_id: u32,
pub frame_type: u8,
pub payload: Vec<u8>,
pub payload: Bytes,
}
/// Encode a frame into bytes: [stream_id:4][type:1][length:4][payload]
pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Bytes {
let len = payload.len() as u32;
let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
buf.extend_from_slice(&stream_id.to_be_bytes());
buf.push(frame_type);
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
buf
let mut buf = BytesMut::with_capacity(FRAME_HEADER_SIZE + payload.len());
buf.put_slice(&stream_id.to_be_bytes());
buf.put_u8(frame_type);
buf.put_slice(&len.to_be_bytes());
buf.put_slice(payload);
buf.freeze()
}
/// Write a frame header into `buf[0..FRAME_HEADER_SIZE]`.
/// The caller must ensure payload is already at `buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + payload_len]`.
/// This enables zero-copy encoding: read directly into `buf[FRAME_HEADER_SIZE..]`, then
/// prepend the header without copying the payload.
pub fn encode_frame_header(buf: &mut [u8], stream_id: u32, frame_type: u8, payload_len: usize) {
buf[0..4].copy_from_slice(&stream_id.to_be_bytes());
buf[4] = frame_type;
buf[5..9].copy_from_slice(&(payload_len as u32).to_be_bytes());
}
/// Build a PROXY protocol v1 header line.
@@ -73,6 +113,76 @@ pub fn build_proxy_v1_header(
)
}
/// PROXY protocol v2 signature (12 bytes).
pub const PROXY_V2_SIGNATURE: [u8; 12] = [
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
];
/// Transport protocol for PROXY v2 header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyV2Transport {
/// TCP (STREAM) — byte 13 low nibble = 0x1
Tcp,
/// UDP (DGRAM) — byte 13 low nibble = 0x2
Udp,
}
/// Build a PROXY protocol v2 binary header for IPv4.
///
/// Returns a 28-byte header:
/// - 12B signature
/// - 1B version (0x2) + command (0x1 = PROXY)
/// - 1B address family (0x1 = AF_INET) + transport (0x1 = TCP, 0x2 = UDP)
/// - 2B address block length (0x000C = 12)
/// - 4B source IPv4 address
/// - 4B destination IPv4 address
/// - 2B source port
/// - 2B destination port
pub fn build_proxy_v2_header(
src_ip: &std::net::Ipv4Addr,
dst_ip: &std::net::Ipv4Addr,
src_port: u16,
dst_port: u16,
transport: ProxyV2Transport,
) -> Bytes {
let mut buf = BytesMut::with_capacity(28);
// Signature (12 bytes)
buf.put_slice(&PROXY_V2_SIGNATURE);
// Version 2 + PROXY command
buf.put_u8(0x21);
// AF_INET (0x1) + transport
let transport_nibble = match transport {
ProxyV2Transport::Tcp => 0x1,
ProxyV2Transport::Udp => 0x2,
};
buf.put_u8(0x10 | transport_nibble);
// Address block length: 12 bytes for IPv4
buf.put_u16(12);
// Source address (4 bytes, network byte order)
buf.put_slice(&src_ip.octets());
// Destination address (4 bytes, network byte order)
buf.put_slice(&dst_ip.octets());
// Source port (2 bytes, network byte order)
buf.put_u16(src_port);
// Destination port (2 bytes, network byte order)
buf.put_u16(dst_port);
buf.freeze()
}
/// Build a PROXY protocol v2 binary header from string IP addresses.
/// Falls back to 0.0.0.0 if parsing fails.
pub fn build_proxy_v2_header_from_str(
src_ip: &str,
dst_ip: &str,
src_port: u16,
dst_port: u16,
transport: ProxyV2Transport,
) -> Bytes {
let src: std::net::Ipv4Addr = src_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED);
let dst: std::net::Ipv4Addr = dst_ip.parse().unwrap_or(std::net::Ipv4Addr::UNSPECIFIED);
build_proxy_v2_header(&src, &dst, src_port, dst_port, transport)
}
/// Stateful async frame reader that yields `Frame` values from an `AsyncRead`.
pub struct FrameReader<R> {
reader: R,
@@ -111,13 +221,17 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
]);
if length > MAX_PAYLOAD_SIZE {
log::error!(
"CORRUPT FRAME HEADER: raw={:02x?} stream_id={} type=0x{:02x} length={}",
self.header_buf, stream_id, frame_type, length
);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("frame payload too large: {} bytes", length),
format!("frame payload too large: {} bytes (header={:02x?})", length, self.header_buf),
));
}
let mut payload = vec![0u8; length as usize];
let mut payload = BytesMut::zeroed(length as usize);
if length > 0 {
self.reader.read_exact(&mut payload).await?;
}
@@ -125,7 +239,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
Ok(Some(Frame {
stream_id,
frame_type,
payload,
payload: payload.freeze(),
}))
}
@@ -135,10 +249,409 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
}
}
// ---------------------------------------------------------------------------
// TunnelIo: single-owner I/O multiplexer for the TLS tunnel connection
// ---------------------------------------------------------------------------
/// Events produced by the TunnelIo event loop.
#[derive(Debug)]
pub enum TunnelEvent {
/// A complete frame was read from the remote side.
Frame(Frame),
/// The remote side closed the connection (EOF).
Eof,
/// A read error occurred.
ReadError(std::io::Error),
/// A write error occurred.
WriteError(std::io::Error),
/// No frames received for the liveness timeout duration.
LivenessTimeout,
/// The cancellation token was triggered.
Cancelled,
}
/// Write state extracted into a sub-struct so the borrow checker can see
/// disjoint field access between `self.write` and `self.stream`.
struct WriteState {
ctrl_queue: VecDeque<Bytes>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first
data_queue: VecDeque<Bytes>, // DATA, DATA_BACK — only when ctrl is empty
sustained_queue: VecDeque<Bytes>, // DATA, DATA_BACK from sustained streams — lowest priority
offset: usize, // progress within current frame being written
flush_needed: bool,
// Sustained starvation prevention: guaranteed 1 MB/s drain
sustained_last_drain: Instant,
sustained_bytes_this_period: usize,
}
impl WriteState {
fn has_work(&self) -> bool {
!self.ctrl_queue.is_empty() || !self.data_queue.is_empty() || !self.sustained_queue.is_empty()
}
}
/// Single-owner I/O engine for the tunnel TLS connection.
///
/// Owns the TLS stream directly — no `tokio::io::split()`, no mutex.
/// Uses three priority write queues:
/// 1. ctrl (PONG, WINDOW_UPDATE, CLOSE, OPEN) — always first
/// 2. data (DATA, DATA_BACK from normal streams) — when ctrl empty
/// 3. sustained (DATA, DATA_BACK from sustained streams) — lowest priority,
/// drained freely when ctrl+data empty, or forced 1MB/s when they're not
pub struct TunnelIo<S> {
stream: S,
// Read state: accumulate bytes, parse frames incrementally
read_buf: Vec<u8>,
read_pos: usize,
parse_pos: usize,
// Write state: extracted sub-struct for safe disjoint borrows
write: WriteState,
}
impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
pub fn new(stream: S, initial_data: Vec<u8>) -> Self {
let read_pos = initial_data.len();
let mut read_buf = initial_data;
if read_buf.capacity() < 65536 {
read_buf.reserve(65536 - read_buf.len());
}
Self {
stream,
read_buf,
read_pos,
parse_pos: 0,
write: WriteState {
ctrl_queue: VecDeque::new(),
data_queue: VecDeque::new(),
sustained_queue: VecDeque::new(),
offset: 0,
flush_needed: false,
sustained_last_drain: Instant::now(),
sustained_bytes_this_period: 0,
},
}
}
/// Queue a high-priority control frame (PONG, WINDOW_UPDATE, CLOSE, OPEN).
pub fn queue_ctrl(&mut self, frame: Bytes) {
self.write.ctrl_queue.push_back(frame);
}
/// Queue a lower-priority data frame (DATA, DATA_BACK).
pub fn queue_data(&mut self, frame: Bytes) {
self.write.data_queue.push_back(frame);
}
/// Queue a lowest-priority sustained data frame.
pub fn queue_sustained(&mut self, frame: Bytes) {
self.write.sustained_queue.push_back(frame);
}
/// Try to parse a complete frame from the read buffer.
/// Uses a parse_pos cursor to avoid drain() on every frame.
pub fn try_parse_frame(&mut self) -> Option<Result<Frame, std::io::Error>> {
let available = self.read_pos - self.parse_pos;
if available < FRAME_HEADER_SIZE {
return None;
}
let base = self.parse_pos;
let stream_id = u32::from_be_bytes([
self.read_buf[base], self.read_buf[base + 1],
self.read_buf[base + 2], self.read_buf[base + 3],
]);
let frame_type = self.read_buf[base + 4];
let length = u32::from_be_bytes([
self.read_buf[base + 5], self.read_buf[base + 6],
self.read_buf[base + 7], self.read_buf[base + 8],
]);
if length > MAX_PAYLOAD_SIZE {
let header = [
self.read_buf[base], self.read_buf[base + 1],
self.read_buf[base + 2], self.read_buf[base + 3],
self.read_buf[base + 4], self.read_buf[base + 5],
self.read_buf[base + 6], self.read_buf[base + 7],
self.read_buf[base + 8],
];
log::error!(
"CORRUPT FRAME HEADER: raw={:02x?} stream_id={} type=0x{:02x} length={}",
header, stream_id, frame_type, length
);
return Some(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("frame payload too large: {} bytes (header={:02x?})", length, header),
)));
}
let total_frame_size = FRAME_HEADER_SIZE + length as usize;
if available < total_frame_size {
return None;
}
let payload = Bytes::copy_from_slice(
&self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size],
);
self.parse_pos += total_frame_size;
// Compact when parse_pos > half the data to reclaim memory
if self.parse_pos > self.read_pos / 2 && self.parse_pos > 0 {
self.read_buf.drain(..self.parse_pos);
self.read_pos -= self.parse_pos;
self.parse_pos = 0;
}
Some(Ok(Frame { stream_id, frame_type, payload }))
}
/// Poll-based I/O step. Returns Ready on events, Pending when idle.
///
/// Order: write(ctrl->data->sustained) -> flush -> read -> channels -> timers
pub fn poll_step(
&mut self,
cx: &mut Context<'_>,
ctrl_rx: &mut tokio::sync::mpsc::Receiver<Bytes>,
data_rx: &mut tokio::sync::mpsc::Receiver<Bytes>,
sustained_rx: &mut tokio::sync::mpsc::Receiver<Bytes>,
liveness_deadline: &mut Pin<Box<tokio::time::Sleep>>,
cancel_token: &tokio_util::sync::CancellationToken,
) -> Poll<TunnelEvent> {
// 1. WRITE: 3-tier priority — ctrl first, then data, then sustained.
// Sustained drains freely when ctrl+data are empty.
// Write one frame, set flush_needed, then flush must complete before
// writing more. This prevents unbounded TLS session buffer growth.
// Safe: `self.write` and `self.stream` are disjoint fields.
let mut writes = 0;
while self.write.has_work() && writes < 16 && !self.write.flush_needed {
// Pick queue: ctrl > data > sustained
let queue_id = if !self.write.ctrl_queue.is_empty() {
0 // ctrl
} else if !self.write.data_queue.is_empty() {
1 // data
} else {
2 // sustained
};
let frame = match queue_id {
0 => self.write.ctrl_queue.front().unwrap(),
1 => self.write.data_queue.front().unwrap(),
_ => self.write.sustained_queue.front().unwrap(),
};
let remaining = &frame[self.write.offset..];
match Pin::new(&mut self.stream).poll_write(cx, remaining) {
Poll::Ready(Ok(0)) => {
log::error!("TunnelIo: poll_write returned 0 (write zero), ctrl_q={} data_q={} sustained_q={}",
self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.sustained_queue.len());
return Poll::Ready(TunnelEvent::WriteError(
std::io::Error::new(std::io::ErrorKind::WriteZero, "write zero"),
));
}
Poll::Ready(Ok(n)) => {
self.write.offset += n;
self.write.flush_needed = true;
if self.write.offset >= frame.len() {
match queue_id {
0 => { self.write.ctrl_queue.pop_front(); }
1 => { self.write.data_queue.pop_front(); }
_ => {
self.write.sustained_queue.pop_front();
self.write.sustained_last_drain = Instant::now();
self.write.sustained_bytes_this_period = 0;
}
}
self.write.offset = 0;
writes += 1;
}
}
Poll::Ready(Err(e)) => {
log::error!("TunnelIo: poll_write error: {} (ctrl_q={} data_q={} sustained_q={})",
e, self.write.ctrl_queue.len(), self.write.data_queue.len(), self.write.sustained_queue.len());
return Poll::Ready(TunnelEvent::WriteError(e));
}
Poll::Pending => break,
}
}
// 1b. FORCED SUSTAINED DRAIN: when ctrl/data have work but sustained is waiting,
// guarantee at least 1 MB/s by draining up to SUSTAINED_FORCED_DRAIN_CAP
// once per second.
if !self.write.sustained_queue.is_empty()
&& (!self.write.ctrl_queue.is_empty() || !self.write.data_queue.is_empty())
&& !self.write.flush_needed
{
let now = Instant::now();
if now.duration_since(self.write.sustained_last_drain) >= Duration::from_secs(1) {
self.write.sustained_bytes_this_period = 0;
self.write.sustained_last_drain = now;
while !self.write.sustained_queue.is_empty()
&& self.write.sustained_bytes_this_period < SUSTAINED_FORCED_DRAIN_CAP
&& !self.write.flush_needed
{
let frame = self.write.sustained_queue.front().unwrap();
let remaining = &frame[self.write.offset..];
match Pin::new(&mut self.stream).poll_write(cx, remaining) {
Poll::Ready(Ok(0)) => {
return Poll::Ready(TunnelEvent::WriteError(
std::io::Error::new(std::io::ErrorKind::WriteZero, "write zero"),
));
}
Poll::Ready(Ok(n)) => {
self.write.offset += n;
self.write.flush_needed = true;
self.write.sustained_bytes_this_period += n;
if self.write.offset >= frame.len() {
self.write.sustained_queue.pop_front();
self.write.offset = 0;
}
}
Poll::Ready(Err(e)) => {
return Poll::Ready(TunnelEvent::WriteError(e));
}
Poll::Pending => break,
}
}
}
}
// 2. FLUSH: push encrypted data from TLS session to TCP.
if self.write.flush_needed {
match Pin::new(&mut self.stream).poll_flush(cx) {
Poll::Ready(Ok(())) => {
self.write.flush_needed = false;
}
Poll::Ready(Err(e)) => {
log::error!("TunnelIo: poll_flush error: {}", e);
return Poll::Ready(TunnelEvent::WriteError(e));
}
Poll::Pending => {} // TCP waker will notify us
}
}
// 3. READ: drain stream until Pending to ensure the TCP waker is always registered.
// Without this loop, a Ready return with partial frame data would consume
// the waker without re-registering it, causing the task to sleep until a
// timer or channel wakes it (potentially 15+ seconds of lost reads).
loop {
// Compact if needed to make room for reads
if self.parse_pos > 0 && self.read_buf.len() - self.read_pos < 32768 {
self.read_buf.drain(..self.parse_pos);
self.read_pos -= self.parse_pos;
self.parse_pos = 0;
}
if self.read_buf.len() < self.read_pos + 32768 {
self.read_buf.resize(self.read_pos + 32768, 0);
}
let mut rbuf = ReadBuf::new(&mut self.read_buf[self.read_pos..]);
match Pin::new(&mut self.stream).poll_read(cx, &mut rbuf) {
Poll::Ready(Ok(())) => {
let n = rbuf.filled().len();
if n == 0 {
return Poll::Ready(TunnelEvent::Eof);
}
self.read_pos += n;
if let Some(result) = self.try_parse_frame() {
return match result {
Ok(frame) => Poll::Ready(TunnelEvent::Frame(frame)),
Err(e) => Poll::Ready(TunnelEvent::ReadError(e)),
};
}
// Partial data — loop to call poll_read again so the TCP
// waker is re-registered when it finally returns Pending.
}
Poll::Ready(Err(e)) => {
log::error!("TunnelIo: poll_read error: {}", e);
return Poll::Ready(TunnelEvent::ReadError(e));
}
Poll::Pending => break,
}
}
// 4. CHANNELS: drain ctrl (always — priority), data (only if queue is small).
// Ctrl frames must never be delayed — always drain fully.
// Data frames are gated: keep data in the bounded channel for proper
// backpressure when TLS writes are slow. Without this gate, the internal
// data_queue (unbounded VecDeque) grows to hundreds of MB under throttle -> OOM.
let mut got_new = false;
loop {
match ctrl_rx.poll_recv(cx) {
Poll::Ready(Some(frame)) => { self.write.ctrl_queue.push_back(frame); got_new = true; }
Poll::Ready(None) => {
return Poll::Ready(TunnelEvent::WriteError(
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "ctrl channel closed"),
));
}
Poll::Pending => break,
}
}
if self.write.data_queue.len() < 64 {
loop {
match data_rx.poll_recv(cx) {
Poll::Ready(Some(frame)) => { self.write.data_queue.push_back(frame); got_new = true; }
Poll::Ready(None) => {
return Poll::Ready(TunnelEvent::WriteError(
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "data channel closed"),
));
}
Poll::Pending => break,
}
}
}
// Sustained channel: drain when sustained_queue is small (same backpressure pattern).
// Channel close is non-fatal — not all connections have sustained streams.
if self.write.sustained_queue.len() < 64 {
loop {
match sustained_rx.poll_recv(cx) {
Poll::Ready(Some(frame)) => { self.write.sustained_queue.push_back(frame); got_new = true; }
Poll::Ready(None) | Poll::Pending => break,
}
}
}
// 5. TIMERS
if liveness_deadline.as_mut().poll(cx).is_ready() {
return Poll::Ready(TunnelEvent::LivenessTimeout);
}
if cancel_token.is_cancelled() {
return Poll::Ready(TunnelEvent::Cancelled);
}
// 6. SELF-WAKE: only when flush is complete AND we have work.
// When flush is Pending, the TCP write-readiness waker will notify us.
// CRITICAL: do NOT self-wake when flush_needed — poll_write always returns
// Ready (TLS buffers in-memory), so self-waking causes a tight spin loop
// that fills the TLS session buffer unboundedly -> OOM -> ECONNRESET.
if !self.write.flush_needed && (got_new || self.write.has_work()) {
cx.waker().wake_by_ref();
}
Poll::Pending
}
pub fn into_inner(self) -> S {
self.stream
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_encode_frame_header() {
let payload = b"hello";
let mut buf = vec![0u8; FRAME_HEADER_SIZE + payload.len()];
buf[FRAME_HEADER_SIZE..].copy_from_slice(payload);
encode_frame_header(&mut buf, 42, FRAME_DATA, payload.len());
assert_eq!(buf, &encode_frame(42, FRAME_DATA, payload)[..]);
}
#[test]
fn test_encode_frame_header_empty_payload() {
let mut buf = vec![0u8; FRAME_HEADER_SIZE];
encode_frame_header(&mut buf, 99, FRAME_CLOSE, 0);
assert_eq!(buf, &encode_frame(99, FRAME_CLOSE, &[])[..]);
}
#[test]
fn test_encode_frame() {
let data = b"hello";
@@ -167,6 +680,62 @@ mod tests {
assert_eq!(header, "PROXY TCP4 1.2.3.4 5.6.7.8 12345 443\r\n");
}
#[test]
fn test_proxy_v2_header_tcp4() {
let src = "198.51.100.10".parse().unwrap();
let dst = "203.0.113.25".parse().unwrap();
let header = build_proxy_v2_header(&src, &dst, 54321, 8443, ProxyV2Transport::Tcp);
assert_eq!(header.len(), 28);
// Signature
assert_eq!(&header[0..12], &PROXY_V2_SIGNATURE);
// Version 2 + PROXY command
assert_eq!(header[12], 0x21);
// AF_INET + STREAM (TCP)
assert_eq!(header[13], 0x11);
// Address length = 12
assert_eq!(u16::from_be_bytes([header[14], header[15]]), 12);
// Source IP: 198.51.100.10
assert_eq!(&header[16..20], &[198, 51, 100, 10]);
// Dest IP: 203.0.113.25
assert_eq!(&header[20..24], &[203, 0, 113, 25]);
// Source port: 54321
assert_eq!(u16::from_be_bytes([header[24], header[25]]), 54321);
// Dest port: 8443
assert_eq!(u16::from_be_bytes([header[26], header[27]]), 8443);
}
#[test]
fn test_proxy_v2_header_udp4() {
let src = "10.0.0.1".parse().unwrap();
let dst = "10.0.0.2".parse().unwrap();
let header = build_proxy_v2_header(&src, &dst, 12345, 53, ProxyV2Transport::Udp);
assert_eq!(header.len(), 28);
assert_eq!(header[12], 0x21); // v2, PROXY
assert_eq!(header[13], 0x12); // AF_INET + DGRAM (UDP)
assert_eq!(&header[16..20], &[10, 0, 0, 1]); // src
assert_eq!(&header[20..24], &[10, 0, 0, 2]); // dst
assert_eq!(u16::from_be_bytes([header[24], header[25]]), 12345);
assert_eq!(u16::from_be_bytes([header[26], header[27]]), 53);
}
#[test]
fn test_proxy_v2_header_from_str() {
let header = build_proxy_v2_header_from_str("1.2.3.4", "5.6.7.8", 1000, 443, ProxyV2Transport::Tcp);
assert_eq!(header.len(), 28);
assert_eq!(&header[16..20], &[1, 2, 3, 4]);
assert_eq!(&header[20..24], &[5, 6, 7, 8]);
}
#[test]
fn test_proxy_v2_header_from_str_invalid_ip() {
let header = build_proxy_v2_header_from_str("not-an-ip", "also-not", 1000, 443, ProxyV2Transport::Udp);
assert_eq!(header.len(), 28);
// Falls back to 0.0.0.0
assert_eq!(&header[16..20], &[0, 0, 0, 0]);
assert_eq!(&header[20..24], &[0, 0, 0, 0]);
assert_eq!(header[13], 0x12); // UDP
}
#[tokio::test]
async fn test_frame_reader() {
let frame1 = encode_frame(1, FRAME_OPEN, b"PROXY TCP4 1.2.3.4 5.6.7.8 1234 443\r\n");
@@ -304,7 +873,7 @@ mod tests {
let frame = reader.next_frame().await.unwrap().unwrap();
assert_eq!(frame.stream_id, i as u32);
assert_eq!(frame.frame_type, ft);
assert_eq!(frame.payload, format!("payload_{}", i).as_bytes());
assert_eq!(&frame.payload[..], format!("payload_{}", i).as_bytes());
}
assert!(reader.next_frame().await.unwrap().is_none());
@@ -313,7 +882,7 @@ mod tests {
#[tokio::test]
async fn test_frame_reader_zero_length_payload() {
let data = encode_frame(42, FRAME_CLOSE, &[]);
let cursor = std::io::Cursor::new(data);
let cursor = std::io::Cursor::new(data.to_vec());
let mut reader = FrameReader::new(cursor);
let frame = reader.next_frame().await.unwrap().unwrap();
@@ -336,4 +905,101 @@ mod tests {
assert_eq!(&pong[0..4], &0u32.to_be_bytes());
assert_eq!(pong.len(), FRAME_HEADER_SIZE);
}
// --- compute_window_for_stream_count tests ---
#[test]
fn test_adaptive_window_zero_streams() {
// 0 streams treated as 1: 200MB/1 -> clamped to 4MB max
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_one_stream() {
assert_eq!(compute_window_for_stream_count(1), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_50_streams_full() {
// 200MB/50 = 4MB = exactly INITIAL_STREAM_WINDOW
assert_eq!(compute_window_for_stream_count(50), INITIAL_STREAM_WINDOW);
}
#[test]
fn test_adaptive_window_51_streams_starts_scaling() {
// 200MB/51 < 4MB — first value below max
let w = compute_window_for_stream_count(51);
assert!(w < INITIAL_STREAM_WINDOW);
assert_eq!(w, (200 * 1024 * 1024u64 / 51) as u32);
}
#[test]
fn test_adaptive_window_100_streams() {
// 200MB/100 = 2MB
assert_eq!(compute_window_for_stream_count(100), 2 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_200_streams_at_floor() {
// 200MB/200 = 1MB = exactly the floor
assert_eq!(compute_window_for_stream_count(200), 1 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_500_streams_clamped() {
// 200MB/500 = 0.4MB -> clamped up to 1MB floor
assert_eq!(compute_window_for_stream_count(500), 1 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_max_u32() {
// Extreme: u32::MAX streams -> tiny value -> clamped to 1MB
assert_eq!(compute_window_for_stream_count(u32::MAX), 1 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_monotonically_decreasing() {
let mut prev = compute_window_for_stream_count(1);
for n in [2, 10, 50, 51, 100, 200, 500, 1000] {
let w = compute_window_for_stream_count(n);
assert!(w <= prev, "window increased from {} to {} at n={}", prev, w, n);
prev = w;
}
}
#[test]
fn test_adaptive_window_total_budget_bounded() {
// active x per_stream_window should never exceed 200MB (+ clamp overhead for high N)
for n in [1, 10, 50, 100, 200] {
let w = compute_window_for_stream_count(n);
let total = w as u64 * n as u64;
assert!(total <= 200 * 1024 * 1024, "total {}MB exceeds budget at n={}", total / (1024*1024), n);
}
}
// --- encode/decode window_update roundtrip ---
#[test]
fn test_window_update_roundtrip() {
for &increment in &[0u32, 1, 64 * 1024, INITIAL_STREAM_WINDOW, MAX_WINDOW_SIZE, u32::MAX] {
let frame = encode_window_update(42, FRAME_WINDOW_UPDATE, increment);
assert_eq!(frame[4], FRAME_WINDOW_UPDATE);
let decoded = decode_window_update(&frame[FRAME_HEADER_SIZE..]);
assert_eq!(decoded, Some(increment));
}
}
#[test]
fn test_window_update_back_roundtrip() {
let frame = encode_window_update(7, FRAME_WINDOW_UPDATE_BACK, 1234567);
assert_eq!(frame[4], FRAME_WINDOW_UPDATE_BACK);
assert_eq!(decode_window_update(&frame[FRAME_HEADER_SIZE..]), Some(1234567));
}
#[test]
fn test_decode_window_update_malformed() {
assert_eq!(decode_window_update(&[]), None);
assert_eq!(decode_window_update(&[0, 0, 0]), None);
assert_eq!(decode_window_update(&[0, 0, 0, 0, 0]), None);
}
}

View File

@@ -0,0 +1,475 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Find N free ports by binding to port 0 and collecting OS-assigned ports. */
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
/** Start a TCP echo server that tracks connections for force-close. */
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
// Skip PROXY protocol v1 header line before echoing
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) {
socket.write(remainder);
}
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
/**
* Start a server that sends a large response immediately on first data received.
* Does NOT wait for end (the tunnel protocol has no half-close).
* On receiving first data chunk after PROXY header, sends responseSize bytes then closes.
*/
function startLargeResponseServer(port: number, host: string, responseSize: number): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
let responseSent = false;
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0 && !responseSent) {
responseSent = true;
sendLargeResponse(socket, responseSize);
}
}
return;
}
if (!responseSent) {
responseSent = true;
sendLargeResponse(socket, responseSize);
}
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
function sendLargeResponse(socket: net.Socket, totalBytes: number) {
const chunkSize = 32 * 1024;
let sent = 0;
const writeChunk = () => {
while (sent < totalBytes) {
const toWrite = Math.min(chunkSize, totalBytes - sent);
// Use a deterministic pattern for verification
const chunk = Buffer.alloc(toWrite, (sent % 256) & 0xff);
const canContinue = socket.write(chunk);
sent += toWrite;
if (!canContinue) {
socket.once('drain', writeChunk);
return;
}
}
socket.end();
};
writeChunk();
}
/** Force-close a server: destroy all connections, then close. */
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
interface TestTunnel {
hub: RemoteIngressHub;
edge: RemoteIngressEdge;
edgePort: number;
cleanup: () => Promise<void>;
}
/**
* Start a full hub + edge tunnel.
* Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2.
* Hub targetHost = 127.0.0.2 so hub -> upstream doesn't loop back to edge.
*/
async function startTunnel(edgePort: number, hubPort: number): Promise<TestTunnel> {
const hub = new RemoteIngressHub();
const edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
return {
hub,
edge,
edgePort,
cleanup: async () => {
await edge.stop();
await hub.stop();
},
};
}
/**
* Send data through the tunnel and collect the echoed response.
*/
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
/**
* Connect to the tunnel, send a small request, and collect a large response.
* Does NOT call end() — the tunnel has no half-close.
* Instead, collects until expectedResponseSize bytes arrive.
*/
function sendAndReceiveLarge(
port: number,
data: Buffer,
expectedResponseSize: number,
timeoutMs = 60000,
): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
// Do NOT call client.end() — the server will respond immediately
// and the tunnel CLOSE will happen when the download finishes
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedResponseSize} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedResponseSize && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let tunnel: TestTunnel;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
tap.test('TCP/TLS setup: start TCP echo server and TCP+TLS tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
tunnel = await startTunnel(edgePort, hubPort);
expect(tunnel.hub.running).toBeTrue();
});
tap.test('TCP/TLS: single TCP stream — 32MB transfer exceeding initial 4MB window', async () => {
const size = 32 * 1024 * 1024;
const data = crypto.randomBytes(size);
const expectedHash = sha256(data);
const received = await sendAndReceive(edgePort, data, 60000);
expect(received.length).toEqual(size);
expect(sha256(received)).toEqual(expectedHash);
});
tap.test('TCP/TLS: 200 concurrent TCP streams x 64KB each', async () => {
const streamCount = 200;
const payloadSize = 64 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 30000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('TCP/TLS: 512 concurrent TCP streams at minimum window boundary (16KB each)', async () => {
const streamCount = 512;
const payloadSize = 16 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 60000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('TCP/TLS: asymmetric TCP transfer — 4KB request -> 4MB response', async () => {
// Swap to large-response server
await forceCloseServer(echoServer);
const responseSize = 4 * 1024 * 1024; // 4 MB
const largeServer = await startLargeResponseServer(edgePort, '127.0.0.2', responseSize);
try {
const requestData = crypto.randomBytes(4 * 1024); // 4 KB
const received = await sendAndReceiveLarge(edgePort, requestData, responseSize, 60000);
expect(received.length).toEqual(responseSize);
} finally {
// Always restore echo server even on failure
await forceCloseServer(largeServer);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
}
});
tap.test('TCP/TLS: 100 TCP streams x 1MB each (100MB total exceeding 200MB budget)', async () => {
const streamCount = 100;
const payloadSize = 1 * 1024 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 120000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('TCP/TLS: active TCP stream counter tracks concurrent connections', async () => {
const N = 50;
// Open N connections and keep them alive (send data but don't close)
const sockets: net.Socket[] = [];
const connectPromises = Array.from({ length: N }, () => {
return new Promise<net.Socket>((resolve, reject) => {
const sock = net.createConnection({ host: '127.0.0.1', port: edgePort }, () => {
resolve(sock);
});
sock.on('error', () => {});
setTimeout(() => reject(new Error('connect timeout')), 5000);
});
});
const connected = await Promise.all(connectPromises);
sockets.push(...connected);
// Brief delay for stream registration to propagate
await new Promise((resolve) => setTimeout(resolve, 500));
// Verify the edge reports >= N active streams.
// This counter is the input to compute_window_for_stream_count(),
// so its accuracy determines whether adaptive window sizing is correct.
const status = await tunnel.edge.getStatus();
expect(status.activeStreams).toBeGreaterThanOrEqual(N);
// Clean up: destroy all sockets (the tunnel's 300s stream timeout will handle cleanup)
for (const sock of sockets) {
sock.destroy();
}
});
tap.test('TCP/TLS: 50 TCP streams x 2MB each (forces multiple window refills)', async () => {
// At 50 concurrent streams: adaptive window = 200MB/50 = 4MB per stream
// Each stream sends 2MB → needs ~3 WINDOW_UPDATE refill cycles per stream
const streamCount = 50;
const payloadSize = 2 * 1024 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 120000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('TCP/TLS teardown: stop tunnel and TCP echo server', async () => {
await tunnel.cleanup();
await forceCloseServer(echoServer);
});
export default tap.start();

402
test/test.loadtest.node.ts Normal file
View File

@@ -0,0 +1,402 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as stream from 'stream';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers (self-contained — same patterns as test.flowcontrol.node.ts)
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) socket.write(remainder);
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// Throttle Proxy: rate-limits TCP traffic between edge and hub
// ---------------------------------------------------------------------------
class ThrottleTransform extends stream.Transform {
private bytesPerSec: number;
private bucket: number;
private lastRefill: number;
private destroyed_: boolean = false;
constructor(bytesPerSecond: number) {
super();
this.bytesPerSec = bytesPerSecond;
this.bucket = bytesPerSecond;
this.lastRefill = Date.now();
}
_transform(chunk: Buffer, _encoding: BufferEncoding, callback: stream.TransformCallback) {
if (this.destroyed_) return;
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
this.bucket = Math.min(this.bytesPerSec, this.bucket + elapsed * this.bytesPerSec);
this.lastRefill = now;
if (chunk.length <= this.bucket) {
this.bucket -= chunk.length;
callback(null, chunk);
} else {
// Not enough budget — delay the entire chunk (don't split)
const deficit = chunk.length - this.bucket;
this.bucket = 0;
const delayMs = Math.min((deficit / this.bytesPerSec) * 1000, 1000);
setTimeout(() => {
if (this.destroyed_) { callback(); return; }
this.lastRefill = Date.now();
this.bucket = 0;
callback(null, chunk);
}, delayMs);
}
}
_destroy(err: Error | null, callback: (error: Error | null) => void) {
this.destroyed_ = true;
callback(err);
}
}
interface ThrottleProxy {
server: net.Server;
close: () => Promise<void>;
}
async function startThrottleProxy(
listenPort: number,
targetHost: string,
targetPort: number,
bytesPerSecond: number,
): Promise<ThrottleProxy> {
const connections = new Set<net.Socket>();
const server = net.createServer((clientSock) => {
connections.add(clientSock);
const upstream = net.createConnection({ host: targetHost, port: targetPort });
connections.add(upstream);
const throttleUp = new ThrottleTransform(bytesPerSecond);
const throttleDown = new ThrottleTransform(bytesPerSecond);
clientSock.pipe(throttleUp).pipe(upstream);
upstream.pipe(throttleDown).pipe(clientSock);
let cleaned = false;
const cleanup = (source: string, err?: Error) => {
if (cleaned) return;
cleaned = true;
if (err) {
console.error(`[ThrottleProxy] cleanup triggered by ${source}: ${err.message}`);
} else {
console.error(`[ThrottleProxy] cleanup triggered by ${source} (no error)`);
}
console.error(`[ThrottleProxy] stack:`, new Error().stack);
throttleUp.destroy();
throttleDown.destroy();
clientSock.destroy();
upstream.destroy();
connections.delete(clientSock);
connections.delete(upstream);
};
clientSock.on('error', (e) => cleanup('clientSock.error', e));
upstream.on('error', (e) => cleanup('upstream.error', e));
throttleUp.on('error', (e) => cleanup('throttleUp.error', e));
throttleDown.on('error', (e) => cleanup('throttleDown.error', e));
clientSock.on('close', () => cleanup('clientSock.close'));
upstream.on('close', () => cleanup('upstream.close'));
});
await new Promise<void>((resolve) => server.listen(listenPort, '127.0.0.1', resolve));
return {
server,
close: async () => {
for (const c of connections) c.destroy();
connections.clear();
await new Promise<void>((resolve) => server.close(() => resolve()));
},
};
}
// ---------------------------------------------------------------------------
// Test state
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: TrackingServer;
let throttle: ThrottleProxy;
let hubPort: number;
let proxyPort: number;
let edgePort: number;
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
tap.test('TCP/TLS setup: start throttled TCP+TLS tunnel (100 Mbit/s)', async () => {
[hubPort, proxyPort, edgePort] = await findFreePorts(3);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
// Throttle proxy: edge → proxy → hub at 100 Mbit/s (12.5 MB/s)
throttle = await startThrottleProxy(proxyPort, '127.0.0.1', hubPort, 12.5 * 1024 * 1024);
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' });
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
// Edge connects through throttle proxy
await edge.start({
hubHost: '127.0.0.1',
hubPort: proxyPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('TCP/TLS throttled: 5 TCP streams x 20MB each through 100Mbit tunnel', async () => {
const streamCount = 5;
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB total round-trip
const payloads = Array.from({ length: streamCount }, () => crypto.randomBytes(payloadSize));
const promises = payloads.map((data) => {
const hash = sha256(data);
return sendAndReceive(edgePort, data, 300000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('TCP/TLS throttled: slow TCP consumer with 20MB does not kill other streams', async () => {
// Open a connection that creates download-direction backpressure:
// send 20MB but DON'T read the response — client TCP receive buffer fills
const slowSock = net.createConnection({ host: '127.0.0.1', port: edgePort });
await new Promise<void>((resolve) => slowSock.on('connect', resolve));
const slowData = crypto.randomBytes(20 * 1024 * 1024);
slowSock.write(slowData);
slowSock.end();
// Don't read — backpressure builds on the download path
// Wait for backpressure to develop
await new Promise((r) => setTimeout(r, 2000));
// Meanwhile, 5 normal echo streams with 20MB each must complete
const payload = crypto.randomBytes(20 * 1024 * 1024);
const hash = sha256(payload);
const promises = Array.from({ length: 5 }, () =>
sendAndReceive(edgePort, payload, 300000).then((r) => ({
hash: sha256(r),
sizeOk: r.length === payload.length,
}))
);
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.hash !== hash);
expect(failures.length).toEqual(0);
// Tunnel still alive
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
slowSock.destroy();
});
tap.test('TCP/TLS throttled: rapid churn — 3 x 20MB long + 50 x 1MB short TCP streams', async () => {
// 3 long streams (20MB each) running alongside 50 short streams (1MB each)
const longPayload = crypto.randomBytes(20 * 1024 * 1024);
const longHash = sha256(longPayload);
const longPromises = Array.from({ length: 3 }, () =>
sendAndReceive(edgePort, longPayload, 300000).then((r) => ({
hash: sha256(r),
sizeOk: r.length === longPayload.length,
}))
);
const shortPayload = crypto.randomBytes(1024 * 1024);
const shortHash = sha256(shortPayload);
const shortPromises = Array.from({ length: 50 }, () =>
sendAndReceive(edgePort, shortPayload, 300000).then((r) => ({
hash: sha256(r),
sizeOk: r.length === shortPayload.length,
}))
);
const [longResults, shortResults] = await Promise.all([
Promise.all(longPromises),
Promise.all(shortPromises),
]);
const longFails = longResults.filter((r) => !r.sizeOk || r.hash !== longHash);
const shortFails = shortResults.filter((r) => !r.sizeOk || r.hash !== shortHash);
expect(longFails.length).toEqual(0);
expect(shortFails.length).toEqual(0);
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('TCP/TLS throttled: 3 burst waves of 5 TCP streams x 20MB each', async () => {
for (let wave = 0; wave < 3; wave++) {
const streamCount = 5;
const payloadSize = 20 * 1024 * 1024; // 20MB per stream = 100MB per wave
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
return sendAndReceive(edgePort, data, 300000).then((r) => r.length === payloadSize);
});
const results = await Promise.all(promises);
const ok = results.filter(Boolean).length;
expect(ok).toEqual(streamCount);
// Brief pause between waves
await new Promise((r) => setTimeout(r, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
}
});
tap.test('TCP/TLS throttled: TCP tunnel still works after all load tests', async () => {
const data = crypto.randomBytes(1024);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 30000);
expect(sha256(received)).toEqual(hash);
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('TCP/TLS teardown: stop throttled tunnel', async () => {
await edge.stop();
await hub.stop();
if (throttle) await throttle.close();
await new Promise<void>((resolve) => echoServer.close(() => resolve()));
});
export default tap.start();

283
test/test.quic.node.ts Normal file
View File

@@ -0,0 +1,283 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers (same patterns as test.flowcontrol.node.ts)
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
type TrackingServer = net.Server & { destroyAll: () => void };
function startEchoServer(port: number, host: string): Promise<TrackingServer> {
return new Promise((resolve, reject) => {
const connections = new Set<net.Socket>();
const server = net.createServer((socket) => {
connections.add(socket);
socket.on('close', () => connections.delete(socket));
let proxyHeaderParsed = false;
let pendingBuf = Buffer.alloc(0);
socket.on('data', (data: Buffer) => {
if (!proxyHeaderParsed) {
pendingBuf = Buffer.concat([pendingBuf, data]);
const idx = pendingBuf.indexOf('\r\n');
if (idx !== -1) {
proxyHeaderParsed = true;
const remainder = pendingBuf.subarray(idx + 2);
if (remainder.length > 0) socket.write(remainder);
}
return;
}
socket.write(data);
});
socket.on('error', () => {});
}) as TrackingServer;
server.destroyAll = () => {
for (const conn of connections) conn.destroy();
connections.clear();
};
server.on('error', reject);
server.listen(port, host, () => resolve(server));
});
}
async function forceCloseServer(server: TrackingServer): Promise<void> {
server.destroyAll();
await new Promise<void>((resolve) => server.close(() => resolve()));
}
interface TestTunnel {
hub: RemoteIngressHub;
edge: RemoteIngressEdge;
edgePort: number;
cleanup: () => Promise<void>;
}
/**
* Start a full hub + edge tunnel using QUIC transport.
* Edge binds to 127.0.0.1, upstream server binds to 127.0.0.2.
*/
async function startQuicTunnel(edgePort: number, hubPort: number): Promise<TestTunnel> {
const hub = new RemoteIngressHub();
const edge = new RemoteIngressEdge();
await hub.start({
tunnelPort: hubPort,
targetHost: '127.0.0.2',
});
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [edgePort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
return {
hub,
edge,
edgePort,
cleanup: async () => {
await edge.stop();
await hub.stop();
},
};
}
function sendAndReceive(port: number, data: Buffer, timeoutMs = 30000): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let totalReceived = 0;
const expectedLength = data.length;
let settled = false;
const client = net.createConnection({ host: '127.0.0.1', port }, () => {
client.write(data);
client.end();
});
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.destroy();
reject(new Error(`Timeout after ${timeoutMs}ms — received ${totalReceived}/${expectedLength} bytes`));
}
}, timeoutMs);
client.on('data', (chunk: Buffer) => {
chunks.push(chunk);
totalReceived += chunk.length;
if (totalReceived >= expectedLength && !settled) {
settled = true;
clearTimeout(timer);
client.destroy();
resolve(Buffer.concat(chunks));
}
});
client.on('end', () => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(Buffer.concat(chunks));
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
function sha256(buf: Buffer): string {
return crypto.createHash('sha256').update(buf).digest('hex');
}
// ---------------------------------------------------------------------------
// QUIC Transport E2E Tests
// ---------------------------------------------------------------------------
let tunnel: TestTunnel;
let echoServer: TrackingServer;
let hubPort: number;
let edgePort: number;
tap.test('QUIC setup: start TCP echo server and QUIC tunnel', async () => {
[hubPort, edgePort] = await findFreePorts(2);
echoServer = await startEchoServer(edgePort, '127.0.0.2');
tunnel = await startQuicTunnel(edgePort, hubPort);
expect(tunnel.hub.running).toBeTrue();
const status = await tunnel.edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC: single TCP stream echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 10000);
expect(received.length).toEqual(1024);
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: single TCP stream echo — 1MB', async () => {
const size = 1024 * 1024;
const data = crypto.randomBytes(size);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 30000);
expect(received.length).toEqual(size);
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: single TCP stream echo — 16MB', async () => {
const size = 16 * 1024 * 1024;
const data = crypto.randomBytes(size);
const hash = sha256(data);
const received = await sendAndReceive(edgePort, data, 60000);
expect(received.length).toEqual(size);
expect(sha256(received)).toEqual(hash);
});
tap.test('QUIC: 10 concurrent TCP streams x 1MB each', async () => {
const streamCount = 10;
const payloadSize = 1024 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 30000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('QUIC: 50 concurrent TCP streams x 64KB each', async () => {
const streamCount = 50;
const payloadSize = 64 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 30000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('QUIC: 200 concurrent TCP streams x 16KB each', async () => {
const streamCount = 200;
const payloadSize = 16 * 1024;
const promises = Array.from({ length: streamCount }, () => {
const data = crypto.randomBytes(payloadSize);
const hash = sha256(data);
return sendAndReceive(edgePort, data, 60000).then((received) => ({
sent: hash,
received: sha256(received),
sizeOk: received.length === payloadSize,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || r.sent !== r.received);
expect(failures.length).toEqual(0);
});
tap.test('QUIC: TCP tunnel still connected after all tests', async () => {
const status = await tunnel.edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('QUIC teardown: stop TCP tunnel and echo server', async () => {
await tunnel.cleanup();
await forceCloseServer(echoServer);
});
export default tap.start();

284
test/test.udp.node.ts Normal file
View File

@@ -0,0 +1,284 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as dgram from 'dgram';
import * as net from 'net';
import * as crypto from 'crypto';
import { RemoteIngressHub, RemoteIngressEdge } from '../ts/index.js';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
async function findFreePorts(count: number): Promise<number[]> {
const servers: net.Server[] = [];
const ports: number[] = [];
for (let i = 0; i < count; i++) {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve));
ports.push((server.address() as net.AddressInfo).port);
servers.push(server);
}
await Promise.all(servers.map((s) => new Promise<void>((resolve) => s.close(() => resolve()))));
return ports;
}
/**
* Start a UDP echo server that:
* 1. Receives the first datagram (PROXY v2 header — 28 bytes) and discards it
* 2. Echoes all subsequent datagrams back to the sender
*/
function startUdpEchoServer(port: number, host: string): Promise<dgram.Socket> {
return new Promise((resolve, reject) => {
const server = dgram.createSocket('udp4');
// Track which source endpoints have sent their PROXY v2 header.
// The hub sends a 28-byte PROXY v2 header as the first datagram per session.
const seenSources = new Set<string>();
server.on('message', (msg, rinfo) => {
const sourceKey = `${rinfo.address}:${rinfo.port}`;
if (!seenSources.has(sourceKey)) {
seenSources.add(sourceKey);
// First datagram from this source is the PROXY v2 header — skip it
return;
}
// Echo back
server.send(msg, rinfo.port, rinfo.address);
});
server.on('error', reject);
server.bind(port, host, () => resolve(server));
});
}
/**
* Send a UDP datagram through the tunnel and wait for the echo response.
*/
function udpSendAndReceive(
port: number,
data: Buffer,
timeoutMs = 10000,
): Promise<Buffer> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
client.close();
reject(new Error(`UDP timeout after ${timeoutMs}ms`));
}
}, timeoutMs);
client.on('message', (msg) => {
if (!settled) {
settled = true;
clearTimeout(timer);
client.close();
resolve(msg);
}
});
client.on('error', (err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
client.close();
reject(err);
}
});
client.send(data, port, '127.0.0.1');
});
}
// ---------------------------------------------------------------------------
// Test state
// ---------------------------------------------------------------------------
let hub: RemoteIngressHub;
let edge: RemoteIngressEdge;
let echoServer: dgram.Socket;
let hubPort: number;
let edgeUdpPort: number;
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
tap.test('UDP/TLS setup: start UDP echo server and TCP+TLS tunnel with UDP ports', async () => {
[hubPort, edgeUdpPort] = await findFreePorts(2);
// Start UDP echo server on upstream (127.0.0.2)
echoServer = await startUdpEchoServer(edgeUdpPort, '127.0.0.2');
hub = new RemoteIngressHub();
edge = new RemoteIngressEdge();
await hub.start({ tunnelPort: hubPort, targetHost: '127.0.0.2' });
await hub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [edgeUdpPort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('Edge did not connect within 10s')), 10000);
edge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await edge.start({
hubHost: '127.0.0.1',
hubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
});
await connectedPromise;
// Wait for UDP listener to bind
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP/TLS: single UDP datagram echo — 64 bytes', async () => {
const data = crypto.randomBytes(64);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(64);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/TLS: single UDP datagram echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(1024);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/TLS: 10 sequential UDP datagrams', async () => {
for (let i = 0; i < 10; i++) {
const data = crypto.randomBytes(128);
const received = await udpSendAndReceive(edgeUdpPort, data, 5000);
expect(received.length).toEqual(128);
expect(Buffer.compare(received, data)).toEqual(0);
}
});
tap.test('UDP/TLS: 10 concurrent UDP datagrams from different source ports', async () => {
const promises = Array.from({ length: 10 }, () => {
const data = crypto.randomBytes(256);
return udpSendAndReceive(edgeUdpPort, data, 5000).then((received) => ({
sizeOk: received.length === 256,
dataOk: Buffer.compare(received, data) === 0,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || !r.dataOk);
expect(failures.length).toEqual(0);
});
tap.test('UDP/TLS: tunnel still connected after UDP tests', async () => {
const status = await edge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP/TLS teardown: stop tunnel and UDP echo server', async () => {
await edge.stop();
await hub.stop();
await new Promise<void>((resolve) => echoServer.close(() => resolve()));
});
// ---------------------------------------------------------------------------
// QUIC transport UDP tests
// ---------------------------------------------------------------------------
let quicHub: RemoteIngressHub;
let quicEdge: RemoteIngressEdge;
let quicEchoServer: dgram.Socket;
let quicHubPort: number;
let quicEdgeUdpPort: number;
tap.test('UDP/QUIC setup: start UDP echo server and QUIC tunnel with UDP ports', async () => {
[quicHubPort, quicEdgeUdpPort] = await findFreePorts(2);
quicEchoServer = await startUdpEchoServer(quicEdgeUdpPort, '127.0.0.2');
quicHub = new RemoteIngressHub();
quicEdge = new RemoteIngressEdge();
await quicHub.start({ tunnelPort: quicHubPort, targetHost: '127.0.0.2' });
await quicHub.updateAllowedEdges([
{ id: 'test-edge', secret: 'test-secret', listenPorts: [], listenPortsUdp: [quicEdgeUdpPort] },
]);
const connectedPromise = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error('QUIC edge did not connect within 10s')), 10000);
quicEdge.once('tunnelConnected', () => {
clearTimeout(timeout);
resolve();
});
});
await quicEdge.start({
hubHost: '127.0.0.1',
hubPort: quicHubPort,
edgeId: 'test-edge',
secret: 'test-secret',
bindAddress: '127.0.0.1',
transportMode: 'quic',
});
await connectedPromise;
await new Promise((resolve) => setTimeout(resolve, 500));
const status = await quicEdge.getStatus();
expect(status.connected).toBeTrue();
});
tap.test('UDP/QUIC: single UDP datagram echo — 64 bytes', async () => {
const data = crypto.randomBytes(64);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(64);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/QUIC: single UDP datagram echo — 1KB', async () => {
const data = crypto.randomBytes(1024);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(1024);
expect(Buffer.compare(received, data)).toEqual(0);
});
tap.test('UDP/QUIC: 10 sequential UDP datagrams', async () => {
for (let i = 0; i < 10; i++) {
const data = crypto.randomBytes(128);
const received = await udpSendAndReceive(quicEdgeUdpPort, data, 5000);
expect(received.length).toEqual(128);
expect(Buffer.compare(received, data)).toEqual(0);
}
});
tap.test('UDP/QUIC: 10 concurrent UDP datagrams', async () => {
const promises = Array.from({ length: 10 }, () => {
const data = crypto.randomBytes(256);
return udpSendAndReceive(quicEdgeUdpPort, data, 5000).then((received) => ({
sizeOk: received.length === 256,
dataOk: Buffer.compare(received, data) === 0,
}));
});
const results = await Promise.all(promises);
const failures = results.filter((r) => !r.sizeOk || !r.dataOk);
expect(failures.length).toEqual(0);
});
tap.test('UDP/QUIC teardown: stop QUIC tunnel and UDP echo server', async () => {
await quicEdge.stop();
await quicHub.stop();
await new Promise<void>((resolve) => quicEchoServer.close(() => resolve()));
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/remoteingress',
version: '4.5.9',
version: '4.12.1',
description: 'Edge ingress tunnel for DcRouter - accepts incoming TCP connections at network edge and tunnels them to DcRouter SmartProxy preserving client IP via PROXY protocol v1.'
}

View File

@@ -14,6 +14,8 @@ type TEdgeCommands = {
hubPort: number;
edgeId: string;
secret: string;
bindAddress?: string;
transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback';
};
result: { started: boolean };
};
@@ -38,6 +40,8 @@ export interface IEdgeConfig {
hubPort?: number;
edgeId: string;
secret: string;
bindAddress?: string;
transportMode?: 'tcpTls' | 'quic' | 'quicWithFallback';
}
const MAX_RESTART_ATTEMPTS = 10;
@@ -81,8 +85,10 @@ export class RemoteIngressEdge extends EventEmitter {
this.bridge.on('management:tunnelConnected', () => {
this.emit('tunnelConnected');
});
this.bridge.on('management:tunnelDisconnected', () => {
this.emit('tunnelDisconnected');
this.bridge.on('management:tunnelDisconnected', (data: { reason?: string }) => {
const reason = data?.reason ?? 'unknown';
console.log(`[RemoteIngressEdge] Tunnel disconnected: ${reason}`);
this.emit('tunnelDisconnected', data);
});
this.bridge.on('management:publicIpDiscovered', (data: { ip: string }) => {
this.emit('publicIpDiscovered', data);
@@ -132,6 +138,8 @@ export class RemoteIngressEdge extends EventEmitter {
hubPort: edgeConfig.hubPort ?? 8443,
edgeId: edgeConfig.edgeId,
secret: edgeConfig.secret,
...(edgeConfig.bindAddress ? { bindAddress: edgeConfig.bindAddress } : {}),
...(edgeConfig.transportMode ? { transportMode: edgeConfig.transportMode } : {}),
});
this.started = true;
@@ -227,6 +235,8 @@ export class RemoteIngressEdge extends EventEmitter {
hubPort: this.savedConfig.hubPort ?? 8443,
edgeId: this.savedConfig.edgeId,
secret: this.savedConfig.secret,
...(this.savedConfig.bindAddress ? { bindAddress: this.savedConfig.bindAddress } : {}),
...(this.savedConfig.transportMode ? { transportMode: this.savedConfig.transportMode } : {}),
});
this.started = true;

View File

@@ -22,7 +22,7 @@ type THubCommands = {
};
updateAllowedEdges: {
params: {
edges: Array<{ id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number }>;
edges: Array<{ id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number }>;
};
result: { updated: boolean };
};
@@ -50,7 +50,7 @@ export interface IHubConfig {
};
}
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; stunIntervalSecs?: number };
type TAllowedEdge = { id: string; secret: string; listenPorts?: number[]; listenPortsUdp?: number[]; stunIntervalSecs?: number };
const MAX_RESTART_ATTEMPTS = 10;
const MAX_RESTART_BACKOFF_MS = 30_000;
@@ -93,7 +93,9 @@ export class RemoteIngressHub extends EventEmitter {
this.bridge.on('management:edgeConnected', (data: { edgeId: string; peerAddr: string }) => {
this.emit('edgeConnected', data);
});
this.bridge.on('management:edgeDisconnected', (data: { edgeId: string }) => {
this.bridge.on('management:edgeDisconnected', (data: { edgeId: string; reason?: string }) => {
const reason = data?.reason ?? 'unknown';
console.log(`[RemoteIngressHub] Edge ${data.edgeId} disconnected: ${reason}`);
this.emit('edgeDisconnected', data);
});
this.bridge.on('management:streamOpened', (data: { edgeId: string; streamId: number }) => {