Compare commits

...

30 Commits

Author SHA1 Message Date
2fce910795 v25.17.7
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:50:41 +00:00
ff09cef350 fix(readme): document QUIC and HTTP/3 compatibility caveats 2026-03-20 07:50:41 +00:00
d0148b2ac3 v25.17.6
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:48:26 +00:00
7217e15649 fix(rustproxy-http): disable HTTP/3 GREASE for client and server connections 2026-03-20 07:48:26 +00:00
bfcf92a855 v25.17.5
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 07:43:32 +00:00
8e0804cd20 fix(rustproxy): add HTTP/3 integration test for QUIC response stream FIN handling 2026-03-20 07:43:32 +00:00
c63f6fcd5f v25.17.4
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 03:19:57 +00:00
f3cd4d193e fix(rustproxy-http): prevent HTTP/3 response body streaming from hanging on backend completion 2026-03-20 03:19:57 +00:00
81de611255 v25.17.3
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:54:44 +00:00
91598b3be9 fix(repository): no changes detected 2026-03-20 02:54:44 +00:00
4e3c548012 v25.17.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:53:41 +00:00
1a2d7529db fix(rustproxy-http): enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured 2026-03-20 02:53:41 +00:00
31514f54ae v25.17.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-20 02:35:22 +00:00
247653c9d0 fix(rustproxy-routing): allow QUIC UDP TLS connections without SNI to match domain-restricted routes 2026-03-20 02:35:22 +00:00
07d88f6f6a v25.17.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 23:16:42 +00:00
4b64de2c67 feat(rustproxy-passthrough): add PROXY protocol v2 client IP handling for UDP and QUIC listeners 2026-03-19 23:16:42 +00:00
e8db7bc96d v25.16.3
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 22:00:07 +00:00
2621dea9fa fix(rustproxy): upgrade fallback UDP listeners to QUIC when TLS certificates become available 2026-03-19 22:00:07 +00:00
bb5b9b3d12 v25.16.2
Some checks failed
Default (tags) / security (push) Failing after 12s
Default (tags) / test (push) Failing after 13s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 21:24:05 +00:00
d70c2d77ed fix(rustproxy-http): cache backend Alt-Svc only from original upstream responses during protocol auto-detection 2026-03-19 21:24:05 +00:00
4cf13c36f8 v25.16.1
Some checks failed
Default (tags) / security (push) Failing after 19s
Default (tags) / test (push) Failing after 18s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 20:57:48 +00:00
37c7233780 fix(http-proxy): avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection 2026-03-19 20:57:48 +00:00
15d0a721d5 v25.16.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 20:27:57 +00:00
af970c447e feat(quic,http3): add HTTP/3 proxy handling and hot-reload QUIC TLS configuration 2026-03-19 20:27:57 +00:00
9e1103e7a7 v25.15.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 18:55:31 +00:00
2b990527ac feat(readme): document UDP, QUIC, and HTTP/3 support in the README 2026-03-19 18:55:31 +00:00
9595f0a9fc v25.14.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 16:21:37 +00:00
0fb3988123 fix(deps): update build and runtime dependencies and align route validation test expectations 2026-03-19 16:21:37 +00:00
53938df8db v25.14.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 2s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 16:09:51 +00:00
e890bda8fc feat(udp,http3): add UDP datagram handler relay support and stream HTTP/3 request bodies to backends 2026-03-19 16:09:51 +00:00
23 changed files with 6017 additions and 4572 deletions

View File

@@ -1,5 +1,106 @@
# Changelog
## 2026-03-20 - 25.17.7 - fix(readme)
document QUIC and HTTP/3 compatibility caveats
- Add notes explaining that GREASE frames are disabled on both server and client HTTP/3 paths to avoid interoperability issues
- Document that the current HTTP/3 stack depends on pre-1.0 h3 ecosystem components and may still have rough edges
## 2026-03-20 - 25.17.6 - fix(rustproxy-http)
disable HTTP/3 GREASE for client and server connections
- Switch the HTTP/3 server connection setup to use the builder API with send_grease(false)
- Switch the HTTP/3 client handshake to use the builder API with send_grease(false) to improve compatibility
## 2026-03-20 - 25.17.5 - fix(rustproxy)
add HTTP/3 integration test for QUIC response stream FIN handling
- adds an integration test covering HTTP/3 proxying over QUIC with TLS termination
- verifies response bodies fully arrive and the client receives stream termination instead of hanging
- adds test-only dependencies for quinn, h3, h3-quinn, rustls, bytes, and http
## 2026-03-20 - 25.17.4 - fix(rustproxy-http)
prevent HTTP/3 response body streaming from hanging on backend completion
- extract and track Content-Length before consuming the response body
- stop the HTTP/3 body loop when the stream reports end-of-stream or the expected byte count has been sent
- add a per-frame idle timeout to avoid indefinite waits on stalled or close-delimited backend bodies
## 2026-03-20 - 25.17.3 - fix(repository)
no changes detected
## 2026-03-20 - 25.17.2 - fix(rustproxy-http)
enable TLS connections for HTTP/3 upstream requests when backend re-encryption or TLS is configured
- Pass backend TLS client configuration into the HTTP/3 request handler.
- Detect TLS-required upstream targets using route and target TLS settings before connecting.
- Build backend request URIs with the correct http or https scheme to match the upstream connection.
## 2026-03-20 - 25.17.1 - fix(rustproxy-routing)
allow QUIC UDP TLS connections without SNI to match domain-restricted routes
- Exempts UDP transport from the no-SNI rejection logic because QUIC encrypts the TLS ClientHello and SNI is unavailable at accept time
- Adds regression tests to confirm QUIC route matching succeeds without SNI while TCP TLS without SNI remains rejected
## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough)
add PROXY protocol v2 client IP handling for UDP and QUIC listeners
- propagate trusted proxy IP configuration into UDP and QUIC listener managers
- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling
- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing
## 2026-03-19 - 25.16.3 - fix(rustproxy)
upgrade fallback UDP listeners to QUIC when TLS certificates become available
- Rebuild and apply QUIC TLS configuration during route and certificate updates instead of only when adding new UDP ports.
- Add logic to drain UDP sessions, stop raw fallback listeners, and start QUIC endpoints on existing ports once TLS is available.
- Retry QUIC endpoint creation during upgrade and fall back to rebinding raw UDP if the upgrade cannot complete.
## 2026-03-19 - 25.16.2 - fix(rustproxy-http)
cache backend Alt-Svc only from original upstream responses during protocol auto-detection
- Moves Alt-Svc discovery into streaming response construction so it reads backend headers before response filters inject client-facing Alt-Svc values
- Stores the protocol cache key in connection activity during auto-detect mode and clears it after HTTP/3 connection failure to avoid re-caching failed H3 routes
- Prevents fallback requests from reintroducing stale or self-injected Alt-Svc entries that could cause repeated H3 retry loops
## 2026-03-19 - 25.16.1 - fix(http-proxy)
avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection
- Suppress Alt-Svc HTTP/3 recaching after a failed QUIC backend connection to prevent repeated H3 timeout fallback loops
- Force an ALPN probe on TCP fallback so auto detection correctly reselects HTTP/2 or HTTP/1.1 after H3 connection failure
- Add README documentation for best-effort backendProtocol selection and supported protocol modes
## 2026-03-19 - 25.16.0 - feat(quic,http3)
add HTTP/3 proxy handling and hot-reload QUIC TLS configuration
- initialize and wire H3ProxyService into QUIC listeners so HTTP/3 requests are handled instead of being kept as placeholder connections
- add backend HTTP/3 support with protocol caching that stores Alt-Svc advertised H3 ports for auto-detection
- hot-swap TLS certificates across active QUIC endpoints and require terminating TLS for QUIC route validation
- document QUIC route setup with required TLS and ACME configuration
## 2026-03-19 - 25.15.0 - feat(readme)
document UDP, QUIC, and HTTP/3 support in the README
- Adds README examples for UDP datagram handlers, QUIC/HTTP3 forwarding, and dual-stack TCP/UDP routes
- Expands configuration and API reference sections to cover transport matching, UDP/QUIC options, backend transport selection, and UDP metrics
- Updates architecture and feature descriptions to reflect UDP, QUIC, HTTP/3, and datagram handler capabilities
## 2026-03-19 - 25.14.1 - fix(deps)
update build and runtime dependencies and align route validation test expectations
- split the test preparation step into a dedicated test:before script while keeping test execution separate
- bump development tooling and runtime package versions in package.json
- adjust the route validation test to match the current generic handler error message
## 2026-03-19 - 25.14.0 - feat(udp,http3)
add UDP datagram handler relay support and stream HTTP/3 request bodies to backends
- establish a persistent Unix socket relay for UDP datagram handlers and process handler replies back to clients
- update route validation and smart proxy route reload logic to support datagramHandler routes
- record UDP, QUIC, and HTTP/3 byte metrics more accurately, including request bytes in and UDP session cleanup connection tracking
- add integration tests for UDP forwarding, datagram handlers, and UDP metrics
## 2026-03-19 - 25.13.0 - feat(smart-proxy)
add UDP transport support with QUIC/HTTP3 routing and datagram handler relay

3033
deno.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "25.13.0",
"version": "25.17.7",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
@@ -9,27 +9,28 @@
"author": "Lossless GmbH",
"license": "MIT",
"scripts": {
"test": "(tsrust) && (tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
"test:before": "(tsrust)",
"test": "(tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
"build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)",
"format": "(gitzone format)",
"buildDocs": "tsdoc"
},
"devDependencies": {
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsbuild": "^4.3.0",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8",
"@git.zone/tstest": "^3.5.0",
"@push.rocks/smartserve": "^2.0.1",
"@types/node": "^25.2.3",
"@types/node": "^25.5.0",
"typescript": "^5.9.3",
"why-is-node-running": "^3.2.2"
},
"dependencies": {
"@push.rocks/smartcrypto": "^2.0.4",
"@push.rocks/smartlog": "^3.1.10",
"@push.rocks/smartrust": "^1.2.1",
"@tsclass/tsclass": "^9.3.0",
"minimatch": "^10.2.0"
"@push.rocks/smartlog": "^3.2.1",
"@push.rocks/smartrust": "^1.3.2",
"@tsclass/tsclass": "^9.5.0",
"minimatch": "^10.2.4"
},
"files": [
"ts/**/*",

4777
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

279
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/smartproxy 🚀
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, UDP/QUIC/HTTP3, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
## 📦 Installation
@@ -16,9 +16,9 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## 🎯 What is SmartProxy?
SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, TLS, HTTP reverse proxy, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety.
SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, UDP, TLS, HTTP reverse proxy, QUIC/HTTP3, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety.
Whether you're building microservices, deploying edge infrastructure, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered.
Whether you're building microservices, deploying edge infrastructure, proxying UDP-based protocols, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered.
### ⚡ Key Features
@@ -29,11 +29,12 @@ Whether you're building microservices, deploying edge infrastructure, or need a
| 🔒 **Automatic SSL/TLS** | Zero-config HTTPS with Let's Encrypt ACME integration |
| 🎯 **Flexible Matching** | Route by port, domain, path, protocol, client IP, TLS version, headers, or custom logic |
| 🚄 **High-Performance** | Choose between user-space or kernel-level (NFTables) forwarding |
| 📡 **UDP & QUIC/HTTP3** | First-class UDP transport, datagram handlers, QUIC tunneling, and HTTP/3 support |
| ⚖️ **Load Balancing** | Round-robin, least-connections, IP-hash with health checks |
| 🛡️ **Enterprise Security** | IP filtering, rate limiting, basic auth, JWT auth, connection limits |
| 🔌 **WebSocket Support** | First-class WebSocket proxying with ping/pong keep-alive |
| 🎮 **Custom Protocols** | Socket handlers for implementing any protocol in TypeScript |
| 📊 **Live Metrics** | Real-time throughput, connection counts, and performance data |
| 🎮 **Custom Protocols** | Socket and datagram handlers for implementing any protocol in TypeScript |
| 📊 **Live Metrics** | Real-time throughput, connection counts, UDP sessions, and performance data |
| 🔧 **Dynamic Management** | Add/remove ports and routes at runtime without restarts |
| 🔄 **PROXY Protocol** | Full PROXY protocol v1/v2 support for preserving client information |
| 💾 **Consumer Cert Storage** | Bring your own persistence — SmartProxy never writes certs to disk |
@@ -89,7 +90,7 @@ SmartProxy uses a powerful **match/action** pattern that makes routing predictab
```
Every route consists of:
- **Match** — What traffic to capture (ports, domains, paths, protocol, IPs, headers)
- **Match** — What traffic to capture (ports, domains, paths, transport, protocol, IPs, headers)
- **Action** — What to do with it (`forward` or `socket-handler`)
- **Security** (optional) — IP allow/block lists, rate limits, authentication
- **Headers** (optional) — Request/response header manipulation with template variables
@@ -197,7 +198,7 @@ apiRoute = addRateLimiting(apiRoute, {
const proxy = new SmartProxy({ routes: [apiRoute] });
```
### 🎮 Custom Protocol Handler
### 🎮 Custom Protocol Handler (TCP)
SmartProxy lets you implement any protocol with full socket control. Routes with JavaScript socket handlers are automatically relayed from the Rust engine back to your TypeScript code:
@@ -247,6 +248,140 @@ const proxy = new SmartProxy({ routes: [echoRoute, customRoute] });
| `SocketHandlers.httpBlock(status, message)` | HTTP block response |
| `SocketHandlers.block(message)` | Block with optional message |
### 📡 UDP Datagram Handler
Handle raw UDP datagrams with custom TypeScript logic — perfect for DNS, game servers, IoT protocols, or any UDP-based service:
```typescript
import { SmartProxy } from '@push.rocks/smartproxy';
import type { IRouteConfig, TDatagramHandler, IDatagramInfo } from '@push.rocks/smartproxy';
// Custom UDP echo handler
const udpHandler: TDatagramHandler = (datagram, info, reply) => {
console.log(`UDP from ${info.sourceIp}:${info.sourcePort} on port ${info.destPort}`);
reply(datagram); // Echo it back
};
const proxy = new SmartProxy({
routes: [{
name: 'udp-echo',
match: {
ports: 5353,
transport: 'udp' // 👈 Listen for UDP datagrams
},
action: {
type: 'socket-handler',
datagramHandler: udpHandler, // 👈 Process each datagram
udp: {
sessionTimeout: 60000, // Session idle timeout (ms)
maxSessionsPerIP: 100,
maxDatagramSize: 65535
}
}
}]
});
await proxy.start();
```
### 📡 QUIC / HTTP3 Forwarding
Forward QUIC traffic to backends with optional protocol translation (e.g., receive QUIC, forward as TCP/HTTP1):
```typescript
import { SmartProxy } from '@push.rocks/smartproxy';
import type { IRouteConfig } from '@push.rocks/smartproxy';
const quicRoute: IRouteConfig = {
name: 'quic-to-backend',
match: {
ports: 443,
transport: 'udp',
protocol: 'quic' // 👈 Match QUIC protocol
},
action: {
type: 'forward',
targets: [{
host: 'backend-server',
port: 8443,
backendTransport: 'tcp' // 👈 Translate QUIC → TCP for backend
}],
tls: {
mode: 'terminate',
certificate: 'auto' // 👈 QUIC requires TLS 1.3
},
udp: {
quic: {
enableHttp3: true,
maxIdleTimeout: 30000,
maxConcurrentBidiStreams: 100,
altSvcPort: 443, // Advertise in Alt-Svc header
altSvcMaxAge: 86400
}
}
}
};
const proxy = new SmartProxy({
acme: { email: 'ssl@example.com' },
routes: [quicRoute]
});
```
### 🚄 Best-Effort Backend Protocol (H3 > H2 > H1)
SmartProxy automatically uses the **highest protocol your backend supports** for HTTP requests. The backend protocol is independent of the client protocol — a client using HTTP/1.1 can be forwarded over HTTP/3 to the backend, and vice versa.
```typescript
const route: IRouteConfig = {
name: 'auto-protocol',
match: { ports: 443, domains: 'app.example.com' },
action: {
type: 'forward',
targets: [{ host: 'backend', port: 8443 }],
tls: { mode: 'terminate', certificate: 'auto' },
options: {
backendProtocol: 'auto' // 👈 Default — best-effort selection
}
}
};
```
**How protocol discovery works (browser model):**
1. First request → TLS ALPN probe detects H2 or H1
2. Backend response inspected for `Alt-Svc: h3=":port"` header
3. If H3 advertised → cached and used for subsequent requests via QUIC
4. Graceful fallback: H3 failure → H2 → H1 with automatic cache invalidation
| `backendProtocol` | Behavior |
|---|---|
| `'auto'` (default) | Best-effort: H3 > H2 > H1 with Alt-Svc discovery |
| `'http1'` | Always HTTP/1.1 |
| `'http2'` | Always HTTP/2 (hard-fail if unsupported) |
| `'http3'` | Always HTTP/3 via QUIC (hard-fail if unsupported) |
> **Note:** WebSocket upgrades always use HTTP/1.1 to the backend regardless of `backendProtocol`, since there's no performance benefit from H2/H3 Extended CONNECT for tunneled connections, and backend support is rare.
### 🔁 Dual-Stack TCP + UDP Route
Listen on both TCP and UDP with a single route — handle each transport with its own handler:
```typescript
const dualStackRoute: IRouteConfig = {
name: 'dual-stack-dns',
match: {
ports: 53,
transport: 'all' // 👈 Listen on both TCP and UDP
},
action: {
type: 'socket-handler',
socketHandler: handleTcpDns, // 👈 TCP connections
datagramHandler: handleUdpDns, // 👈 UDP datagrams
}
};
```
### ⚡ High-Performance NFTables Forwarding
For ultra-low latency on Linux, use kernel-level forwarding (requires root):
@@ -419,6 +554,10 @@ console.log(`Bytes in: ${metrics.totals.bytesIn()}`);
console.log(`Requests/sec: ${metrics.requests.perSecond()}`);
console.log(`Throughput in: ${metrics.throughput.instant().in} bytes/sec`);
// UDP metrics
console.log(`UDP sessions: ${metrics.udp.activeSessions()}`);
console.log(`Datagrams in: ${metrics.udp.datagramsIn()}`);
// Get detailed statistics from the Rust engine
const stats = await proxy.getStatistics();
@@ -545,7 +684,7 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
```
┌─────────────────────────────────────────────────────┐
│ Your Application │
│ (TypeScript — routes, config, socket handlers) │
│ (TypeScript — routes, config, handlers)
└──────────────────┬──────────────────────────────────┘
│ IPC (JSON over stdin/stdout)
┌──────────────────▼──────────────────────────────────┐
@@ -556,22 +695,23 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
│ │ │ │ Proxy │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Security│ │ Metrics │ │ Connec- │ │ NFTables │ │
│ │ Enforce │ │ Collect │ │ tion │ │ Mgr │ │
│ │ │ │ │ │ Tracker │ │ │ │
│ │ UDP │ │ Security│ │ Metrics │ │ NFTables │ │
│ │ QUIC │ │ Enforce │ │ Collect │ │ Mgr │ │
│ │ HTTP/3 │ │ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
└──────────────────┬──────────────────────────────────┘
│ Unix Socket Relay
┌──────────────────▼──────────────────────────────────┐
TypeScript Socket Handler Server
│ (for JS-defined socket handlers & dynamic routes)
│ TypeScript Socket & Datagram Handler Servers
│ (for JS socket handlers, datagram handlers,
│ and dynamic routes) │
└─────────────────────────────────────────────────────┘
```
- **Rust Engine** handles all networking, TLS, HTTP proxying, connection management, security, and metrics
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and socket handler callbacks
- **Rust Engine** handles all networking: TCP, UDP, TLS, QUIC, HTTP proxying, connection management, security, and metrics
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and handler callbacks
- **IPC** — The TypeScript wrapper uses JSON commands/events over stdin/stdout to communicate with the Rust binary
- **Socket Relay** — A Unix domain socket server for routes requiring TypeScript-side handling (socket handlers, dynamic host/port functions)
- **Socket/Datagram Relay** — Unix domain socket servers for routes requiring TypeScript-side handling (socket handlers, datagram handlers, dynamic host/port functions)
## 🎯 Route Configuration Reference
@@ -579,22 +719,26 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
```typescript
interface IRouteMatch {
ports: number | number[] | Array<{ from: number; to: number }>; // Required — port(s) to listen on
domains?: string | string[]; // 'example.com', '*.example.com'
path?: string; // '/api/*', '/users/:id'
clientIp?: string[]; // ['10.0.0.0/8', '192.168.*']
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3']
ports: TPortRange; // Required — port(s) to listen on
transport?: 'tcp' | 'udp' | 'all'; // Transport protocol (default: 'tcp')
domains?: string | string[]; // 'example.com', '*.example.com'
path?: string; // '/api/*', '/users/:id'
clientIp?: string[]; // ['10.0.0.0/8', '192.168.*']
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3']
headers?: Record<string, string | RegExp>; // Match by HTTP headers
protocol?: 'http' | 'tcp'; // Match specific protocol ('http' includes h2 + WebSocket upgrades)
protocol?: 'http' | 'tcp' | 'udp' | 'quic' | 'http3'; // Application-layer protocol
}
// Port range supports single numbers, arrays, and ranges
type TPortRange = number | Array<number | { from: number; to: number }>;
```
### Action Types
| Type | Description |
|------|-------------|
| `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing) |
| `socket-handler` | Custom socket handling function in TypeScript |
| `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing, UDP/QUIC) |
| `socket-handler` | Custom socket/datagram handling function in TypeScript |
### Target Options
@@ -602,14 +746,15 @@ interface IRouteMatch {
interface IRouteTarget {
host: string | string[] | ((context: IRouteContext) => string | string[]);
port: number | 'preserve' | ((context: IRouteContext) => number);
tls?: IRouteTls; // Per-target TLS override
priority?: number; // Target priority
match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method)
tls?: IRouteTls; // Per-target TLS override
priority?: number; // Target priority
match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method)
websocket?: IRouteWebSocket;
loadBalancing?: IRouteLoadBalancing;
sendProxyProtocol?: boolean;
headers?: IRouteHeaders;
advanced?: IRouteAdvanced;
backendTransport?: 'tcp' | 'udp'; // Backend transport (e.g., receive QUIC, forward as TCP)
}
```
@@ -666,6 +811,49 @@ interface IRouteLoadBalancing {
}
```
### Backend Protocol Options
```typescript
// Set on action.options
{
action: {
type: 'forward',
targets: [...],
options: {
backendProtocol: 'auto' | 'http1' | 'http2' | 'http3'
}
}
}
```
| Value | Backend Behavior |
|-------|-----------------|
| `'auto'` | Best-effort: discovers H3 via Alt-Svc, probes H2 via ALPN, falls back to H1 |
| `'http1'` | Always HTTP/1.1 (no ALPN probe) |
| `'http2'` | Always HTTP/2 (hard-fail if handshake fails) |
| `'http3'` | Always HTTP/3 over QUIC (3s connect timeout, hard-fail if unreachable) |
### UDP & QUIC Options
```typescript
interface IRouteUdp {
sessionTimeout?: number; // Idle timeout per UDP session (ms, default: 60000)
maxSessionsPerIP?: number; // Max concurrent sessions per IP (default: 1000)
maxDatagramSize?: number; // Max datagram size in bytes (default: 65535)
quic?: IRouteQuic;
}
interface IRouteQuic {
maxIdleTimeout?: number; // QUIC idle timeout (ms, default: 30000)
maxConcurrentBidiStreams?: number; // Max bidi streams (default: 100)
maxConcurrentUniStreams?: number; // Max uni streams (default: 100)
enableHttp3?: boolean; // Enable HTTP/3 (default: false)
altSvcPort?: number; // Port for Alt-Svc header
altSvcMaxAge?: number; // Alt-Svc max age in seconds (default: 86400)
initialCongestionWindow?: number; // Initial congestion window (bytes)
}
```
## 🛠️ Helper Functions Reference
All helpers are fully typed and return `IRouteConfig` or `IRouteConfig[]`:
@@ -689,7 +877,7 @@ import {
createWebSocketRoute, // WebSocket-enabled route
// Custom Protocols
createSocketHandlerRoute, // Custom socket handler
createSocketHandlerRoute, // Custom TCP socket handler
SocketHandlers, // Pre-built handlers (echo, proxy, block, etc.)
// NFTables (Linux, requires root)
@@ -718,6 +906,8 @@ import {
} from '@push.rocks/smartproxy';
```
> **Tip:** For UDP datagram handler routes or QUIC/HTTP3 routes, construct `IRouteConfig` objects directly — there are no helper functions for these yet. See the [UDP Datagram Handler](#-udp-datagram-handler) and [QUIC / HTTP3 Forwarding](#-quic--http3-forwarding) examples above.
## 📖 API Documentation
### SmartProxy Class
@@ -753,6 +943,8 @@ class SmartProxy extends EventEmitter {
// Events
on(event: 'error', handler: (err: Error) => void): this;
on(event: 'certificate-issued', handler: (ev: ICertificateIssuedEvent) => void): this;
on(event: 'certificate-failed', handler: (ev: ICertificateFailedEvent) => void): this;
}
```
@@ -775,6 +967,8 @@ interface ISmartProxyOptions {
// Custom certificate provisioning
certProvisionFunction?: (domain: string) => Promise<ICert | 'http01'>;
certProvisionFallbackToAcme?: boolean; // Fall back to ACME on failure (default: true)
certProvisionTimeout?: number; // Timeout per provision call (ms)
certProvisionConcurrency?: number; // Max concurrent provisions
// Consumer-managed certificate persistence (see "Consumer-Managed Certificate Storage")
certStore?: ISmartProxyCertStore;
@@ -782,6 +976,9 @@ interface ISmartProxyOptions {
// Self-signed fallback
disableDefaultCert?: boolean; // Disable '*' self-signed fallback (default: false)
// Rust binary path override
rustBinaryPath?: string; // Custom path to the Rust proxy binary
// Global defaults
defaults?: {
target?: { host: string; port: number };
@@ -868,11 +1065,22 @@ metrics.requests.perSecond(); // Requests per second
metrics.requests.perMinute(); // Requests per minute
metrics.requests.total(); // Total requests
// UDP metrics
metrics.udp.activeSessions(); // Current active UDP sessions
metrics.udp.totalSessions(); // Total UDP sessions since start
metrics.udp.datagramsIn(); // Datagrams received
metrics.udp.datagramsOut(); // Datagrams sent
// Cumulative totals
metrics.totals.bytesIn(); // Total bytes received
metrics.totals.bytesOut(); // Total bytes sent
metrics.totals.connections(); // Total connections
// Backend metrics
metrics.backends.byBackend(); // Map<backend, IBackendMetrics>
metrics.backends.protocols(); // Map<backend, protocol>
metrics.backends.topByErrors(10); // Top N error-prone backends
// Percentiles
metrics.percentiles.connectionDuration(); // { p50, p95, p99 }
metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p50, p95, p99 } }
@@ -896,11 +1104,16 @@ metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p5
### Rust Binary Not Found
SmartProxy searches for the Rust binary in this order:
1. `SMARTPROXY_RUST_BINARY` environment variable
2. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
3. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
4. Local dev build (`./rust/target/release/rustproxy`)
5. System PATH (`rustproxy`)
1. `rustBinaryPath` option in `ISmartProxyOptions`
2. `SMARTPROXY_RUST_BINARY` environment variable
3. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
4. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
5. Local dev build (`./rust/target/release/rustproxy`)
6. System PATH (`rustproxy`)
### QUIC / HTTP3 Caveats
- **GREASE frames are disabled.** The underlying h3 crate sends [GREASE frames](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) by default to test protocol extensibility. However, some HTTP/3 clients and servers don't properly ignore unknown frame types, causing 400/500 errors or stream hangs ([h3#206](https://github.com/hyperium/h3/issues/206)). SmartProxy disables GREASE on both the server side (for incoming H3 requests) and the client side (for H3 backend connections) to maximize compatibility.
- **HTTP/3 is pre-release.** The h3 ecosystem (h3 0.0.8, h3-quinn 0.0.10, quinn 0.11) is still pre-1.0. Expect rough edges.
### Performance Tuning
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)

4
rust/Cargo.lock generated
View File

@@ -1224,10 +1224,14 @@ dependencies = [
"bytes",
"clap",
"dashmap",
"h3",
"h3-quinn",
"http",
"http-body-util",
"hyper",
"hyper-util",
"mimalloc",
"quinn",
"rcgen",
"rustls",
"rustls-pemfile",

View File

@@ -4,11 +4,15 @@
//! and forwards them to backends using the same routing and pool infrastructure
//! as the HTTP/1+2 proxy.
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use arc_swap::ArcSwap;
use bytes::{Buf, Bytes};
use http_body::Frame;
use tracing::{debug, warn};
use rustproxy_config::{RouteConfig, TransportProtocol};
@@ -32,7 +36,6 @@ pub struct H3ProxyService {
protocol_cache: Arc<ProtocolCache>,
#[allow(dead_code)]
upstream_selector: UpstreamSelector,
#[allow(dead_code)]
backend_tls_config: Arc<rustls::ClientConfig>,
connect_timeout: Duration,
}
@@ -58,17 +61,23 @@ impl H3ProxyService {
}
/// Handle an accepted QUIC connection as HTTP/3.
///
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
/// `connection.remote_address()` for client IP attribution.
pub async fn handle_connection(
&self,
connection: quinn::Connection,
_fallback_route: &RouteConfig,
port: u16,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let remote_addr = connection.remote_address();
let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
h3::server::Connection::new(h3_quinn::Connection::new(connection))
h3::server::builder()
.send_grease(false)
.build(h3_quinn::Connection::new(connection))
.await
.map_err(|e| anyhow::anyhow!("H3 connection setup failed: {}", e))?;
@@ -90,12 +99,14 @@ impl H3ProxyService {
let rm = self.route_manager.load();
let pool = Arc::clone(&self.connection_pool);
let metrics = Arc::clone(&self.metrics);
let backend_tls = Arc::clone(&self.backend_tls_config);
let connect_timeout = self.connect_timeout;
let client_ip = client_ip.clone();
tokio::spawn(async move {
if let Err(e) = handle_h3_request(
request, stream, port, &client_ip, &rm, &pool, &metrics, connect_timeout,
request, stream, port, &client_ip, &rm, &pool, &metrics,
&backend_tls, connect_timeout,
).await {
debug!("HTTP/3 request error from {}: {}", client_ip, e);
}
@@ -125,6 +136,7 @@ async fn handle_h3_request(
route_manager: &RouteManager,
_connection_pool: &ConnectionPool,
metrics: &MetricsCollector,
backend_tls_config: &Arc<rustls::ClientConfig>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
let method = request.method().clone();
@@ -165,16 +177,15 @@ async fn handle_h3_request(
let backend_port = target.port.resolve(port);
let backend_addr = format!("{}:{}", backend_host, backend_port);
// Read request body
let mut body_data = Vec::new();
while let Some(mut chunk) = stream.recv_data().await
.map_err(|e| anyhow::anyhow!("Failed to read H3 request body: {}", e))?
{
body_data.extend_from_slice(chunk.chunk());
chunk.advance(chunk.remaining());
// Determine if backend requires TLS (same logic as proxy_service.rs)
let mut use_tls = target.tls.is_some();
if let Some(ref tls) = route.action.tls {
if tls.mode == rustproxy_config::TlsMode::TerminateAndReencrypt {
use_tls = true;
}
}
// Connect to backend via TCP HTTP/1.1 with timeout
// Connect to backend via TCP with timeout
let tcp_stream = tokio::time::timeout(
connect_timeout,
tokio::net::TcpStream::connect(&backend_addr),
@@ -184,21 +195,59 @@ async fn handle_h3_request(
let _ = tcp_stream.set_nodelay(true);
let io = hyper_util::rt::TokioIo::new(tcp_stream);
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
// Branch: wrap in TLS if backend requires it, then HTTP/1.1 handshake.
// hyper's SendRequest<B> is NOT generic over the IO type, so both branches
// produce the same type and can be unified.
let mut sender = if use_tls {
let connector = tokio_rustls::TlsConnector::from(Arc::clone(backend_tls_config));
let server_name = rustls::pki_types::ServerName::try_from(backend_host.to_string())
.map_err(|e| anyhow::anyhow!("Invalid backend SNI '{}': {}", backend_host, e))?;
let tls_stream = connector.connect(server_name, tcp_stream).await
.map_err(|e| anyhow::anyhow!("Backend TLS handshake to {} failed: {}", backend_addr, e))?;
let io = hyper_util::rt::TokioIo::new(tls_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
tokio::spawn(async move { let _ = conn.await; });
sender
} else {
let io = hyper_util::rt::TokioIo::new(tcp_stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await
.map_err(|e| anyhow::anyhow!("Backend handshake failed: {}", e))?;
tokio::spawn(async move { let _ = conn.await; });
sender
};
tokio::spawn(async move {
if let Err(e) = conn.await {
debug!("Backend connection closed: {}", e);
// Stream request body from H3 client to backend via an mpsc channel.
// This avoids buffering the entire request body in memory.
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
let total_bytes_in = Arc::new(std::sync::atomic::AtomicU64::new(0));
let total_bytes_in_writer = Arc::clone(&total_bytes_in);
// Spawn the H3 body reader task
let body_reader = tokio::spawn(async move {
while let Ok(Some(mut chunk)) = stream.recv_data().await {
let data = Bytes::copy_from_slice(chunk.chunk());
total_bytes_in_writer.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
chunk.advance(chunk.remaining());
if body_tx.send(data).await.is_err() {
break;
}
}
stream
});
let body = http_body_util::Full::new(Bytes::from(body_data));
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
// Create a body that polls from the mpsc receiver
let body = H3RequestBody { receiver: body_rx };
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body, use_tls)?;
let response = sender.send_request(backend_req).await
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
// Await the body reader to get the stream back
let mut stream = body_reader.await
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
let total_bytes_in = total_bytes_in.load(std::sync::atomic::Ordering::Relaxed);
// Build H3 response
let status = response.status();
let mut h3_response = hyper::Response::builder().status(status);
@@ -212,6 +261,12 @@ async fn handle_h3_request(
h3_response = h3_response.header(name, value);
}
// Extract content-length for body loop termination (must be before into_body())
let content_length: Option<u64> = response.headers()
.get(hyper::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok());
// Add Alt-Svc for HTTP/3 advertisement
let alt_svc = route.action.udp.as_ref()
.and_then(|u| u.quic.as_ref())
@@ -232,27 +287,58 @@ async fn handle_h3_request(
// Stream response body back
use http_body_util::BodyExt;
use http_body::Body as _;
let mut body = response.into_body();
let mut total_bytes_out: u64 = 0;
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
// Per-frame idle timeout: if no frame arrives within this duration, assume
// the body is complete (or the backend has stalled). This prevents indefinite
// hangs on close-delimited bodies or when hyper's internal trailers oneshot
// never resolves after all data has been received.
const FRAME_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
loop {
// Layer 1: If the body already knows it is finished (Content-Length
// bodies track remaining bytes internally), break immediately to
// avoid blocking on hyper's internal trailers oneshot.
if body.is_end_stream() {
break;
}
// Layer 3: Per-frame idle timeout safety net
match tokio::time::timeout(FRAME_IDLE_TIMEOUT, body.frame()).await {
Ok(Some(Ok(frame))) => {
if let Some(data) = frame.data_ref() {
total_bytes_out += data.len() as u64;
stream.send_data(Bytes::copy_from_slice(data)).await
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
// Layer 2: Content-Length byte count check
if let Some(cl) = content_length {
if total_bytes_out >= cl {
break;
}
}
}
}
Err(e) => {
Ok(Some(Err(e))) => {
warn!("Backend body read error: {}", e);
break;
}
Ok(None) => break, // Body ended naturally
Err(_) => {
debug!(
"H3 body frame idle timeout ({:?}) after {} bytes; finishing stream",
FRAME_IDLE_TIMEOUT, total_bytes_out
);
break;
}
}
}
// Record metrics
let route_id = route.name.as_deref().or(route.id.as_deref());
metrics.record_bytes(0, total_bytes_out, route_id, Some(client_ip));
metrics.record_bytes(total_bytes_in, total_bytes_out, route_id, Some(client_ip));
// Finish the stream
stream.finish().await
@@ -262,17 +348,19 @@ async fn handle_h3_request(
}
/// Build an HTTP/1.1 backend request from the H3 frontend request.
fn build_backend_request(
fn build_backend_request<B>(
method: &hyper::Method,
backend_addr: &str,
path: &str,
host: &str,
original_request: &hyper::Request<()>,
body: http_body_util::Full<Bytes>,
) -> anyhow::Result<hyper::Request<http_body_util::Full<Bytes>>> {
body: B,
use_tls: bool,
) -> anyhow::Result<hyper::Request<B>> {
let scheme = if use_tls { "https" } else { "http" };
let mut req = hyper::Request::builder()
.method(method)
.uri(format!("http://{}{}", backend_addr, path))
.uri(format!("{}://{}{}", scheme, backend_addr, path))
.header("host", host);
// Forward non-pseudo headers
@@ -286,3 +374,27 @@ fn build_backend_request(
req.body(body)
.map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e))
}
/// A streaming request body backed by an mpsc channel receiver.
///
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
/// from the H3 client, avoiding buffering the entire request body in memory.
struct H3RequestBody {
receiver: tokio::sync::mpsc::Receiver<Bytes>,
}
impl http_body::Body for H3RequestBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -1,8 +1,11 @@
//! Bounded, TTL-based protocol detection cache for HTTP/2 auto-detection.
//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
//!
//! Caches the ALPN-negotiated protocol (H1 or H2) per backend endpoint and requested
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
//! frontend domains share the same backend but differ in HTTP/2 support.
//! frontend domains share the same backend but differ in protocol support.
//!
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -29,6 +32,14 @@ pub enum DetectedProtocol {
H3,
}
/// Result of a protocol cache lookup.
#[derive(Debug, Clone, Copy)]
pub struct CachedProtocol {
pub protocol: DetectedProtocol,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
pub h3_port: Option<u16>,
}
/// Key for the protocol cache: (host, port, requested_host).
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct ProtocolCacheKey {
@@ -43,6 +54,8 @@ pub struct ProtocolCacheKey {
struct CachedEntry {
protocol: DetectedProtocol,
detected_at: Instant,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
h3_port: Option<u16>,
}
/// Bounded, TTL-based protocol detection cache.
@@ -75,11 +88,14 @@ impl ProtocolCache {
/// Look up the cached protocol for a backend endpoint.
/// Returns `None` if not cached or expired (caller should probe via ALPN).
pub fn get(&self, key: &ProtocolCacheKey) -> Option<DetectedProtocol> {
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
let entry = self.cache.get(key)?;
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
Some(entry.protocol)
Some(CachedProtocol {
protocol: entry.protocol,
h3_port: entry.h3_port,
})
} else {
// Expired — remove and return None to trigger re-probe
drop(entry); // release DashMap ref before remove
@@ -91,6 +107,16 @@ impl ProtocolCache {
/// Insert a detected protocol into the cache.
/// If the cache is at capacity, evict the oldest entry first.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
self.insert_with_h3_port(key, protocol, None);
}
/// Insert an H3 detection result with the Alt-Svc advertised port.
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
}
/// Insert a protocol detection result with an optional H3 port.
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
// Evict the oldest entry to stay within bounds
let oldest = self.cache.iter()
@@ -103,6 +129,7 @@ impl ProtocolCache {
self.cache.insert(key, CachedEntry {
protocol,
detected_at: Instant::now(),
h3_port,
});
}

View File

@@ -43,6 +43,10 @@ struct ConnActivity {
/// increments on creation and decrements on Drop, keeping the watchdog aware that
/// a response body is still streaming after the request handler has returned.
active_requests: Option<Arc<AtomicU64>>,
/// Protocol cache key for Alt-Svc discovery. When set, `build_streaming_response`
/// checks the backend's original response headers for Alt-Svc before our
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
}
/// Default upstream connect timeout (30 seconds).
@@ -58,6 +62,18 @@ const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::
/// Default WebSocket max lifetime (24 hours).
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
/// Protocol decision for backend connection.
#[derive(Debug)]
enum ProtocolDecision {
H1,
H2,
H3 { port: u16 },
AlpnProbe,
}
/// RAII guard that decrements the active request counter on drop.
/// Ensures the counter is correct even if the request handler panics.
struct ActiveRequestGuard {
@@ -190,6 +206,9 @@ pub struct HttpProxyService {
ws_inactivity_timeout: std::time::Duration,
/// WebSocket maximum connection lifetime.
ws_max_lifetime: std::time::Duration,
/// Shared QUIC client endpoint for outbound H3 backend connections.
/// Lazily initialized on first H3 backend attempt.
quinn_client_endpoint: Arc<quinn::Endpoint>,
}
impl HttpProxyService {
@@ -209,6 +228,7 @@ impl HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
}
}
@@ -233,6 +253,7 @@ impl HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
}
}
@@ -324,7 +345,7 @@ impl HttpProxyService {
let cn = cancel_inner.clone();
let la = Arc::clone(&la_inner);
let st = start;
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) };
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
async move {
let result = svc.handle_request(req, peer, port, cn, ca).await;
// Mark request end — update activity timestamp before guard drops
@@ -401,7 +422,7 @@ impl HttpProxyService {
peer_addr: std::net::SocketAddr,
port: u16,
cancel: CancellationToken,
conn_activity: ConnActivity,
mut conn_activity: ConnActivity,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let host = req.headers()
.get("host")
@@ -645,37 +666,101 @@ impl HttpProxyService {
// --- Resolve protocol decision based on backend protocol mode ---
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
let (use_h2, needs_alpn_probe) = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => (false, false),
rustproxy_config::BackendProtocol::Http2 => (true, false),
rustproxy_config::BackendProtocol::Http3 => {
// HTTP/3 (QUIC) backend connections not yet implemented — fall back to H1
warn!("backendProtocol 'http3' not yet implemented, falling back to http1");
(false, false)
}
let protocol_cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: host.clone(),
};
let protocol_decision = match backend_protocol_mode {
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
rustproxy_config::BackendProtocol::Http3 => ProtocolDecision::H3 { port: upstream.port },
rustproxy_config::BackendProtocol::Auto => {
if !upstream.use_tls {
// No ALPN without TLS — default to H1
(false, false)
// No ALPN without TLS, no QUIC without TLS — default to H1
ProtocolDecision::H1
} else {
let cache_key = crate::protocol_cache::ProtocolCacheKey {
host: upstream.host.clone(),
port: upstream.port,
requested_host: host.clone(),
};
match self.protocol_cache.get(&cache_key) {
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false),
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false),
Some(crate::protocol_cache::DetectedProtocol::H3) => {
// H3 cached but we're on TCP — fall back to H2 probe
(false, true)
}
None => (false, true), // needs ALPN probe
match self.protocol_cache.get(&protocol_cache_key) {
Some(cached) => match cached.protocol {
crate::protocol_cache::DetectedProtocol::H3 => {
if let Some(h3_port) = cached.h3_port {
ProtocolDecision::H3 { port: h3_port }
} else {
// H3 cached but no port — fall back to ALPN probe
ProtocolDecision::AlpnProbe
}
}
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
},
None => ProtocolDecision::AlpnProbe,
}
}
}
};
// Derive legacy flags for the existing H1/H2 connection path
let (use_h2, mut needs_alpn_probe) = match &protocol_decision {
ProtocolDecision::H1 => (false, false),
ProtocolDecision::H2 => (true, false),
ProtocolDecision::H3 { .. } => (false, false), // H3 path handled separately below
ProtocolDecision::AlpnProbe => (false, true),
};
// Set Alt-Svc cache key on conn_activity so build_streaming_response can check
// the backend's original Alt-Svc header before ResponseFilter injects our own.
if is_auto_detect_mode {
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
}
// --- H3 path: try QUIC connection before TCP ---
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(),
port: h3_port,
use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
// Try H3 pool checkout first
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
self.metrics.backend_pool_hit(&upstream_key);
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
// Try fresh QUIC connection
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.metrics.backend_pool_miss(&upstream_key);
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e) => {
warn!(backend = %upstream_key, error = %e,
"H3 backend connect failed, falling back to H2/H1");
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
// from our own injected Alt-Svc header or a stale backend Alt-Svc
conn_activity.alt_svc_cache_key = None;
// Force ALPN probe on TCP fallback so we correctly detect H2 vs H1
// (don't cache anything yet — let the ALPN probe decide)
if is_auto_detect_mode && upstream.use_tls {
needs_alpn_probe = true;
}
// Fall through to TCP path
}
}
}
// --- Connection pooling: try reusing an existing connection first ---
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
if !needs_alpn_probe {
@@ -870,6 +955,7 @@ impl HttpProxyService {
};
self.upstream_selector.connection_ended(&upstream_key);
self.metrics.backend_connection_closed(&upstream_key);
result
}
@@ -1668,6 +1754,19 @@ impl HttpProxyService {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let (resp_parts, resp_body) = upstream_response.into_parts();
// Check for Alt-Svc in the backend's ORIGINAL response headers BEFORE
// ResponseFilter::apply_headers runs — the filter may inject our own Alt-Svc
// for client-facing HTTP/3 advertisement, which must not be confused with
// backend-originated Alt-Svc.
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
}
}
}
let mut response = Response::builder()
.status(resp_parts.status);
@@ -2393,6 +2492,256 @@ impl HttpProxyService {
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Arc::new(config)
}
/// Create a shared QUIC client endpoint for outbound H3 backend connections.
fn create_quinn_client_endpoint() -> quinn::Endpoint {
let _ = rustls::crypto::ring::default_provider().install_default();
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
.with_no_client_auth();
tls_config.alpn_protocols = vec![b"h3".to_vec()];
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to create QUIC client crypto config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
endpoint.set_default_client_config(client_config);
endpoint
}
/// Connect to a backend via QUIC (H3).
async fn connect_quic_backend(
&self,
host: &str,
port: u16,
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
let addr = tokio::net::lookup_host(format!("{}:{}", host, port))
.await?
.next()
.ok_or("DNS resolution returned no addresses")?;
let server_name = host.to_string();
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| "QUIC connect timeout (3s)")??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)
}
/// Forward request to backend via HTTP/3 over QUIC.
async fn forward_h3(
&self,
quic_conn: quinn::Connection,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
domain: &str,
conn_activity: &ConnActivity,
backend_key: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, mut send_request) = match h3::client::builder()
.send_grease(false)
.build(h3_quinn_conn)
.await
{
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Spawn the h3 connection driver
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
}
});
// Build the H3 request
let uri = hyper::Uri::builder()
.scheme("https")
.authority(domain)
.path_and_query(upstream_path)
.build()
.unwrap_or_else(|_| upstream_path.parse().unwrap_or_default());
let mut h3_req = hyper::Request::builder()
.method(parts.method.clone())
.uri(uri);
if let Some(headers) = h3_req.headers_mut() {
*headers = upstream_headers;
}
let h3_req = h3_req.body(()).unwrap();
// Send the request
let mut stream = match send_request.send_request(h3_req).await {
Ok(s) => s,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
self.metrics.backend_request_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
}
};
// Stream request body
let rid: Option<Arc<str>> = route_id.map(Arc::from);
let sip: Arc<str> = Arc::from(source_ip);
{
use http_body_util::BodyExt;
let mut body = body;
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
error!(backend = %backend_key, error = %e, "H3 send_data failed");
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
}
}
}
Err(e) => {
warn!(backend = %backend_key, error = %e, "Client body read error during H3 forward");
break;
}
}
}
// Signal end of body
stream.finish().await.ok();
}
// Read response
let h3_response = match stream.recv_response().await {
Ok(resp) => resp,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
self.metrics.backend_request_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 response failed"));
}
};
// Build the response for the client
let status = h3_response.status();
let mut response = Response::builder().status(status);
if let Some(headers) = response.headers_mut() {
for (name, value) in h3_response.headers() {
let n = name.as_str();
// Skip hop-by-hop headers
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" {
continue;
}
headers.insert(name.clone(), value.clone());
}
ResponseFilter::apply_headers(route, headers, None);
}
// Stream response body back via an adapter
let h3_body = H3ClientResponseBody { stream };
let counting_body = CountingBody::new(
h3_body,
Arc::clone(&self.metrics),
rid,
Some(sip),
Direction::Out,
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
counting_body.with_active_requests(Arc::clone(ar))
} else {
counting_body
};
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
// Register connection in pool on success
if status != StatusCode::BAD_GATEWAY {
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
}
self.metrics.set_backend_protocol(backend_key, "h3");
Ok(response.body(body).unwrap())
}
}
/// Parse an Alt-Svc header value to extract the H3 port.
/// Handles formats like `h3=":443"; ma=86400` and `h3=":8443", h2=":443"`.
fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
for directive in header_value.split(',') {
let directive = directive.trim();
// Match h3=":<port>" or h3-29=":<port>" etc.
if directive.starts_with("h3=") || directive.starts_with("h3-") {
// Find the port in ":<port>"
if let Some(start) = directive.find("\":") {
let rest = &directive[start + 2..];
if let Some(end) = rest.find('"') {
if let Ok(port) = rest[..end].parse::<u16>() {
return Some(port);
}
}
}
}
}
None
}
/// Response body adapter for H3 client responses.
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
struct H3ClientResponseBody {
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl http_body::Body for H3ClientResponseBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// h3's recv_data is async, so we need to poll it manually.
// Use a small future to poll the recv_data call.
use std::future::Future;
let mut fut = Box::pin(self.stream.recv_data());
match fut.as_mut().poll(_cx) {
Poll::Ready(Ok(Some(mut buf))) => {
use bytes::Buf;
let data = Bytes::copy_from_slice(buf.chunk());
buf.advance(buf.remaining());
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
}
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => {
warn!("H3 response body recv error: {}", e);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
/// Insecure certificate verifier for backend TLS connections (fallback only).
@@ -2463,6 +2812,7 @@ impl Default for HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
}
}
}

View File

@@ -3,13 +3,21 @@
//! Manages QUIC endpoints (via quinn), accepts connections, and either:
//! - Forwards streams bidirectionally to TCP backends (QUIC termination)
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
//!
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
//! headers before they reach quinn, extracting real client IPs for attribution.
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
use rustls::ServerConfig as RustlsServerConfig;
use tokio_util::sync::CancellationToken;
@@ -19,8 +27,9 @@ use rustproxy_config::{RouteConfig, TransportProtocol};
use rustproxy_metrics::MetricsCollector;
use rustproxy_routing::{MatchContext, RouteManager};
use rustproxy_http::h3_service::H3ProxyService;
use crate::connection_tracker::ConnectionTracker;
use crate::forwarder::ForwardMetricsCtx;
/// Create a QUIC server endpoint on the given port with the provided TLS config.
///
@@ -46,9 +55,274 @@ pub fn create_quic_endpoint(
Ok(endpoint)
}
// ===== PROXY protocol relay for QUIC =====
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
pub struct QuicProxyRelay {
/// The quinn endpoint (bound to 127.0.0.1:ephemeral).
pub endpoint: Endpoint,
/// The relay recv loop task handle.
pub relay_task: JoinHandle<()>,
/// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
/// Consulted by `quic_accept_loop` to resolve real client IPs.
pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
}
/// A single relay session for forwarding datagrams between an external source
/// and the internal quinn endpoint.
struct RelaySession {
socket: Arc<UdpSocket>,
last_activity: AtomicU64,
return_task: JoinHandle<()>,
cancel: CancellationToken,
}
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
///
/// Instead of giving the external socket to quinn, we:
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
///
/// Only used when `proxy_ips` is non-empty.
pub fn create_quic_endpoint_with_proxy_relay(
port: u16,
tls_config: Arc<RustlsServerConfig>,
proxy_ips: Arc<Vec<IpAddr>>,
cancel: CancellationToken,
) -> anyhow::Result<QuicProxyRelay> {
// Bind external socket on the real port
let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
external_socket.set_nonblocking(true)?;
let external_socket = Arc::new(
UdpSocket::from_std(external_socket)
.map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
);
// Bind quinn on localhost ephemeral port
let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
let quinn_internal_addr = internal_socket.local_addr()?;
let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
.map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
let endpoint = Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
internal_socket,
quinn::default_runtime()
.ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
)?;
let real_client_map = Arc::new(DashMap::new());
let relay_task = tokio::spawn(quic_proxy_relay_loop(
external_socket,
quinn_internal_addr,
proxy_ips,
Arc::clone(&real_client_map),
cancel,
));
info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
}
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
/// headers from trusted proxy IPs, and forwards everything else to quinn via
/// per-session relay sockets.
async fn quic_proxy_relay_loop(
external_socket: Arc<UdpSocket>,
quinn_internal_addr: SocketAddr,
proxy_ips: Arc<Vec<IpAddr>>,
real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
cancel: CancellationToken,
) {
// Maps external source addr → real client addr (from PROXY v2 headers)
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
// Maps external source addr → relay session
let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
let epoch = Instant::now();
let mut buf = vec![0u8; 65535];
// Inline cleanup: periodically scan relay_sessions for stale entries
let mut last_cleanup = Instant::now();
let cleanup_interval = std::time::Duration::from_secs(30);
let session_timeout_ms: u64 = 120_000;
loop {
let (len, src_addr) = tokio::select! {
_ = cancel.cancelled() => {
debug!("QUIC proxy relay loop cancelled");
break;
}
result = external_socket.recv_from(&mut buf) => {
match result {
Ok(r) => r,
Err(e) => {
warn!("QUIC proxy relay recv error: {}", e);
continue;
}
}
}
};
let datagram = &buf[..len];
// PROXY v2 handling: only on first datagram from a trusted proxy IP
// (before a relay session exists for this source)
if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
match crate::proxy_protocol::parse_v2(datagram) {
Ok((header, _consumed)) => {
debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
proxy_addr_map.insert(src_addr, header.source_addr);
continue; // consume the PROXY v2 datagram
}
Err(e) => {
debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
}
}
}
}
// Determine real client address
let real_client = proxy_addr_map.get(&src_addr)
.map(|r| *r)
.unwrap_or(src_addr);
// Get or create relay session for this external source
let session = match relay_sessions.get(&src_addr) {
Some(s) => {
s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
Arc::clone(s.value())
}
None => {
// Create new relay socket connected to quinn's internal address
let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
Ok(s) => s,
Err(e) => {
warn!("QUIC relay: failed to bind relay socket: {}", e);
continue;
}
};
if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
continue;
}
let relay_local_addr = match relay_socket.local_addr() {
Ok(a) => a,
Err(e) => {
warn!("QUIC relay: failed to get relay socket local addr: {}", e);
continue;
}
};
let relay_socket = Arc::new(relay_socket);
// Store the real client mapping for the QUIC accept loop
real_client_map.insert(relay_local_addr, real_client);
// Spawn return-path relay: quinn -> external socket -> original source
let session_cancel = cancel.child_token();
let return_task = tokio::spawn(relay_return_path(
Arc::clone(&relay_socket),
Arc::clone(&external_socket),
src_addr,
session_cancel.child_token(),
));
let session = Arc::new(RelaySession {
socket: relay_socket,
last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
return_task,
cancel: session_cancel,
});
relay_sessions.insert(src_addr, Arc::clone(&session));
debug!("QUIC relay: new session for {} (relay {}), real client {}",
src_addr, relay_local_addr, real_client);
session
}
};
// Forward datagram to quinn via the relay socket
if let Err(e) = session.socket.send(datagram).await {
debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
}
// Periodic cleanup of stale relay sessions
if last_cleanup.elapsed() >= cleanup_interval {
last_cleanup = Instant::now();
let now_ms = epoch.elapsed().as_millis() as u64;
let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
.filter(|entry| {
let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
age > session_timeout_ms
})
.map(|entry| *entry.key())
.collect();
for key in stale_keys {
if let Some((_, session)) = relay_sessions.remove(&key) {
session.cancel.cancel();
session.return_task.abort();
// Clean up real_client_map entry
if let Ok(addr) = session.socket.local_addr() {
real_client_map.remove(&addr);
}
proxy_addr_map.remove(&key);
debug!("QUIC relay: cleaned up stale session for {}", key);
}
}
}
}
// Shutdown: cancel all relay sessions
for entry in relay_sessions.iter() {
entry.value().cancel.cancel();
entry.value().return_task.abort();
}
}
/// Return-path relay: receives datagrams from quinn (via the relay socket)
/// and forwards them back to the external client through the external socket.
async fn relay_return_path(
relay_socket: Arc<UdpSocket>,
external_socket: Arc<UdpSocket>,
external_src_addr: SocketAddr,
cancel: CancellationToken,
) {
let mut buf = vec![0u8; 65535];
loop {
let len = tokio::select! {
_ = cancel.cancelled() => break,
result = relay_socket.recv(&mut buf) => {
match result {
Ok(len) => len,
Err(e) => {
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
break;
}
}
}
};
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
break;
}
}
}
// ===== QUIC accept loop =====
/// Run the QUIC accept loop for a single endpoint.
///
/// Accepts incoming QUIC connections and spawns a task per connection.
/// When `real_client_map` is provided, it is consulted to resolve real client
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
pub async fn quic_accept_loop(
endpoint: Endpoint,
port: u16,
@@ -56,6 +330,8 @@ pub async fn quic_accept_loop(
metrics: Arc<MetricsCollector>,
conn_tracker: Arc<ConnectionTracker>,
cancel: CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
) {
loop {
let incoming = tokio::select! {
@@ -75,11 +351,16 @@ pub async fn quic_accept_loop(
};
let remote_addr = incoming.remote_address();
let ip = remote_addr.ip();
// Resolve real client IP from PROXY protocol map if available
let real_addr = real_client_map.as_ref()
.and_then(|map| map.get(&remote_addr).map(|r| *r))
.unwrap_or(remote_addr);
let ip = real_addr.ip();
// Per-IP rate limiting
if !conn_tracker.try_accept(&ip) {
debug!("QUIC connection rejected from {} (rate limit)", remote_addr);
debug!("QUIC connection rejected from {} (rate limit)", real_addr);
// Drop `incoming` to refuse the connection
continue;
}
@@ -102,7 +383,7 @@ pub async fn quic_accept_loop(
let route = match rm.find_route(&ctx) {
Some(m) => m.route.clone(),
None => {
debug!("No QUIC route matched for port {} from {}", port, remote_addr);
debug!("No QUIC route matched for port {} from {}", port, real_addr);
continue;
}
};
@@ -114,11 +395,13 @@ pub async fn quic_accept_loop(
let metrics = Arc::clone(&metrics);
let conn_tracker = Arc::clone(&conn_tracker);
let cancel = cancel.child_token();
let h3_svc = h3_service.clone();
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
tokio::spawn(async move {
match handle_quic_connection(incoming, route, port, &metrics, &cancel).await {
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
}
// Cleanup
@@ -138,12 +421,14 @@ async fn handle_quic_connection(
incoming: quinn::Incoming,
route: RouteConfig,
port: u16,
metrics: &MetricsCollector,
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let connection = incoming.await?;
let remote_addr = connection.remote_address();
debug!("QUIC connection established from {}", remote_addr);
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("QUIC connection established from {}", effective_addr);
// Check if this route has HTTP/3 enabled
let enable_http3 = route.action.udp.as_ref()
@@ -152,13 +437,23 @@ async fn handle_quic_connection(
.unwrap_or(false);
if enable_http3 {
// Phase 5: dispatch to H3ProxyService
// For now, log and accept streams for basic handling
debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name);
handle_h3_connection(connection, route, port, metrics, cancel).await
if let Some(ref h3_svc) = h3_service {
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
h3_svc.handle_connection(connection, &route, port, real_client_addr).await
} else {
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
// Keep connection alive until cancelled
tokio::select! {
_ = cancel.cancelled() => {}
reason = connection.closed() => {
debug!("HTTP/3 connection closed (no service): {}", reason);
}
}
Ok(())
}
} else {
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
}
}
@@ -171,11 +466,13 @@ async fn handle_quic_stream_forwarding(
connection: quinn::Connection,
route: RouteConfig,
port: u16,
_metrics: &MetricsCollector,
metrics: Arc<MetricsCollector>,
cancel: &CancellationToken,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let remote_addr = connection.remote_address();
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref());
let metrics_arc = metrics;
// Resolve backend target
let target = route.action.targets.as_ref()
@@ -194,7 +491,7 @@ async fn handle_quic_stream_forwarding(
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(quinn::ConnectionError::LocallyClosed) => break,
Err(e) => {
debug!("QUIC stream accept error from {}: {}", remote_addr, e);
debug!("QUIC stream accept error from {}: {}", effective_addr, e);
break;
}
}
@@ -202,12 +499,9 @@ async fn handle_quic_stream_forwarding(
};
let backend_addr = backend_addr.clone();
let ip_str = remote_addr.ip().to_string();
let _fwd_ctx = ForwardMetricsCtx {
collector: Arc::new(MetricsCollector::new()), // TODO: share real metrics
route_id: route_id.map(|s| s.to_string()),
source_ip: Some(ip_str),
};
let ip_str = effective_addr.ip().to_string();
let stream_metrics = Arc::clone(&metrics_arc);
let stream_route_id = route_id.map(|s| s.to_string());
// Spawn a task for each QUIC stream → TCP bidirectional forwarding
tokio::spawn(async move {
@@ -217,6 +511,11 @@ async fn handle_quic_stream_forwarding(
&backend_addr,
).await {
Ok((bytes_in, bytes_out)) => {
stream_metrics.record_bytes(
bytes_in, bytes_out,
stream_route_id.as_deref(),
Some(&ip_str),
);
debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out);
}
Err(e) => {
@@ -255,29 +554,6 @@ async fn forward_quic_stream_to_tcp(
Ok((bytes_in, bytes_out))
}
/// Placeholder for HTTP/3 connection handling (Phase 5).
///
/// Once h3_service is implemented, this will delegate to it.
async fn handle_h3_connection(
connection: quinn::Connection,
_route: RouteConfig,
_port: u16,
_metrics: &MetricsCollector,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
warn!("HTTP/3 handling not yet fully implemented — accepting connection but no request processing");
// Keep the connection alive until cancelled or closed
tokio::select! {
_ = cancel.cancelled() => {}
reason = connection.closed() => {
debug!("HTTP/3 connection closed: {}", reason);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -2,18 +2,23 @@
//!
//! Binds UDP sockets on configured ports, receives datagrams, matches routes,
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
//!
//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips`
//! is configured. For QUIC, a relay layer intercepts datagrams before they
//! reach the quinn endpoint.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use dashmap::DashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use arc_swap::ArcSwap;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -21,13 +26,15 @@ use rustproxy_config::{RouteActionType, TransportProtocol};
use rustproxy_metrics::MetricsCollector;
use rustproxy_routing::{MatchContext, RouteManager};
use rustproxy_http::h3_service::H3ProxyService;
use crate::connection_tracker::ConnectionTracker;
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
/// Manages UDP listeners across all configured ports.
pub struct UdpListenerManager {
/// Port → recv loop task handle
listeners: HashMap<u16, JoinHandle<()>>,
/// Port → (recv loop task handle, optional QUIC endpoint for TLS updates)
listeners: HashMap<u16, (JoinHandle<()>, Option<quinn::Endpoint>)>,
/// Hot-reloadable route table
route_manager: Arc<ArcSwap<RouteManager>>,
/// Shared metrics collector
@@ -40,6 +47,27 @@ pub struct UdpListenerManager {
cancel_token: CancellationToken,
/// Unix socket path for datagram handler relay
datagram_handler_relay: Arc<RwLock<Option<String>>>,
/// Persistent write half of the relay connection
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
/// Cancel token for the current relay reply reader task
relay_reader_cancel: Option<CancellationToken>,
/// H3 proxy service for HTTP/3 request handling
h3_service: Option<Arc<H3ProxyService>>,
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
proxy_ips: Arc<Vec<IpAddr>>,
}
impl Drop for UdpListenerManager {
fn drop(&mut self) {
self.cancel_token.cancel();
for (_, (handle, endpoint)) in self.listeners.drain() {
handle.abort();
if let Some(ep) = endpoint {
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
}
}
}
}
impl UdpListenerManager {
@@ -57,9 +85,26 @@ impl UdpListenerManager {
session_table: Arc::new(UdpSessionTable::new()),
cancel_token,
datagram_handler_relay: Arc::new(RwLock::new(None)),
relay_writer: Arc::new(Mutex::new(None)),
relay_reader_cancel: None,
h3_service: None,
proxy_ips: Arc::new(Vec::new()),
}
}
/// Set the trusted proxy IPs for PROXY protocol v2 detection.
pub fn set_proxy_ips(&mut self, ips: Vec<IpAddr>) {
if !ips.is_empty() {
info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", ips.len());
}
self.proxy_ips = Arc::new(ips);
}
/// Set the H3 proxy service for HTTP/3 request handling.
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
self.h3_service = Some(svc);
}
/// Update the route manager (for hot-reload).
pub fn update_routes(&self, route_manager: Arc<RouteManager>) {
self.route_manager.store(route_manager);
@@ -94,18 +139,44 @@ impl UdpListenerManager {
if has_quic {
if let Some(tls) = tls_config {
// Create QUIC endpoint
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
));
self.listeners.insert(port, handle);
info!("QUIC endpoint started on port {}", port);
if self.proxy_ips.is_empty() {
// Direct path: quinn owns the external socket (zero overhead)
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
None,
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint started on port {}", port);
} else {
// Proxy relay path: we own external socket, quinn on localhost
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
port,
tls,
Arc::clone(&self.proxy_ips),
self.cancel_token.child_token(),
)?;
let endpoint_for_updates = relay.endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
relay.endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
Some(relay.real_client_map),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint with PROXY relay started on port {}", port);
}
return Ok(());
} else {
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
@@ -126,10 +197,12 @@ impl UdpListenerManager {
Arc::clone(&self.conn_tracker),
Arc::clone(&self.session_table),
Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(),
Arc::clone(&self.proxy_ips),
));
self.listeners.insert(port, handle);
self.listeners.insert(port, (handle, None));
// Start the session cleanup task if this is the first port
if self.listeners.len() == 1 {
@@ -141,8 +214,11 @@ impl UdpListenerManager {
/// Stop listening on a UDP port.
pub fn remove_port(&mut self, port: u16) {
if let Some(handle) = self.listeners.remove(&port) {
if let Some((handle, endpoint)) = self.listeners.remove(&port) {
handle.abort();
if let Some(ep) = endpoint {
ep.close(quinn::VarInt::from_u32(0), b"port removed");
}
info!("UDP listener removed from port {}", port);
}
}
@@ -157,24 +233,223 @@ impl UdpListenerManager {
/// Stop all listeners and clean up.
pub async fn stop(&mut self) {
self.cancel_token.cancel();
for (port, handle) in self.listeners.drain() {
for (port, (handle, endpoint)) in self.listeners.drain() {
handle.abort();
if let Some(ep) = endpoint {
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
}
debug!("UDP listener stopped on port {}", port);
}
info!("All UDP listeners stopped, {} sessions remaining",
self.session_table.session_count());
}
/// Set the datagram handler relay socket path.
pub async fn set_datagram_handler_relay(&self, path: String) {
let mut relay = self.datagram_handler_relay.write().await;
*relay = Some(path);
/// Update TLS config on all active QUIC endpoints (cert refresh).
/// Only affects new incoming connections — existing connections are undisturbed.
/// Uses quinn's Endpoint::set_server_config() for zero-downtime hot-swap.
pub fn update_quic_tls(&self, tls_config: Arc<rustls::ServerConfig>) {
for (port, (_handle, endpoint)) in &self.listeners {
if let Some(ep) = endpoint {
match quinn::crypto::rustls::QuicServerConfig::try_from(Arc::clone(&tls_config)) {
Ok(quic_crypto) => {
let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
ep.set_server_config(Some(server_config));
info!("Updated QUIC TLS config on port {}", port);
}
Err(e) => {
warn!("Failed to update QUIC TLS config on port {}: {}", port, e);
}
}
}
}
}
/// Upgrade raw UDP fallback listeners to QUIC endpoints.
///
/// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP.
/// When certs become available later (via loadCertificate IPC or ACME), this method
/// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint.
///
/// This is idempotent — ports that already have QUIC endpoints are skipped.
pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc<rustls::ServerConfig>) {
// Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes
let rm = self.route_manager.load();
let upgrade_ports: Vec<u16> = self.listeners.iter()
.filter(|(_, (_, endpoint))| endpoint.is_none())
.filter(|(port, _)| {
rm.routes_for_port(**port).iter().any(|r| {
r.action.udp.as_ref()
.and_then(|u| u.quic.as_ref())
.is_some()
})
})
.map(|(port, _)| *port)
.collect();
for port in upgrade_ports {
info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port);
// Stop the raw UDP listener task and drain sessions to release the socket
if let Some((handle, _)) = self.listeners.remove(&port) {
handle.abort();
}
let drained = self.session_table.drain_port(
port, &self.metrics, &self.conn_tracker,
);
if drained > 0 {
debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port);
}
// Brief yield to let aborted tasks drop their socket references
tokio::task::yield_now().await;
// Create QUIC endpoint on the now-free port
let create_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match create_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
}
Err(e) => {
// Port may still be held — retry once after a brief delay
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let retry_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match retry_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
}
Err(e2) => {
error!("Failed to upgrade port {} to QUIC after retry: {}. \
Rebinding as raw UDP.", port, e2);
// Fallback: rebind as raw UDP so the port isn't dead
if let Ok(()) = self.rebind_raw_udp(port).await {
warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port);
}
}
}
}
}
}
}
/// Create a direct QUIC endpoint (quinn owns the socket).
fn create_quic_direct(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?;
let endpoint_for_updates = endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
None,
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
Ok(())
}
/// Create a QUIC endpoint with PROXY protocol relay.
fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
port,
tls_config,
Arc::clone(&self.proxy_ips),
self.cancel_token.child_token(),
)?;
let endpoint_for_updates = relay.endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
relay.endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
Some(relay.real_client_map),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
Ok(())
}
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
let socket = UdpSocket::bind(addr).await?;
let socket = Arc::new(socket);
let handle = tokio::spawn(Self::recv_loop(
socket,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
Arc::clone(&self.session_table),
Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(),
Arc::clone(&self.proxy_ips),
));
self.listeners.insert(port, (handle, None));
Ok(())
}
/// Set the datagram handler relay socket path and establish connection.
pub async fn set_datagram_handler_relay(&mut self, path: String) {
// Cancel previous relay reader task if any
if let Some(old_cancel) = self.relay_reader_cancel.take() {
old_cancel.cancel();
}
// Store the path
{
let mut relay = self.datagram_handler_relay.write().await;
*relay = Some(path.clone());
}
// Connect to the Unix socket
match tokio::net::UnixStream::connect(&path).await {
Ok(stream) => {
let (read_half, write_half) = stream.into_split();
// Store write half for sending datagrams
{
let mut writer = self.relay_writer.lock().await;
*writer = Some(write_half);
}
// Spawn reply reader — reads length-prefixed JSON replies from TS
// and sends them back to clients via the listener sockets
let cancel = self.cancel_token.child_token();
self.relay_reader_cancel = Some(cancel.clone());
tokio::spawn(Self::relay_reply_reader(read_half, cancel));
info!("Datagram handler relay connected to {}", path);
}
Err(e) => {
error!("Failed to connect datagram handler relay to {}: {}", path, e);
}
}
}
/// Start periodic session cleanup task.
fn start_cleanup_task(&self) {
let session_table = Arc::clone(&self.session_table);
let metrics = Arc::clone(&self.metrics);
let conn_tracker = Arc::clone(&self.conn_tracker);
let cancel = self.cancel_token.child_token();
let route_manager = Arc::clone(&self.route_manager);
@@ -188,7 +463,7 @@ impl UdpListenerManager {
// or default 60s if none configured)
let rm = route_manager.load();
let timeout_ms = Self::get_min_session_timeout(&rm);
let removed = session_table.cleanup_idle(timeout_ms, &metrics);
let removed = session_table.cleanup_idle(timeout_ms, &metrics, &conn_tracker);
if removed > 0 {
debug!("UDP session cleanup: removed {} idle sessions, {} remaining",
removed, session_table.session_count());
@@ -206,6 +481,10 @@ impl UdpListenerManager {
}
/// Main receive loop for a UDP port.
///
/// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP
/// is checked for PROXY protocol v2. If found, the real client IP is extracted
/// and used for all subsequent session handling for that source address.
async fn recv_loop(
socket: Arc<UdpSocket>,
port: u16,
@@ -213,12 +492,18 @@ impl UdpListenerManager {
metrics: Arc<MetricsCollector>,
conn_tracker: Arc<ConnectionTracker>,
session_table: Arc<UdpSessionTable>,
datagram_handler_relay: Arc<RwLock<Option<String>>>,
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
cancel: CancellationToken,
proxy_ips: Arc<Vec<IpAddr>>,
) {
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
let mut buf = vec![0u8; 65535];
// Maps proxy source addr → real client addr (from PROXY v2 headers).
// Only populated when proxy_ips is non-empty.
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
loop {
let (len, client_addr) = tokio::select! {
_ = cancel.cancelled() => {
@@ -238,9 +523,39 @@ impl UdpListenerManager {
let datagram = &buf[..len];
// Route matching
// PROXY protocol v2 detection for datagrams from trusted proxy IPs
let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) {
let session_key: SessionKey = (client_addr, port);
if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) {
// No session and no prior PROXY header — check for PROXY v2
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
match crate::proxy_protocol::parse_v2(datagram) {
Ok((header, _consumed)) => {
debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr);
proxy_addr_map.insert(client_addr, header.source_addr);
continue; // discard the PROXY v2 datagram
}
Err(e) => {
debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e);
client_addr.ip()
}
}
} else {
client_addr.ip()
}
} else {
// Use real client IP if we've previously seen a PROXY v2 header
proxy_addr_map.get(&client_addr)
.map(|r| r.ip())
.unwrap_or_else(|| client_addr.ip())
}
} else {
client_addr.ip()
};
// Route matching — use effective (real) client IP
let rm = route_manager.load();
let ip_str = client_addr.ip().to_string();
let ip_str = effective_client_ip.to_string();
let ctx = MatchContext {
port,
domain: None,
@@ -264,21 +579,16 @@ impl UdpListenerManager {
let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref());
// Socket handler routes → relay datagram to TS via Unix socket
// Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler {
let relay_path = datagram_handler_relay.read().await;
if let Some(ref path) = *relay_path {
if let Err(e) = Self::relay_datagram_to_ts(
path,
route_id.unwrap_or("unknown"),
&client_addr,
port,
datagram,
).await {
debug!("Failed to relay UDP datagram to TS: {}", e);
}
} else {
debug!("UDP datagram handler relay not configured for route {:?}", route_id);
if let Err(e) = Self::relay_datagram_via_writer(
&relay_writer,
route_id.unwrap_or("unknown"),
&client_addr,
port,
datagram,
).await {
debug!("Failed to relay UDP datagram to TS: {}", e);
}
continue;
}
@@ -294,20 +604,21 @@ impl UdpListenerManager {
}
// Session lookup or create
// Session key uses the proxy's source addr for correct return-path routing
let session_key: SessionKey = (client_addr, port);
let session = match session_table.get(&session_key) {
Some(s) => s,
None => {
// New session — check per-IP limits
if !conn_tracker.try_accept(&client_addr.ip()) {
debug!("UDP session rejected for {} (rate limit)", client_addr);
// New session — check per-IP limits using the real client IP
if !conn_tracker.try_accept(&effective_client_ip) {
debug!("UDP session rejected for {} (rate limit)", effective_client_ip);
continue;
}
if !session_table.can_create_session(
&client_addr.ip(),
&effective_client_ip,
udp_config.max_sessions_per_ip,
) {
debug!("UDP session rejected for {} (per-IP session limit)", client_addr);
debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip);
continue;
}
@@ -340,8 +651,8 @@ impl UdpListenerManager {
}
let backend_socket = Arc::new(backend_socket);
debug!("New UDP session: {} -> {} (via port {})",
client_addr, backend_addr, port);
debug!("New UDP session: {} -> {} (via port {}, real client {})",
client_addr, backend_addr, port, effective_client_ip);
// Spawn return-path relay task
let session_cancel = CancellationToken::new();
@@ -361,7 +672,7 @@ impl UdpListenerManager {
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
created_at: std::time::Instant::now(),
route_id: route_id.map(|s| s.to_string()),
source_ip: client_addr.ip(),
source_ip: effective_client_ip,
client_addr,
return_task,
cancel: session_cancel,
@@ -372,8 +683,8 @@ impl UdpListenerManager {
continue;
}
// Track in metrics
conn_tracker.connection_opened(&client_addr.ip());
// Track in metrics using the real client IP
conn_tracker.connection_opened(&effective_client_ip);
metrics.connection_opened(route_id, Some(&ip_str));
metrics.udp_session_opened();
@@ -441,10 +752,9 @@ impl UdpListenerManager {
}
}
/// Relay a UDP datagram to the TypeScript handler via Unix socket.
/// Uses length-prefixed JSON framing: [4-byte BE length][JSON payload]
async fn relay_datagram_to_ts(
relay_path: &str,
/// Send a datagram to TS via the persistent relay writer.
async fn relay_datagram_via_writer(
writer: &Mutex<Option<tokio::net::unix::OwnedWriteHalf>>,
route_key: &str,
client_addr: &SocketAddr,
dest_port: u16,
@@ -463,8 +773,9 @@ impl UdpListenerManager {
});
let json = serde_json::to_vec(&msg)?;
// Connect to relay (one-shot for now; persistent connection optimization deferred)
let mut stream = tokio::net::UnixStream::connect(relay_path).await?;
let mut guard = writer.lock().await;
let stream = guard.as_mut()
.ok_or_else(|| anyhow::anyhow!("Datagram relay not connected"))?;
// Length-prefixed frame
let len_bytes = (json.len() as u32).to_be_bytes();
@@ -474,4 +785,101 @@ impl UdpListenerManager {
Ok(())
}
/// Background task reading reply frames from the TS datagram handler.
/// Parses replies and sends them back to the original client via UDP.
async fn relay_reply_reader(
mut reader: tokio::net::unix::OwnedReadHalf,
cancel: CancellationToken,
) {
use base64::Engine;
let mut len_buf = [0u8; 4];
loop {
// Read length prefix
let read_result = tokio::select! {
_ = cancel.cancelled() => break,
result = reader.read_exact(&mut len_buf) => result,
};
match read_result {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader closed: {}", e);
break;
}
}
let frame_len = u32::from_be_bytes(len_buf) as usize;
if frame_len > 10 * 1024 * 1024 {
error!("Datagram relay frame too large: {} bytes", frame_len);
break;
}
let mut frame_buf = vec![0u8; frame_len];
match reader.read_exact(&mut frame_buf).await {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader frame error: {}", e);
break;
}
}
// Parse the reply JSON
let reply: serde_json::Value = match serde_json::from_slice(&frame_buf) {
Ok(v) => v,
Err(e) => {
debug!("Datagram relay reply parse error: {}", e);
continue;
}
};
if reply.get("type").and_then(|v| v.as_str()) != Some("reply") {
continue;
}
let source_ip = reply.get("sourceIp").and_then(|v| v.as_str()).unwrap_or("");
let source_port = reply.get("sourcePort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let dest_port = reply.get("destPort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let payload_b64 = reply.get("payloadBase64").and_then(|v| v.as_str()).unwrap_or("");
let payload = match base64::engine::general_purpose::STANDARD.decode(payload_b64) {
Ok(p) => p,
Err(e) => {
debug!("Datagram relay reply base64 decode error: {}", e);
continue;
}
};
let client_addr: SocketAddr = match format!("{}:{}", source_ip, source_port).parse() {
Ok(a) => a,
Err(e) => {
debug!("Datagram relay reply address parse error: {}", e);
continue;
}
};
// Send the reply back to the client via a temporary UDP socket bound to the dest_port
// We need the listener socket for this port. For simplicity, use a fresh socket.
let reply_socket = match UdpSocket::bind(format!("0.0.0.0:{}", dest_port)).await {
Ok(s) => s,
Err(_) => {
// Port already bound by the listener — use unbound socket
match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
debug!("Failed to create reply socket: {}", e);
continue;
}
}
}
};
if let Err(e) = reply_socket.send_to(&payload, client_addr).await {
debug!("Failed to send datagram reply to {}: {}", client_addr, e);
}
}
debug!("Datagram relay reply reader stopped");
}
}

View File

@@ -18,6 +18,8 @@ use tracing::debug;
use rustproxy_metrics::MetricsCollector;
use crate::connection_tracker::ConnectionTracker;
/// A single UDP session (flow).
pub struct UdpSession {
/// Socket bound to ephemeral port, connected to backend
@@ -165,6 +167,7 @@ impl UdpSessionTable {
&self,
timeout_ms: u64,
metrics: &MetricsCollector,
conn_tracker: &ConnectionTracker,
) -> usize {
let now_ms = self.elapsed_ms();
let mut removed = 0;
@@ -185,6 +188,7 @@ impl UdpSessionTable {
session.client_addr, key.1,
now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed))
);
conn_tracker.connection_closed(&session.source_ip);
metrics.connection_closed(
session.route_id.as_deref(),
Some(&session.source_ip.to_string()),
@@ -197,6 +201,36 @@ impl UdpSessionTable {
removed
}
/// Drain all sessions on a given listening port, releasing socket references.
/// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's
/// Arc refcount must drop to zero so the port can be rebound.
pub fn drain_port(
&self,
port: u16,
metrics: &MetricsCollector,
conn_tracker: &ConnectionTracker,
) -> usize {
let keys: Vec<SessionKey> = self.sessions.iter()
.filter(|entry| entry.key().1 == port)
.map(|entry| *entry.key())
.collect();
let mut removed = 0;
for key in keys {
if let Some(session) = self.remove(&key) {
session.cancel.cancel();
conn_tracker.connection_closed(&session.source_ip);
metrics.connection_closed(
session.route_id.as_deref(),
Some(&session.source_ip.to_string()),
);
metrics.udp_session_closed();
removed += 1;
}
}
removed
}
/// Total number of active sessions.
pub fn session_count(&self) -> usize {
self.sessions.len()

View File

@@ -122,10 +122,16 @@ impl RouteManager {
// This prevents session-ticket resumption from misrouting when clients
// omit SNI (RFC 8446 recommends but doesn't mandate SNI on resumption).
// Wildcard-only routes (domains: ["*"]) still match since they accept all.
let patterns = domains.to_vec();
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
if !is_wildcard_only {
return false;
//
// Exception: QUIC (UDP transport) encrypts the TLS ClientHello, so SNI
// is unavailable at accept time. Domain verification happens per-request
// in H3ProxyService via the :authority header.
if ctx.transport != Some(TransportProtocol::Udp) {
let patterns = domains.to_vec();
let is_wildcard_only = patterns.iter().all(|d| *d == "*");
if !is_wildcard_only {
return false;
}
}
}
}
@@ -997,4 +1003,52 @@ mod tests {
let result = manager.find_route(&udp_ctx).unwrap();
assert_eq!(result.route.name.as_deref(), Some("udp-route"));
}
#[test]
fn test_quic_tls_no_sni_matches_domain_restricted_route() {
// QUIC accept-level matching: is_tls=true, domain=None, transport=Udp.
// Should match because QUIC encrypts the ClientHello — SNI is unavailable
// at accept time but verified per-request in H3ProxyService.
let mut route = make_route(443, Some("example.com"), 0);
route.route_match.transport = Some(TransportProtocol::Udp);
let routes = vec![route];
let manager = RouteManager::new(routes);
let ctx = MatchContext {
port: 443,
domain: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
is_tls: true,
protocol: Some("quic"),
transport: Some(TransportProtocol::Udp),
};
assert!(manager.find_route(&ctx).is_some(),
"QUIC (UDP) with is_tls=true and domain=None should match domain-restricted routes");
}
#[test]
fn test_tcp_tls_no_sni_still_rejects_domain_restricted_route() {
// TCP TLS without SNI must still be rejected (no QUIC exemption).
let routes = vec![make_route(443, Some("example.com"), 0)];
let manager = RouteManager::new(routes);
let ctx = MatchContext {
port: 443,
domain: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
is_tls: true,
protocol: None,
transport: None, // TCP (default)
};
assert!(manager.find_route(&ctx).is_none(),
"TCP TLS without SNI should NOT match domain-restricted routes");
}
}

View File

@@ -44,3 +44,9 @@ mimalloc = { workspace = true }
[dev-dependencies]
rcgen = { workspace = true }
quinn = { workspace = true }
h3 = { workspace = true }
h3-quinn = { workspace = true }
bytes = { workspace = true }
rustls = { workspace = true }
http = "1"

View File

@@ -264,6 +264,8 @@ impl RustProxy {
conn_config.socket_timeout_ms,
conn_config.max_connection_lifetime_ms,
);
// Clone proxy_ips before conn_config is moved into the TCP listener
let udp_proxy_ips = conn_config.proxy_ips.clone();
listener.set_connection_config(conn_config);
// Share the socket-handler relay path with the listener
@@ -339,6 +341,18 @@ impl RustProxy {
conn_tracker,
self.cancel_token.clone(),
);
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
// Construct H3ProxyService for HTTP/3 request handling
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
Arc::new(ArcSwap::from(Arc::clone(&*self.route_table.load()))),
Arc::clone(&self.metrics),
Arc::new(rustproxy_http::connection_pool::ConnectionPool::new()),
Arc::new(rustproxy_http::protocol_cache::ProtocolCache::new()),
rustproxy_passthrough::tls_handler::shared_backend_tls_config(),
std::time::Duration::from_secs(30),
);
udp_mgr.set_h3_service(Arc::new(h3_svc));
for port in &udp_ports {
udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?;
@@ -763,22 +777,31 @@ impl RustProxy {
if self.udp_listener_manager.is_none() {
if let Some(ref listener) = self.listener_manager {
let conn_tracker = listener.conn_tracker().clone();
self.udp_listener_manager = Some(UdpListenerManager::new(
let conn_config = Self::build_connection_config(&self.options);
let mut udp_mgr = UdpListenerManager::new(
Arc::clone(&new_manager),
Arc::clone(&self.metrics),
conn_tracker,
self.cancel_token.clone(),
));
);
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
self.udp_listener_manager = Some(udp_mgr);
}
}
// Build TLS config for QUIC (needed for new ports and upgrading existing raw UDP)
let quic_tls = {
let tls_configs = self.current_tls_configs().await;
Self::build_quic_tls_config(&tls_configs)
};
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
udp_mgr.update_routes(Arc::clone(&new_manager));
// Add new UDP ports
// Add new UDP ports (with TLS for QUIC)
for port in &new_udp_ports {
if !old_udp_ports.contains(port) {
udp_mgr.add_port(*port).await?;
udp_mgr.add_port_with_tls(*port, quic_tls.clone()).await?;
}
}
// Remove old UDP ports
@@ -787,6 +810,12 @@ impl RustProxy {
udp_mgr.remove_port(*port);
}
}
// Upgrade existing raw UDP fallback listeners to QUIC if TLS is now available
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
}
} else if self.udp_listener_manager.is_some() {
// All UDP routes removed — shut down UDP manager
@@ -843,12 +872,12 @@ impl RustProxy {
.map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?;
// Hot-swap into TLS configs
if let Some(ref mut listener) = self.listener_manager {
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
tls_configs.insert(domain.clone(), TlsCertConfig {
cert_pem: bundle.cert_pem.clone(),
key_pem: bundle.key_pem.clone(),
});
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
tls_configs.insert(domain.clone(), TlsCertConfig {
cert_pem: bundle.cert_pem.clone(),
key_pem: bundle.key_pem.clone(),
});
{
let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() {
if !tls_configs.contains_key(d) {
@@ -858,9 +887,22 @@ impl RustProxy {
});
}
}
}
let quic_tls = Self::build_quic_tls_config(&tls_configs);
if let Some(ref listener) = self.listener_manager {
listener.set_tls_configs(tls_configs);
}
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
}
info!("Certificate provisioned and loaded for route '{}'", route_name);
Ok(())
}
@@ -1005,10 +1047,37 @@ impl RustProxy {
Some(Arc::new(tls_config))
}
/// Build the current full TLS config map from all sources (route configs, loaded certs, cert manager).
async fn current_tls_configs(&self) -> HashMap<String, TlsCertConfig> {
let mut configs = Self::extract_tls_configs(&self.options.routes);
// Merge dynamically loaded certs (from loadCertificate IPC)
for (d, c) in &self.loaded_certs {
if !configs.contains_key(d) {
configs.insert(d.clone(), c.clone());
}
}
// Merge certs from cert manager store
if let Some(ref cm_arc) = self.cert_manager {
let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() {
if !configs.contains_key(d) {
configs.insert(d.clone(), TlsCertConfig {
cert_pem: b.cert_pem.clone(),
key_pem: b.key_pem.clone(),
});
}
}
}
configs
}
/// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks.
pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) {
info!("Datagram handler relay path set to: {:?}", path);
if let Some(ref udp_mgr) = self.udp_listener_manager {
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref p) = path {
udp_mgr.set_datagram_handler_relay(p.clone()).await;
}
@@ -1055,39 +1124,24 @@ impl RustProxy {
key_pem: key_pem.clone(),
});
// Hot-swap TLS config on the listener
if let Some(ref mut listener) = self.listener_manager {
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
// Hot-swap TLS config on TCP and QUIC listeners
let tls_configs = self.current_tls_configs().await;
// Add the new cert
tls_configs.insert(domain.to_string(), TlsCertConfig {
cert_pem: cert_pem.clone(),
key_pem: key_pem.clone(),
});
// Also include all existing certs from cert manager
if let Some(ref cm_arc) = self.cert_manager {
let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() {
if !tls_configs.contains_key(d) {
tls_configs.insert(d.clone(), TlsCertConfig {
cert_pem: b.cert_pem.clone(),
key_pem: b.key_pem.clone(),
});
}
}
}
// Merge dynamically loaded certs from previous loadCertificate calls
for (d, c) in &self.loaded_certs {
if !tls_configs.contains_key(d) {
tls_configs.insert(d.clone(), c.clone());
}
}
// Build QUIC TLS config before TCP consumes the map
let quic_tls = Self::build_quic_tls_config(&tls_configs);
if let Some(ref listener) = self.listener_manager {
listener.set_tls_configs(tls_configs);
}
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
}
info!("Certificate loaded and TLS config updated for {}", domain);
Ok(())
}

View File

@@ -0,0 +1,195 @@
mod common;
use common::*;
use rustproxy::RustProxy;
use rustproxy_config::{RustProxyOptions, TransportProtocol, RouteUdp, RouteQuic};
use bytes::Buf;
use std::sync::Arc;
/// Build a route that listens on UDP with HTTP/3 enabled and TLS terminate.
fn make_h3_route(
port: u16,
target_host: &str,
target_port: u16,
cert_pem: &str,
key_pem: &str,
) -> rustproxy_config::RouteConfig {
let mut route = make_tls_terminate_route(port, "localhost", target_host, target_port, cert_pem, key_pem);
route.route_match.transport = Some(TransportProtocol::Udp);
// Keep domain="localhost" from make_tls_terminate_route — needed for TLS cert extraction
route.action.udp = Some(RouteUdp {
session_timeout: None,
max_sessions_per_ip: None,
max_datagram_size: None,
quic: Some(RouteQuic {
max_idle_timeout: Some(30000),
max_concurrent_bidi_streams: None,
max_concurrent_uni_streams: None,
enable_http3: Some(true),
alt_svc_port: None,
alt_svc_max_age: None,
initial_congestion_window: None,
}),
});
route
}
/// Build a quinn client endpoint with insecure TLS for testing.
fn make_h3_client_endpoint() -> quinn::Endpoint {
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureVerifier))
.with_no_client_auth();
tls_config.alpn_protocols = vec![b"h3".to_vec()];
let quic_client_config = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to build QUIC client config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
endpoint.set_default_client_config(client_config);
endpoint
}
/// Test that HTTP/3 response streams properly finish (FIN is received by client).
///
/// This is the critical regression test for the FIN bug: the proxy must send
/// a QUIC stream FIN after the response body so the client's `recv_data()`
/// returns `None` instead of hanging forever.
#[tokio::test]
async fn test_h3_response_stream_finishes() {
let backend_port = next_port();
let proxy_port = next_port();
let body_text = "Hello from HTTP/3 backend! This body has a known length for testing.";
// 1. Start plain HTTP backend with known body + content-length
let _backend = start_http_server(backend_port, 200, body_text).await;
// 2. Generate self-signed cert and configure H3 route
let (cert_pem, key_pem) = generate_self_signed_cert("localhost");
let route = make_h3_route(proxy_port, "127.0.0.1", backend_port, &cert_pem, &key_pem);
let options = RustProxyOptions {
routes: vec![route],
..Default::default()
};
// 3. Start proxy and wait for UDP bind
let mut proxy = RustProxy::new(options).unwrap();
proxy.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// 4. Connect QUIC/H3 client
let endpoint = make_h3_client_endpoint();
let addr: std::net::SocketAddr = format!("127.0.0.1:{}", proxy_port).parse().unwrap();
let connection = endpoint
.connect(addr, "localhost")
.expect("Failed to initiate QUIC connection")
.await
.expect("QUIC handshake failed");
let (mut driver, mut send_request) = h3::client::new(
h3_quinn::Connection::new(connection),
)
.await
.expect("H3 connection setup failed");
// Drive the H3 connection in background
tokio::spawn(async move {
let _ = driver.wait_idle().await;
});
// 5. Send GET request
let req = http::Request::builder()
.method("GET")
.uri("https://localhost/")
.header("host", "localhost")
.body(())
.unwrap();
let mut stream = send_request.send_request(req).await
.expect("Failed to send H3 request");
stream.finish().await
.expect("Failed to finish sending H3 request body");
// 6. Read response headers
let resp = stream.recv_response().await
.expect("Failed to receive H3 response");
assert_eq!(resp.status(), http::StatusCode::OK,
"Expected 200 OK, got {}", resp.status());
// 7. Read body and verify stream ends (FIN received)
// This is the critical assertion: recv_data() must return None (stream ended)
// within the timeout, NOT hang forever waiting for a FIN that never arrives.
let result = with_timeout(async {
let mut total = 0usize;
while let Some(chunk) = stream.recv_data().await.expect("H3 data receive error") {
total += chunk.remaining();
}
// recv_data() returned None => stream ended (FIN received)
total
}, 10)
.await;
let bytes_received = result.expect(
"TIMEOUT: H3 stream never ended (FIN not received by client). \
The proxy sent all response data but failed to send the QUIC stream FIN."
);
assert_eq!(
bytes_received,
body_text.len(),
"Expected {} bytes, got {}",
body_text.len(),
bytes_received
);
// 8. Cleanup
endpoint.close(quinn::VarInt::from_u32(0), b"test done");
proxy.stop().await.unwrap();
}
/// Insecure TLS verifier that accepts any certificate (for tests only).
#[derive(Debug)]
struct InsecureVerifier;
impl rustls::client::danger::ServerCertVerifier for InsecureVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::RSA_PSS_SHA256,
]
}
}

View File

@@ -0,0 +1,125 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import type { TDatagramHandler, IDatagramInfo } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let PROXY_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
const timeout = setTimeout(() => {
client.close();
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
}, timeoutMs);
client.send(Buffer.from(msg), port, '127.0.0.1');
client.on('message', (data) => {
clearTimeout(timeout);
client.close();
resolve(data.toString());
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
}
tap.test('setup: start SmartProxy with datagramHandler', async () => {
[PROXY_PORT] = await findFreePorts(1);
const handler: TDatagramHandler = (datagram, info, reply) => {
reply(Buffer.from(`Handled: ${datagram.toString()}`));
};
smartProxy = new SmartProxy({
routes: [
{
name: 'dgram-handler-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'socket-handler',
datagramHandler: handler,
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
});
tap.test('datagram handler: receives and replies to datagram', async () => {
const response = await sendDatagram(PROXY_PORT, 'Hello Handler');
expect(response).toEqual('Handled: Hello Handler');
});
tap.test('datagram handler: async handler works', async () => {
// Stop and restart with async handler
await smartProxy.stop();
[PROXY_PORT] = await findFreePorts(1);
const asyncHandler: TDatagramHandler = async (datagram, info, reply) => {
// Simulate async work
await new Promise<void>((resolve) => setTimeout(resolve, 10));
reply(Buffer.from(`Async: ${datagram.toString()}`));
};
smartProxy = new SmartProxy({
routes: [
{
name: 'dgram-async-handler',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'socket-handler',
datagramHandler: asyncHandler,
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
const response = await sendDatagram(PROXY_PORT, 'Test Async');
expect(response).toEqual('Async: Test Async');
});
tap.test('datagram handler: multiple rapid datagrams', async () => {
const promises: Promise<string>[] = [];
for (let i = 0; i < 5; i++) {
promises.push(sendDatagram(PROXY_PORT, `msg-${i}`));
}
const responses = await Promise.all(promises);
for (let i = 0; i < 5; i++) {
expect(responses).toContain(`Async: msg-${i}`);
}
});
tap.test('cleanup: stop SmartProxy', async () => {
await smartProxy.stop();
await assertPortsFree([PROXY_PORT]);
});
export default tap.start();

View File

@@ -174,7 +174,7 @@ tap.test('Route Validation - validateRouteAction', async () => {
const invalidSocketResult = validateRouteAction(invalidSocketAction);
expect(invalidSocketResult.valid).toBeFalse();
expect(invalidSocketResult.errors.length).toBeGreaterThan(0);
expect(invalidSocketResult.errors[0]).toInclude('Socket handler function is required');
expect(invalidSocketResult.errors[0]).toInclude('handler function is required');
});
tap.test('Route Validation - validateRouteConfig', async () => {

142
test/test.udp-forwarding.ts Normal file
View File

@@ -0,0 +1,142 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let backendServer: dgram.Socket;
let PROXY_PORT: number;
let BACKEND_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
const timeout = setTimeout(() => {
client.close();
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
}, timeoutMs);
client.send(Buffer.from(msg), port, '127.0.0.1');
client.on('message', (data) => {
clearTimeout(timeout);
client.close();
resolve(data.toString());
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
}
// Helper: create a UDP echo server
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
return new Promise((resolve) => {
const server = dgram.createSocket('udp4');
server.on('message', (msg, rinfo) => {
server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
});
server.bind(port, '127.0.0.1', () => resolve(server));
});
}
tap.test('setup: start UDP echo server and SmartProxy', async () => {
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
// Start backend UDP echo server
backendServer = await createUdpEchoServer(BACKEND_PORT);
// Start SmartProxy with a UDP forwarding route
smartProxy = new SmartProxy({
routes: [
{
name: 'udp-forward-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
udp: {
sessionTimeout: 5000,
},
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
});
tap.test('UDP forwarding: basic datagram round-trip', async () => {
const response = await sendDatagram(PROXY_PORT, 'Hello UDP');
expect(response).toEqual('Echo: Hello UDP');
});
tap.test('UDP forwarding: multiple datagrams same session', async () => {
// Use a single client socket for session reuse
const client = dgram.createSocket('udp4');
const responses: string[] = [];
const done = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
client.close();
reject(new Error('Timeout waiting for 3 responses'));
}, 5000);
client.on('message', (data) => {
responses.push(data.toString());
if (responses.length === 3) {
clearTimeout(timeout);
client.close();
resolve();
}
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
client.send(Buffer.from('msg1'), PROXY_PORT, '127.0.0.1');
client.send(Buffer.from('msg2'), PROXY_PORT, '127.0.0.1');
client.send(Buffer.from('msg3'), PROXY_PORT, '127.0.0.1');
await done;
expect(responses).toContain('Echo: msg1');
expect(responses).toContain('Echo: msg2');
expect(responses).toContain('Echo: msg3');
});
tap.test('UDP forwarding: multiple clients', async () => {
const [resp1, resp2] = await Promise.all([
sendDatagram(PROXY_PORT, 'client1'),
sendDatagram(PROXY_PORT, 'client2'),
]);
expect(resp1).toEqual('Echo: client1');
expect(resp2).toEqual('Echo: client2');
});
tap.test('UDP forwarding: large datagram (1400 bytes)', async () => {
const payload = 'X'.repeat(1400);
const response = await sendDatagram(PROXY_PORT, payload);
expect(response).toEqual(`Echo: ${payload}`);
});
tap.test('cleanup: stop SmartProxy and backend', async () => {
await smartProxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
export default tap.start();

114
test/test.udp-metrics.ts Normal file
View File

@@ -0,0 +1,114 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let backendServer: dgram.Socket;
let PROXY_PORT: number;
let BACKEND_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
const timeout = setTimeout(() => {
client.close();
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
}, timeoutMs);
client.send(Buffer.from(msg), port, '127.0.0.1');
client.on('message', (data) => {
clearTimeout(timeout);
client.close();
resolve(data.toString());
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
}
// Helper: create a UDP echo server
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
return new Promise((resolve) => {
const server = dgram.createSocket('udp4');
server.on('message', (msg, rinfo) => {
server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
});
server.bind(port, '127.0.0.1', () => resolve(server));
});
}
tap.test('setup: start UDP echo server and SmartProxy with metrics', async () => {
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
backendServer = await createUdpEchoServer(BACKEND_PORT);
smartProxy = new SmartProxy({
routes: [
{
name: 'udp-metrics-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
udp: {
sessionTimeout: 10000,
},
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
metrics: {
enabled: true,
sampleIntervalMs: 1000,
retentionSeconds: 60,
},
});
await smartProxy.start();
});
tap.test('UDP metrics: counters increase after traffic', async () => {
// Send a few datagrams
const resp1 = await sendDatagram(PROXY_PORT, 'metrics-test-1');
expect(resp1).toEqual('Echo: metrics-test-1');
const resp2 = await sendDatagram(PROXY_PORT, 'metrics-test-2');
expect(resp2).toEqual('Echo: metrics-test-2');
// Wait for metrics to propagate and cache to refresh
await new Promise<void>((resolve) => setTimeout(resolve, 2000));
// Get metrics (returns the adapter, need to ensure cache is fresh)
const metrics = smartProxy.getMetrics();
// The udp property reads from the Rust JSON snapshot
expect(metrics.udp).toBeDefined();
const totalSessions = metrics.udp.totalSessions();
const datagramsIn = metrics.udp.datagramsIn();
const datagramsOut = metrics.udp.datagramsOut();
console.log(`UDP metrics: sessions=${totalSessions}, in=${datagramsIn}, out=${datagramsOut}`);
expect(totalSessions).toBeGreaterThan(0);
expect(datagramsIn).toBeGreaterThan(0);
expect(datagramsOut).toBeGreaterThan(0);
});
tap.test('cleanup: stop SmartProxy and backend', async () => {
await smartProxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '25.13.0',
version: '25.17.7',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}

View File

@@ -303,6 +303,21 @@ export class SmartProxy extends plugins.EventEmitter {
this.socketHandlerServer = null;
}
// Update datagram handler relay if datagram handler routes changed
const hasDatagramHandlers = newRoutes.some(
(r) => r.action.type === 'socket-handler' && r.action.datagramHandler
);
if (hasDatagramHandlers && !this.datagramHandlerServer) {
const dgPath = `/tmp/smartproxy-dgram-relay-${process.pid}.sock`;
this.datagramHandlerServer = new DatagramHandlerServer(dgPath, this.preprocessor);
await this.datagramHandlerServer.start();
await this.bridge.setDatagramHandlerRelay(this.datagramHandlerServer.getSocketPath());
} else if (!hasDatagramHandlers && this.datagramHandlerServer) {
await this.datagramHandlerServer.stop();
this.datagramHandlerServer = null;
}
// Update stored routes
this.settings.routes = newRoutes;

View File

@@ -7,7 +7,7 @@ import type { IRouteConfig, IRouteMatch, IRouteAction, TPortRange } from '../mod
export class RouteValidator {
private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt'];
private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler'];
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss'];
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss', 'udp', 'quic', 'http3'];
private static readonly MAX_PORTS = 100;
private static readonly MAX_DOMAINS = 1000;
private static readonly MAX_HEADER_SIZE = 8192;
@@ -123,10 +123,10 @@ export class RouteValidator {
errors.push(`Invalid action type: ${route.action.type}. Must be one of: ${this.VALID_ACTION_TYPES.join(', ')}`);
}
// Validate socket-handler
// Validate socket-handler (TCP socketHandler or UDP datagramHandler)
if (route.action.type === 'socket-handler') {
if (typeof route.action.socketHandler !== 'function') {
errors.push('socket-handler action requires a socketHandler function');
if (typeof route.action.socketHandler !== 'function' && typeof route.action.datagramHandler !== 'function') {
errors.push('socket-handler action requires a socketHandler or datagramHandler function');
}
}
@@ -173,6 +173,22 @@ export class RouteValidator {
}
}
}
// QUIC routes require TLS with termination (QUIC mandates TLS 1.3)
if (route.action.udp?.quic && route.action.type === 'forward') {
if (!route.action.tls) {
errors.push('QUIC routes require TLS configuration (action.tls) — QUIC mandates TLS 1.3');
} else if (route.action.tls.mode === 'passthrough') {
errors.push('QUIC routes cannot use TLS mode "passthrough" — use "terminate" or "terminate-and-reencrypt"');
}
}
// Protocol quic/http3 requires transport udp or all
if (route.match?.protocol && ['quic', 'http3'].includes(route.match.protocol)) {
if (route.match.transport && route.match.transport !== 'udp' && route.match.transport !== 'all') {
errors.push(`Protocol "${route.match.protocol}" requires transport "udp" or "all"`);
}
}
}
// Validate security settings
@@ -619,11 +635,22 @@ export function validateRouteAction(action: IRouteAction): { valid: boolean; err
}
}
// QUIC routes require TLS with termination
if (action.udp?.quic && action.type === 'forward') {
if (!action.tls) {
errors.push('QUIC routes require TLS configuration — QUIC mandates TLS 1.3');
} else if (action.tls.mode === 'passthrough') {
errors.push('QUIC routes cannot use TLS mode "passthrough"');
}
}
if (action.type === 'socket-handler') {
if (!action.socketHandler) {
errors.push('Socket handler function is required for socket-handler action');
} else if (typeof action.socketHandler !== 'function') {
if (!action.socketHandler && !action.datagramHandler) {
errors.push('Socket handler or datagram handler function is required for socket-handler action');
} else if (action.socketHandler && typeof action.socketHandler !== 'function') {
errors.push('Socket handler must be a function');
} else if (action.datagramHandler && typeof action.datagramHandler !== 'function') {
errors.push('Datagram handler must be a function');
}
}
@@ -714,7 +741,8 @@ export function hasRequiredPropertiesForAction(route: IRouteConfig, actionType:
route.action.targets.length > 0 &&
route.action.targets.every(t => t.host && t.port !== undefined);
case 'socket-handler':
return !!route.action.socketHandler && typeof route.action.socketHandler === 'function';
return (!!route.action.socketHandler && typeof route.action.socketHandler === 'function') ||
(!!route.action.datagramHandler && typeof route.action.datagramHandler === 'function');
default:
return false;
}