Compare commits

...

16 Commits

Author SHA1 Message Date
07d88f6f6a v25.17.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 23:16:42 +00:00
4b64de2c67 feat(rustproxy-passthrough): add PROXY protocol v2 client IP handling for UDP and QUIC listeners 2026-03-19 23:16:42 +00:00
e8db7bc96d v25.16.3
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 22:00:07 +00:00
2621dea9fa fix(rustproxy): upgrade fallback UDP listeners to QUIC when TLS certificates become available 2026-03-19 22:00:07 +00:00
bb5b9b3d12 v25.16.2
Some checks failed
Default (tags) / security (push) Failing after 12s
Default (tags) / test (push) Failing after 13s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 21:24:05 +00:00
d70c2d77ed fix(rustproxy-http): cache backend Alt-Svc only from original upstream responses during protocol auto-detection 2026-03-19 21:24:05 +00:00
4cf13c36f8 v25.16.1
Some checks failed
Default (tags) / security (push) Failing after 19s
Default (tags) / test (push) Failing after 18s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 20:57:48 +00:00
37c7233780 fix(http-proxy): avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection 2026-03-19 20:57:48 +00:00
15d0a721d5 v25.16.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 20:27:57 +00:00
af970c447e feat(quic,http3): add HTTP/3 proxy handling and hot-reload QUIC TLS configuration 2026-03-19 20:27:57 +00:00
9e1103e7a7 v25.15.0
Some checks failed
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 18:55:31 +00:00
2b990527ac feat(readme): document UDP, QUIC, and HTTP/3 support in the README 2026-03-19 18:55:31 +00:00
9595f0a9fc v25.14.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 16:21:37 +00:00
0fb3988123 fix(deps): update build and runtime dependencies and align route validation test expectations 2026-03-19 16:21:37 +00:00
53938df8db v25.14.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 2s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-19 16:09:51 +00:00
e890bda8fc feat(udp,http3): add UDP datagram handler relay support and stream HTTP/3 request bodies to backends 2026-03-19 16:09:51 +00:00
19 changed files with 5626 additions and 4554 deletions

View File

@@ -1,5 +1,63 @@
# Changelog # Changelog
## 2026-03-19 - 25.17.0 - feat(rustproxy-passthrough)
add PROXY protocol v2 client IP handling for UDP and QUIC listeners
- propagate trusted proxy IP configuration into UDP and QUIC listener managers
- extract and preserve real client addresses from PROXY protocol v2 headers for HTTP/3 and QUIC stream handling
- apply rate limiting, session limits, routing, and metrics using the resolved client IP while preserving correct proxy return-path routing
## 2026-03-19 - 25.16.3 - fix(rustproxy)
upgrade fallback UDP listeners to QUIC when TLS certificates become available
- Rebuild and apply QUIC TLS configuration during route and certificate updates instead of only when adding new UDP ports.
- Add logic to drain UDP sessions, stop raw fallback listeners, and start QUIC endpoints on existing ports once TLS is available.
- Retry QUIC endpoint creation during upgrade and fall back to rebinding raw UDP if the upgrade cannot complete.
## 2026-03-19 - 25.16.2 - fix(rustproxy-http)
cache backend Alt-Svc only from original upstream responses during protocol auto-detection
- Moves Alt-Svc discovery into streaming response construction so it reads backend headers before response filters inject client-facing Alt-Svc values
- Stores the protocol cache key in connection activity during auto-detect mode and clears it after HTTP/3 connection failure to avoid re-caching failed H3 routes
- Prevents fallback requests from reintroducing stale or self-injected Alt-Svc entries that could cause repeated H3 retry loops
## 2026-03-19 - 25.16.1 - fix(http-proxy)
avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection
- Suppress Alt-Svc HTTP/3 recaching after a failed QUIC backend connection to prevent repeated H3 timeout fallback loops
- Force an ALPN probe on TCP fallback so auto detection correctly reselects HTTP/2 or HTTP/1.1 after H3 connection failure
- Add README documentation for best-effort backendProtocol selection and supported protocol modes
## 2026-03-19 - 25.16.0 - feat(quic,http3)
add HTTP/3 proxy handling and hot-reload QUIC TLS configuration
- initialize and wire H3ProxyService into QUIC listeners so HTTP/3 requests are handled instead of being kept as placeholder connections
- add backend HTTP/3 support with protocol caching that stores Alt-Svc advertised H3 ports for auto-detection
- hot-swap TLS certificates across active QUIC endpoints and require terminating TLS for QUIC route validation
- document QUIC route setup with required TLS and ACME configuration
## 2026-03-19 - 25.15.0 - feat(readme)
document UDP, QUIC, and HTTP/3 support in the README
- Adds README examples for UDP datagram handlers, QUIC/HTTP3 forwarding, and dual-stack TCP/UDP routes
- Expands configuration and API reference sections to cover transport matching, UDP/QUIC options, backend transport selection, and UDP metrics
- Updates architecture and feature descriptions to reflect UDP, QUIC, HTTP/3, and datagram handler capabilities
## 2026-03-19 - 25.14.1 - fix(deps)
update build and runtime dependencies and align route validation test expectations
- split the test preparation step into a dedicated test:before script while keeping test execution separate
- bump development tooling and runtime package versions in package.json
- adjust the route validation test to match the current generic handler error message
## 2026-03-19 - 25.14.0 - feat(udp,http3)
add UDP datagram handler relay support and stream HTTP/3 request bodies to backends
- establish a persistent Unix socket relay for UDP datagram handlers and process handler replies back to clients
- update route validation and smart proxy route reload logic to support datagramHandler routes
- record UDP, QUIC, and HTTP/3 byte metrics more accurately, including request bytes in and UDP session cleanup connection tracking
- add integration tests for UDP forwarding, datagram handlers, and UDP metrics
## 2026-03-19 - 25.13.0 - feat(smart-proxy) ## 2026-03-19 - 25.13.0 - feat(smart-proxy)
add UDP transport support with QUIC/HTTP3 routing and datagram handler relay add UDP transport support with QUIC/HTTP3 routing and datagram handler relay

3033
deno.lock generated

File diff suppressed because it is too large. Load Diff

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartproxy", "name": "@push.rocks/smartproxy",
"version": "25.13.0", "version": "25.17.0",
"private": false, "private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.", "description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@@ -9,27 +9,28 @@
"author": "Lossless GmbH", "author": "Lossless GmbH",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"test": "(tsrust) && (tstest test/**/test*.ts --verbose --timeout 60 --logfile)", "test:before": "(tsrust)",
"test": "(tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
"build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)", "build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)",
"format": "(gitzone format)", "format": "(gitzone format)",
"buildDocs": "tsdoc" "buildDocs": "tsdoc"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^4.1.2", "@git.zone/tsbuild": "^4.3.0",
"@git.zone/tsrun": "^2.0.1", "@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0", "@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8", "@git.zone/tstest": "^3.5.0",
"@push.rocks/smartserve": "^2.0.1", "@push.rocks/smartserve": "^2.0.1",
"@types/node": "^25.2.3", "@types/node": "^25.5.0",
"typescript": "^5.9.3", "typescript": "^5.9.3",
"why-is-node-running": "^3.2.2" "why-is-node-running": "^3.2.2"
}, },
"dependencies": { "dependencies": {
"@push.rocks/smartcrypto": "^2.0.4", "@push.rocks/smartcrypto": "^2.0.4",
"@push.rocks/smartlog": "^3.1.10", "@push.rocks/smartlog": "^3.2.1",
"@push.rocks/smartrust": "^1.2.1", "@push.rocks/smartrust": "^1.3.2",
"@tsclass/tsclass": "^9.3.0", "@tsclass/tsclass": "^9.5.0",
"minimatch": "^10.2.0" "minimatch": "^10.2.4"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

4777
pnpm-lock.yaml generated

File diff suppressed because it is too large. Load Diff

275
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/smartproxy 🚀 # @push.rocks/smartproxy 🚀
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, load balancing, custom protocol handlers, and kernel-level NFTables forwarding. **A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, UDP/QUIC/HTTP3, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
## 📦 Installation ## 📦 Installation
@@ -16,9 +16,9 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
## 🎯 What is SmartProxy? ## 🎯 What is SmartProxy?
SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, TLS, HTTP reverse proxy, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety. SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, UDP, TLS, HTTP reverse proxy, QUIC/HTTP3, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety.
Whether you're building microservices, deploying edge infrastructure, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered. Whether you're building microservices, deploying edge infrastructure, proxying UDP-based protocols, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered.
### ⚡ Key Features ### ⚡ Key Features
@@ -29,11 +29,12 @@ Whether you're building microservices, deploying edge infrastructure, or need a
| 🔒 **Automatic SSL/TLS** | Zero-config HTTPS with Let's Encrypt ACME integration | | 🔒 **Automatic SSL/TLS** | Zero-config HTTPS with Let's Encrypt ACME integration |
| 🎯 **Flexible Matching** | Route by port, domain, path, protocol, client IP, TLS version, headers, or custom logic | | 🎯 **Flexible Matching** | Route by port, domain, path, protocol, client IP, TLS version, headers, or custom logic |
| 🚄 **High-Performance** | Choose between user-space or kernel-level (NFTables) forwarding | | 🚄 **High-Performance** | Choose between user-space or kernel-level (NFTables) forwarding |
| 📡 **UDP & QUIC/HTTP3** | First-class UDP transport, datagram handlers, QUIC tunneling, and HTTP/3 support |
| ⚖️ **Load Balancing** | Round-robin, least-connections, IP-hash with health checks | | ⚖️ **Load Balancing** | Round-robin, least-connections, IP-hash with health checks |
| 🛡️ **Enterprise Security** | IP filtering, rate limiting, basic auth, JWT auth, connection limits | | 🛡️ **Enterprise Security** | IP filtering, rate limiting, basic auth, JWT auth, connection limits |
| 🔌 **WebSocket Support** | First-class WebSocket proxying with ping/pong keep-alive | | 🔌 **WebSocket Support** | First-class WebSocket proxying with ping/pong keep-alive |
| 🎮 **Custom Protocols** | Socket handlers for implementing any protocol in TypeScript | | 🎮 **Custom Protocols** | Socket and datagram handlers for implementing any protocol in TypeScript |
| 📊 **Live Metrics** | Real-time throughput, connection counts, and performance data | | 📊 **Live Metrics** | Real-time throughput, connection counts, UDP sessions, and performance data |
| 🔧 **Dynamic Management** | Add/remove ports and routes at runtime without restarts | | 🔧 **Dynamic Management** | Add/remove ports and routes at runtime without restarts |
| 🔄 **PROXY Protocol** | Full PROXY protocol v1/v2 support for preserving client information | | 🔄 **PROXY Protocol** | Full PROXY protocol v1/v2 support for preserving client information |
| 💾 **Consumer Cert Storage** | Bring your own persistence — SmartProxy never writes certs to disk | | 💾 **Consumer Cert Storage** | Bring your own persistence — SmartProxy never writes certs to disk |
@@ -89,7 +90,7 @@ SmartProxy uses a powerful **match/action** pattern that makes routing predictab
``` ```
Every route consists of: Every route consists of:
- **Match** — What traffic to capture (ports, domains, paths, protocol, IPs, headers) - **Match** — What traffic to capture (ports, domains, paths, transport, protocol, IPs, headers)
- **Action** — What to do with it (`forward` or `socket-handler`) - **Action** — What to do with it (`forward` or `socket-handler`)
- **Security** (optional) — IP allow/block lists, rate limits, authentication - **Security** (optional) — IP allow/block lists, rate limits, authentication
- **Headers** (optional) — Request/response header manipulation with template variables - **Headers** (optional) — Request/response header manipulation with template variables
@@ -197,7 +198,7 @@ apiRoute = addRateLimiting(apiRoute, {
const proxy = new SmartProxy({ routes: [apiRoute] }); const proxy = new SmartProxy({ routes: [apiRoute] });
``` ```
### 🎮 Custom Protocol Handler ### 🎮 Custom Protocol Handler (TCP)
SmartProxy lets you implement any protocol with full socket control. Routes with JavaScript socket handlers are automatically relayed from the Rust engine back to your TypeScript code: SmartProxy lets you implement any protocol with full socket control. Routes with JavaScript socket handlers are automatically relayed from the Rust engine back to your TypeScript code:
@@ -247,6 +248,140 @@ const proxy = new SmartProxy({ routes: [echoRoute, customRoute] });
| `SocketHandlers.httpBlock(status, message)` | HTTP block response | | `SocketHandlers.httpBlock(status, message)` | HTTP block response |
| `SocketHandlers.block(message)` | Block with optional message | | `SocketHandlers.block(message)` | Block with optional message |
### 📡 UDP Datagram Handler
Handle raw UDP datagrams with custom TypeScript logic — perfect for DNS, game servers, IoT protocols, or any UDP-based service:
```typescript
import { SmartProxy } from '@push.rocks/smartproxy';
import type { IRouteConfig, TDatagramHandler, IDatagramInfo } from '@push.rocks/smartproxy';
// Custom UDP echo handler
const udpHandler: TDatagramHandler = (datagram, info, reply) => {
console.log(`UDP from ${info.sourceIp}:${info.sourcePort} on port ${info.destPort}`);
reply(datagram); // Echo it back
};
const proxy = new SmartProxy({
routes: [{
name: 'udp-echo',
match: {
ports: 5353,
transport: 'udp' // 👈 Listen for UDP datagrams
},
action: {
type: 'socket-handler',
datagramHandler: udpHandler, // 👈 Process each datagram
udp: {
sessionTimeout: 60000, // Session idle timeout (ms)
maxSessionsPerIP: 100,
maxDatagramSize: 65535
}
}
}]
});
await proxy.start();
```
### 📡 QUIC / HTTP3 Forwarding
Forward QUIC traffic to backends with optional protocol translation (e.g., receive QUIC, forward as TCP/HTTP1):
```typescript
import { SmartProxy } from '@push.rocks/smartproxy';
import type { IRouteConfig } from '@push.rocks/smartproxy';
const quicRoute: IRouteConfig = {
name: 'quic-to-backend',
match: {
ports: 443,
transport: 'udp',
protocol: 'quic' // 👈 Match QUIC protocol
},
action: {
type: 'forward',
targets: [{
host: 'backend-server',
port: 8443,
backendTransport: 'tcp' // 👈 Translate QUIC → TCP for backend
}],
tls: {
mode: 'terminate',
certificate: 'auto' // 👈 QUIC requires TLS 1.3
},
udp: {
quic: {
enableHttp3: true,
maxIdleTimeout: 30000,
maxConcurrentBidiStreams: 100,
altSvcPort: 443, // Advertise in Alt-Svc header
altSvcMaxAge: 86400
}
}
}
};
const proxy = new SmartProxy({
acme: { email: 'ssl@example.com' },
routes: [quicRoute]
});
```
### 🚄 Best-Effort Backend Protocol (H3 > H2 > H1)
SmartProxy automatically uses the **highest protocol your backend supports** for HTTP requests. The backend protocol is independent of the client protocol — a client using HTTP/1.1 can be forwarded over HTTP/3 to the backend, and vice versa.
```typescript
const route: IRouteConfig = {
name: 'auto-protocol',
match: { ports: 443, domains: 'app.example.com' },
action: {
type: 'forward',
targets: [{ host: 'backend', port: 8443 }],
tls: { mode: 'terminate', certificate: 'auto' },
options: {
backendProtocol: 'auto' // 👈 Default — best-effort selection
}
}
};
```
**How protocol discovery works (browser model):**
1. First request → TLS ALPN probe detects H2 or H1
2. Backend response inspected for `Alt-Svc: h3=":port"` header
3. If H3 advertised → cached and used for subsequent requests via QUIC
4. Graceful fallback: H3 failure → H2 → H1 with automatic cache invalidation
| `backendProtocol` | Behavior |
|---|---|
| `'auto'` (default) | Best-effort: H3 > H2 > H1 with Alt-Svc discovery |
| `'http1'` | Always HTTP/1.1 |
| `'http2'` | Always HTTP/2 (hard-fail if unsupported) |
| `'http3'` | Always HTTP/3 via QUIC (hard-fail if unsupported) |
> **Note:** WebSocket upgrades always use HTTP/1.1 to the backend regardless of `backendProtocol`, since there's no performance benefit from H2/H3 Extended CONNECT for tunneled connections, and backend support is rare.
### 🔁 Dual-Stack TCP + UDP Route
Listen on both TCP and UDP with a single route — handle each transport with its own handler:
```typescript
const dualStackRoute: IRouteConfig = {
name: 'dual-stack-dns',
match: {
ports: 53,
transport: 'all' // 👈 Listen on both TCP and UDP
},
action: {
type: 'socket-handler',
socketHandler: handleTcpDns, // 👈 TCP connections
datagramHandler: handleUdpDns, // 👈 UDP datagrams
}
};
```
### ⚡ High-Performance NFTables Forwarding ### ⚡ High-Performance NFTables Forwarding
For ultra-low latency on Linux, use kernel-level forwarding (requires root): For ultra-low latency on Linux, use kernel-level forwarding (requires root):
@@ -419,6 +554,10 @@ console.log(`Bytes in: ${metrics.totals.bytesIn()}`);
console.log(`Requests/sec: ${metrics.requests.perSecond()}`); console.log(`Requests/sec: ${metrics.requests.perSecond()}`);
console.log(`Throughput in: ${metrics.throughput.instant().in} bytes/sec`); console.log(`Throughput in: ${metrics.throughput.instant().in} bytes/sec`);
// UDP metrics
console.log(`UDP sessions: ${metrics.udp.activeSessions()}`);
console.log(`Datagrams in: ${metrics.udp.datagramsIn()}`);
// Get detailed statistics from the Rust engine // Get detailed statistics from the Rust engine
const stats = await proxy.getStatistics(); const stats = await proxy.getStatistics();
@@ -545,7 +684,7 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
``` ```
┌─────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────┐
│ Your Application │ │ Your Application │
│ (TypeScript — routes, config, socket handlers) │ │ (TypeScript — routes, config, handlers)
└──────────────────┬──────────────────────────────────┘ └──────────────────┬──────────────────────────────────┘
│ IPC (JSON over stdin/stdout) │ IPC (JSON over stdin/stdout)
┌──────────────────▼──────────────────────────────────┐ ┌──────────────────▼──────────────────────────────────┐
@@ -556,22 +695,23 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
│ │ │ │ Proxy │ │ │ │ │ │ │ │ │ │ Proxy │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │ │ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Security│ │ Metrics │ │ Connec- │ │ NFTables │ │ │ │ UDP │ │ Security│ │ Metrics │ │ NFTables │ │
│ │ Enforce │ │ Collect │ │ tion │ │ Mgr │ │ │ │ QUIC │ │ Enforce │ │ Collect │ │ Mgr │ │
│ │ │ │ │ │ Tracker │ │ │ │ │ │ HTTP/3 │ │ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │ │ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
└──────────────────┬──────────────────────────────────┘ └──────────────────┬──────────────────────────────────┘
│ Unix Socket Relay │ Unix Socket Relay
┌──────────────────▼──────────────────────────────────┐ ┌──────────────────▼──────────────────────────────────┐
TypeScript Socket Handler Server │ TypeScript Socket & Datagram Handler Servers
│ (for JS-defined socket handlers & dynamic routes) │ (for JS socket handlers, datagram handlers,
│ and dynamic routes) │
└─────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────┘
``` ```
- **Rust Engine** handles all networking, TLS, HTTP proxying, connection management, security, and metrics - **Rust Engine** handles all networking: TCP, UDP, TLS, QUIC, HTTP proxying, connection management, security, and metrics
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and socket handler callbacks - **TypeScript** provides the npm API, configuration types, route helpers, validation, and handler callbacks
- **IPC** — The TypeScript wrapper uses JSON commands/events over stdin/stdout to communicate with the Rust binary - **IPC** — The TypeScript wrapper uses JSON commands/events over stdin/stdout to communicate with the Rust binary
- **Socket Relay** — A Unix domain socket server for routes requiring TypeScript-side handling (socket handlers, dynamic host/port functions) - **Socket/Datagram Relay** — Unix domain socket servers for routes requiring TypeScript-side handling (socket handlers, datagram handlers, dynamic host/port functions)
## 🎯 Route Configuration Reference ## 🎯 Route Configuration Reference
@@ -579,22 +719,26 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
```typescript ```typescript
interface IRouteMatch { interface IRouteMatch {
ports: number | number[] | Array<{ from: number; to: number }>; // Required — port(s) to listen on ports: TPortRange; // Required — port(s) to listen on
domains?: string | string[]; // 'example.com', '*.example.com' transport?: 'tcp' | 'udp' | 'all'; // Transport protocol (default: 'tcp')
path?: string; // '/api/*', '/users/:id' domains?: string | string[]; // 'example.com', '*.example.com'
clientIp?: string[]; // ['10.0.0.0/8', '192.168.*'] path?: string; // '/api/*', '/users/:id'
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3'] clientIp?: string[]; // ['10.0.0.0/8', '192.168.*']
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3']
headers?: Record<string, string | RegExp>; // Match by HTTP headers headers?: Record<string, string | RegExp>; // Match by HTTP headers
protocol?: 'http' | 'tcp'; // Match specific protocol ('http' includes h2 + WebSocket upgrades) protocol?: 'http' | 'tcp' | 'udp' | 'quic' | 'http3'; // Application-layer protocol
} }
// Port range supports single numbers, arrays, and ranges
type TPortRange = number | Array<number | { from: number; to: number }>;
``` ```
### Action Types ### Action Types
| Type | Description | | Type | Description |
|------|-------------| |------|-------------|
| `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing) | | `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing, UDP/QUIC) |
| `socket-handler` | Custom socket handling function in TypeScript | | `socket-handler` | Custom socket/datagram handling function in TypeScript |
### Target Options ### Target Options
@@ -602,14 +746,15 @@ interface IRouteMatch {
interface IRouteTarget { interface IRouteTarget {
host: string | string[] | ((context: IRouteContext) => string | string[]); host: string | string[] | ((context: IRouteContext) => string | string[]);
port: number | 'preserve' | ((context: IRouteContext) => number); port: number | 'preserve' | ((context: IRouteContext) => number);
tls?: IRouteTls; // Per-target TLS override tls?: IRouteTls; // Per-target TLS override
priority?: number; // Target priority priority?: number; // Target priority
match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method) match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method)
websocket?: IRouteWebSocket; websocket?: IRouteWebSocket;
loadBalancing?: IRouteLoadBalancing; loadBalancing?: IRouteLoadBalancing;
sendProxyProtocol?: boolean; sendProxyProtocol?: boolean;
headers?: IRouteHeaders; headers?: IRouteHeaders;
advanced?: IRouteAdvanced; advanced?: IRouteAdvanced;
backendTransport?: 'tcp' | 'udp'; // Backend transport (e.g., receive QUIC, forward as TCP)
} }
``` ```
@@ -666,6 +811,49 @@ interface IRouteLoadBalancing {
} }
``` ```
### Backend Protocol Options
```typescript
// Set on action.options
{
action: {
type: 'forward',
targets: [...],
options: {
backendProtocol: 'auto' | 'http1' | 'http2' | 'http3'
}
}
}
```
| Value | Backend Behavior |
|-------|-----------------|
| `'auto'` | Best-effort: discovers H3 via Alt-Svc, probes H2 via ALPN, falls back to H1 |
| `'http1'` | Always HTTP/1.1 (no ALPN probe) |
| `'http2'` | Always HTTP/2 (hard-fail if handshake fails) |
| `'http3'` | Always HTTP/3 over QUIC (3s connect timeout, hard-fail if unreachable) |
### UDP & QUIC Options
```typescript
interface IRouteUdp {
sessionTimeout?: number; // Idle timeout per UDP session (ms, default: 60000)
maxSessionsPerIP?: number; // Max concurrent sessions per IP (default: 1000)
maxDatagramSize?: number; // Max datagram size in bytes (default: 65535)
quic?: IRouteQuic;
}
interface IRouteQuic {
maxIdleTimeout?: number; // QUIC idle timeout (ms, default: 30000)
maxConcurrentBidiStreams?: number; // Max bidi streams (default: 100)
maxConcurrentUniStreams?: number; // Max uni streams (default: 100)
enableHttp3?: boolean; // Enable HTTP/3 (default: false)
altSvcPort?: number; // Port for Alt-Svc header
altSvcMaxAge?: number; // Alt-Svc max age in seconds (default: 86400)
initialCongestionWindow?: number; // Initial congestion window (bytes)
}
```
## 🛠️ Helper Functions Reference ## 🛠️ Helper Functions Reference
All helpers are fully typed and return `IRouteConfig` or `IRouteConfig[]`: All helpers are fully typed and return `IRouteConfig` or `IRouteConfig[]`:
@@ -689,7 +877,7 @@ import {
createWebSocketRoute, // WebSocket-enabled route createWebSocketRoute, // WebSocket-enabled route
// Custom Protocols // Custom Protocols
createSocketHandlerRoute, // Custom socket handler createSocketHandlerRoute, // Custom TCP socket handler
SocketHandlers, // Pre-built handlers (echo, proxy, block, etc.) SocketHandlers, // Pre-built handlers (echo, proxy, block, etc.)
// NFTables (Linux, requires root) // NFTables (Linux, requires root)
@@ -718,6 +906,8 @@ import {
} from '@push.rocks/smartproxy'; } from '@push.rocks/smartproxy';
``` ```
> **Tip:** For UDP datagram handler routes or QUIC/HTTP3 routes, construct `IRouteConfig` objects directly — there are no helper functions for these yet. See the [UDP Datagram Handler](#-udp-datagram-handler) and [QUIC / HTTP3 Forwarding](#-quic--http3-forwarding) examples above.
## 📖 API Documentation ## 📖 API Documentation
### SmartProxy Class ### SmartProxy Class
@@ -753,6 +943,8 @@ class SmartProxy extends EventEmitter {
// Events // Events
on(event: 'error', handler: (err: Error) => void): this; on(event: 'error', handler: (err: Error) => void): this;
on(event: 'certificate-issued', handler: (ev: ICertificateIssuedEvent) => void): this;
on(event: 'certificate-failed', handler: (ev: ICertificateFailedEvent) => void): this;
} }
``` ```
@@ -775,6 +967,8 @@ interface ISmartProxyOptions {
// Custom certificate provisioning // Custom certificate provisioning
certProvisionFunction?: (domain: string) => Promise<ICert | 'http01'>; certProvisionFunction?: (domain: string) => Promise<ICert | 'http01'>;
certProvisionFallbackToAcme?: boolean; // Fall back to ACME on failure (default: true) certProvisionFallbackToAcme?: boolean; // Fall back to ACME on failure (default: true)
certProvisionTimeout?: number; // Timeout per provision call (ms)
certProvisionConcurrency?: number; // Max concurrent provisions
// Consumer-managed certificate persistence (see "Consumer-Managed Certificate Storage") // Consumer-managed certificate persistence (see "Consumer-Managed Certificate Storage")
certStore?: ISmartProxyCertStore; certStore?: ISmartProxyCertStore;
@@ -782,6 +976,9 @@ interface ISmartProxyOptions {
// Self-signed fallback // Self-signed fallback
disableDefaultCert?: boolean; // Disable '*' self-signed fallback (default: false) disableDefaultCert?: boolean; // Disable '*' self-signed fallback (default: false)
// Rust binary path override
rustBinaryPath?: string; // Custom path to the Rust proxy binary
// Global defaults // Global defaults
defaults?: { defaults?: {
target?: { host: string; port: number }; target?: { host: string; port: number };
@@ -868,11 +1065,22 @@ metrics.requests.perSecond(); // Requests per second
metrics.requests.perMinute(); // Requests per minute metrics.requests.perMinute(); // Requests per minute
metrics.requests.total(); // Total requests metrics.requests.total(); // Total requests
// UDP metrics
metrics.udp.activeSessions(); // Current active UDP sessions
metrics.udp.totalSessions(); // Total UDP sessions since start
metrics.udp.datagramsIn(); // Datagrams received
metrics.udp.datagramsOut(); // Datagrams sent
// Cumulative totals // Cumulative totals
metrics.totals.bytesIn(); // Total bytes received metrics.totals.bytesIn(); // Total bytes received
metrics.totals.bytesOut(); // Total bytes sent metrics.totals.bytesOut(); // Total bytes sent
metrics.totals.connections(); // Total connections metrics.totals.connections(); // Total connections
// Backend metrics
metrics.backends.byBackend(); // Map<backend, IBackendMetrics>
metrics.backends.protocols(); // Map<backend, protocol>
metrics.backends.topByErrors(10); // Top N error-prone backends
// Percentiles // Percentiles
metrics.percentiles.connectionDuration(); // { p50, p95, p99 } metrics.percentiles.connectionDuration(); // { p50, p95, p99 }
metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p50, p95, p99 } } metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p50, p95, p99 } }
@@ -896,11 +1104,12 @@ metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p5
### Rust Binary Not Found ### Rust Binary Not Found
SmartProxy searches for the Rust binary in this order: SmartProxy searches for the Rust binary in this order:
1. `SMARTPROXY_RUST_BINARY` environment variable 1. `rustBinaryPath` option in `ISmartProxyOptions`
2. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.) 2. `SMARTPROXY_RUST_BINARY` environment variable
3. `dist_rust/rustproxy` relative to the package root (built by `tsrust`) 3. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
4. Local dev build (`./rust/target/release/rustproxy`) 4. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
5. System PATH (`rustproxy`) 5. Local dev build (`./rust/target/release/rustproxy`)
6. System PATH (`rustproxy`)
### Performance Tuning ### Performance Tuning
- ✅ Use NFTables forwarding for high-traffic routes (Linux only) - ✅ Use NFTables forwarding for high-traffic routes (Linux only)

View File

@@ -4,11 +4,15 @@
//! and forwards them to backends using the same routing and pool infrastructure //! and forwards them to backends using the same routing and pool infrastructure
//! as the HTTP/1+2 proxy. //! as the HTTP/1+2 proxy.
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use http_body::Frame;
use tracing::{debug, warn}; use tracing::{debug, warn};
use rustproxy_config::{RouteConfig, TransportProtocol}; use rustproxy_config::{RouteConfig, TransportProtocol};
@@ -58,13 +62,17 @@ impl H3ProxyService {
} }
/// Handle an accepted QUIC connection as HTTP/3. /// Handle an accepted QUIC connection as HTTP/3.
///
/// If `real_client_addr` is provided (from PROXY protocol), it overrides
/// `connection.remote_address()` for client IP attribution.
pub async fn handle_connection( pub async fn handle_connection(
&self, &self,
connection: quinn::Connection, connection: quinn::Connection,
_fallback_route: &RouteConfig, _fallback_route: &RouteConfig,
port: u16, port: u16,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let remote_addr = connection.remote_address(); let remote_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("HTTP/3 connection from {} on port {}", remote_addr, port); debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> = let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
@@ -165,15 +173,6 @@ async fn handle_h3_request(
let backend_port = target.port.resolve(port); let backend_port = target.port.resolve(port);
let backend_addr = format!("{}:{}", backend_host, backend_port); let backend_addr = format!("{}:{}", backend_host, backend_port);
// Read request body
let mut body_data = Vec::new();
while let Some(mut chunk) = stream.recv_data().await
.map_err(|e| anyhow::anyhow!("Failed to read H3 request body: {}", e))?
{
body_data.extend_from_slice(chunk.chunk());
chunk.advance(chunk.remaining());
}
// Connect to backend via TCP HTTP/1.1 with timeout // Connect to backend via TCP HTTP/1.1 with timeout
let tcp_stream = tokio::time::timeout( let tcp_stream = tokio::time::timeout(
connect_timeout, connect_timeout,
@@ -194,11 +193,37 @@ async fn handle_h3_request(
} }
}); });
let body = http_body_util::Full::new(Bytes::from(body_data)); // Stream request body from H3 client to backend via an mpsc channel.
// This avoids buffering the entire request body in memory.
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
let total_bytes_in = Arc::new(std::sync::atomic::AtomicU64::new(0));
let total_bytes_in_writer = Arc::clone(&total_bytes_in);
// Spawn the H3 body reader task
let body_reader = tokio::spawn(async move {
while let Ok(Some(mut chunk)) = stream.recv_data().await {
let data = Bytes::copy_from_slice(chunk.chunk());
total_bytes_in_writer.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
chunk.advance(chunk.remaining());
if body_tx.send(data).await.is_err() {
break;
}
}
stream
});
// Create a body that polls from the mpsc receiver
let body = H3RequestBody { receiver: body_rx };
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?; let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
let response = sender.send_request(backend_req).await let response = sender.send_request(backend_req).await
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?; .map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
// Await the body reader to get the stream back
let mut stream = body_reader.await
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
let total_bytes_in = total_bytes_in.load(std::sync::atomic::Ordering::Relaxed);
// Build H3 response // Build H3 response
let status = response.status(); let status = response.status();
let mut h3_response = hyper::Response::builder().status(status); let mut h3_response = hyper::Response::builder().status(status);
@@ -252,7 +277,7 @@ async fn handle_h3_request(
// Record metrics // Record metrics
let route_id = route.name.as_deref().or(route.id.as_deref()); let route_id = route.name.as_deref().or(route.id.as_deref());
metrics.record_bytes(0, total_bytes_out, route_id, Some(client_ip)); metrics.record_bytes(total_bytes_in, total_bytes_out, route_id, Some(client_ip));
// Finish the stream // Finish the stream
stream.finish().await stream.finish().await
@@ -262,14 +287,14 @@ async fn handle_h3_request(
} }
/// Build an HTTP/1.1 backend request from the H3 frontend request. /// Build an HTTP/1.1 backend request from the H3 frontend request.
fn build_backend_request( fn build_backend_request<B>(
method: &hyper::Method, method: &hyper::Method,
backend_addr: &str, backend_addr: &str,
path: &str, path: &str,
host: &str, host: &str,
original_request: &hyper::Request<()>, original_request: &hyper::Request<()>,
body: http_body_util::Full<Bytes>, body: B,
) -> anyhow::Result<hyper::Request<http_body_util::Full<Bytes>>> { ) -> anyhow::Result<hyper::Request<B>> {
let mut req = hyper::Request::builder() let mut req = hyper::Request::builder()
.method(method) .method(method)
.uri(format!("http://{}{}", backend_addr, path)) .uri(format!("http://{}{}", backend_addr, path))
@@ -286,3 +311,27 @@ fn build_backend_request(
req.body(body) req.body(body)
.map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e)) .map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e))
} }
/// A streaming request body backed by an mpsc channel receiver.
///
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
/// from the H3 client, avoiding buffering the entire request body in memory.
struct H3RequestBody {
receiver: tokio::sync::mpsc::Receiver<Bytes>,
}
impl http_body::Body for H3RequestBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -1,8 +1,11 @@
//! Bounded, TTL-based protocol detection cache for HTTP/2 auto-detection. //! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
//! //!
//! Caches the ALPN-negotiated protocol (H1 or H2) per backend endpoint and requested //! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
//! domain (host:port + requested_host). This prevents cache oscillation when multiple //! domain (host:port + requested_host). This prevents cache oscillation when multiple
//! frontend domains share the same backend but differ in HTTP/2 support. //! frontend domains share the same backend but differ in protocol support.
//!
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -29,6 +32,14 @@ pub enum DetectedProtocol {
H3, H3,
} }
/// Result of a protocol cache lookup.
#[derive(Debug, Clone, Copy)]
pub struct CachedProtocol {
pub protocol: DetectedProtocol,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
pub h3_port: Option<u16>,
}
/// Key for the protocol cache: (host, port, requested_host). /// Key for the protocol cache: (host, port, requested_host).
#[derive(Clone, Debug, Hash, Eq, PartialEq)] #[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct ProtocolCacheKey { pub struct ProtocolCacheKey {
@@ -43,6 +54,8 @@ pub struct ProtocolCacheKey {
struct CachedEntry { struct CachedEntry {
protocol: DetectedProtocol, protocol: DetectedProtocol,
detected_at: Instant, detected_at: Instant,
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
h3_port: Option<u16>,
} }
/// Bounded, TTL-based protocol detection cache. /// Bounded, TTL-based protocol detection cache.
@@ -75,11 +88,14 @@ impl ProtocolCache {
/// Look up the cached protocol for a backend endpoint. /// Look up the cached protocol for a backend endpoint.
/// Returns `None` if not cached or expired (caller should probe via ALPN). /// Returns `None` if not cached or expired (caller should probe via ALPN).
pub fn get(&self, key: &ProtocolCacheKey) -> Option<DetectedProtocol> { pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
let entry = self.cache.get(key)?; let entry = self.cache.get(key)?;
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL { if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host); debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
Some(entry.protocol) Some(CachedProtocol {
protocol: entry.protocol,
h3_port: entry.h3_port,
})
} else { } else {
// Expired — remove and return None to trigger re-probe // Expired — remove and return None to trigger re-probe
drop(entry); // release DashMap ref before remove drop(entry); // release DashMap ref before remove
@@ -91,6 +107,16 @@ impl ProtocolCache {
/// Insert a detected protocol into the cache. /// Insert a detected protocol into the cache.
/// If the cache is at capacity, evict the oldest entry first. /// If the cache is at capacity, evict the oldest entry first.
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) { pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
self.insert_with_h3_port(key, protocol, None);
}
/// Insert an H3 detection result with the Alt-Svc advertised port.
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
}
/// Insert a protocol detection result with an optional H3 port.
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) { if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
// Evict the oldest entry to stay within bounds // Evict the oldest entry to stay within bounds
let oldest = self.cache.iter() let oldest = self.cache.iter()
@@ -103,6 +129,7 @@ impl ProtocolCache {
self.cache.insert(key, CachedEntry { self.cache.insert(key, CachedEntry {
protocol, protocol,
detected_at: Instant::now(), detected_at: Instant::now(),
h3_port,
}); });
} }

View File

@@ -43,6 +43,10 @@ struct ConnActivity {
/// increments on creation and decrements on Drop, keeping the watchdog aware that /// increments on creation and decrements on Drop, keeping the watchdog aware that
/// a response body is still streaming after the request handler has returned. /// a response body is still streaming after the request handler has returned.
active_requests: Option<Arc<AtomicU64>>, active_requests: Option<Arc<AtomicU64>>,
/// Protocol cache key for Alt-Svc discovery. When set, `build_streaming_response`
/// checks the backend's original response headers for Alt-Svc before our
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
} }
/// Default upstream connect timeout (30 seconds). /// Default upstream connect timeout (30 seconds).
@@ -58,6 +62,18 @@ const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::
/// Default WebSocket max lifetime (24 hours). /// Default WebSocket max lifetime (24 hours).
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400); const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
/// Protocol decision for backend connection.
#[derive(Debug)]
enum ProtocolDecision {
H1,
H2,
H3 { port: u16 },
AlpnProbe,
}
/// RAII guard that decrements the active request counter on drop. /// RAII guard that decrements the active request counter on drop.
/// Ensures the counter is correct even if the request handler panics. /// Ensures the counter is correct even if the request handler panics.
struct ActiveRequestGuard { struct ActiveRequestGuard {
@@ -190,6 +206,9 @@ pub struct HttpProxyService {
ws_inactivity_timeout: std::time::Duration, ws_inactivity_timeout: std::time::Duration,
/// WebSocket maximum connection lifetime. /// WebSocket maximum connection lifetime.
ws_max_lifetime: std::time::Duration, ws_max_lifetime: std::time::Duration,
/// Shared QUIC client endpoint for outbound H3 backend connections.
/// Lazily initialized on first H3 backend attempt.
quinn_client_endpoint: Arc<quinn::Endpoint>,
} }
impl HttpProxyService { impl HttpProxyService {
@@ -209,6 +228,7 @@ impl HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
} }
} }
@@ -233,6 +253,7 @@ impl HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
} }
} }
@@ -324,7 +345,7 @@ impl HttpProxyService {
let cn = cancel_inner.clone(); let cn = cancel_inner.clone();
let la = Arc::clone(&la_inner); let la = Arc::clone(&la_inner);
let st = start; let st = start;
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) }; let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
async move { async move {
let result = svc.handle_request(req, peer, port, cn, ca).await; let result = svc.handle_request(req, peer, port, cn, ca).await;
// Mark request end — update activity timestamp before guard drops // Mark request end — update activity timestamp before guard drops
@@ -401,7 +422,7 @@ impl HttpProxyService {
peer_addr: std::net::SocketAddr, peer_addr: std::net::SocketAddr,
port: u16, port: u16,
cancel: CancellationToken, cancel: CancellationToken,
conn_activity: ConnActivity, mut conn_activity: ConnActivity,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let host = req.headers() let host = req.headers()
.get("host") .get("host")
@@ -645,37 +666,101 @@ impl HttpProxyService {
// --- Resolve protocol decision based on backend protocol mode --- // --- Resolve protocol decision based on backend protocol mode ---
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto); let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
let (use_h2, needs_alpn_probe) = match backend_protocol_mode { let protocol_cache_key = crate::protocol_cache::ProtocolCacheKey {
rustproxy_config::BackendProtocol::Http1 => (false, false), host: upstream.host.clone(),
rustproxy_config::BackendProtocol::Http2 => (true, false), port: upstream.port,
rustproxy_config::BackendProtocol::Http3 => { requested_host: host.clone(),
// HTTP/3 (QUIC) backend connections not yet implemented — fall back to H1 };
warn!("backendProtocol 'http3' not yet implemented, falling back to http1"); let protocol_decision = match backend_protocol_mode {
(false, false) rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
} rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
rustproxy_config::BackendProtocol::Http3 => ProtocolDecision::H3 { port: upstream.port },
rustproxy_config::BackendProtocol::Auto => { rustproxy_config::BackendProtocol::Auto => {
if !upstream.use_tls { if !upstream.use_tls {
// No ALPN without TLS — default to H1 // No ALPN without TLS, no QUIC without TLS — default to H1
(false, false) ProtocolDecision::H1
} else { } else {
let cache_key = crate::protocol_cache::ProtocolCacheKey { match self.protocol_cache.get(&protocol_cache_key) {
host: upstream.host.clone(), Some(cached) => match cached.protocol {
port: upstream.port, crate::protocol_cache::DetectedProtocol::H3 => {
requested_host: host.clone(), if let Some(h3_port) = cached.h3_port {
}; ProtocolDecision::H3 { port: h3_port }
match self.protocol_cache.get(&cache_key) { } else {
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false), // H3 cached but no port — fall back to ALPN probe
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false), ProtocolDecision::AlpnProbe
Some(crate::protocol_cache::DetectedProtocol::H3) => { }
// H3 cached but we're on TCP — fall back to H2 probe }
(false, true) crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
} crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
None => (false, true), // needs ALPN probe },
None => ProtocolDecision::AlpnProbe,
} }
} }
} }
}; };
// Derive legacy flags for the existing H1/H2 connection path
let (use_h2, mut needs_alpn_probe) = match &protocol_decision {
ProtocolDecision::H1 => (false, false),
ProtocolDecision::H2 => (true, false),
ProtocolDecision::H3 { .. } => (false, false), // H3 path handled separately below
ProtocolDecision::AlpnProbe => (false, true),
};
// Set Alt-Svc cache key on conn_activity so build_streaming_response can check
// the backend's original Alt-Svc header before ResponseFilter injects our own.
if is_auto_detect_mode {
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
}
// --- H3 path: try QUIC connection before TCP ---
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
let h3_pool_key = crate::connection_pool::PoolKey {
host: upstream.host.clone(),
port: h3_port,
use_tls: true,
protocol: crate::connection_pool::PoolProtocol::H3,
};
// Try H3 pool checkout first
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
self.metrics.backend_pool_hit(&upstream_key);
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
// Try fresh QUIC connection
match self.connect_quic_backend(&upstream.host, h3_port).await {
Ok(quic_conn) => {
self.metrics.backend_pool_miss(&upstream_key);
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
let result = self.forward_h3(
quic_conn, parts, body, upstream_headers, &upstream_path,
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
).await;
self.upstream_selector.connection_ended(&upstream_key);
return result;
}
Err(e) => {
warn!(backend = %upstream_key, error = %e,
"H3 backend connect failed, falling back to H2/H1");
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
// from our own injected Alt-Svc header or a stale backend Alt-Svc
conn_activity.alt_svc_cache_key = None;
// Force ALPN probe on TCP fallback so we correctly detect H2 vs H1
// (don't cache anything yet — let the ALPN probe decide)
if is_auto_detect_mode && upstream.use_tls {
needs_alpn_probe = true;
}
// Fall through to TCP path
}
}
}
// --- Connection pooling: try reusing an existing connection first --- // --- Connection pooling: try reusing an existing connection first ---
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet) // For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
if !needs_alpn_probe { if !needs_alpn_probe {
@@ -870,6 +955,7 @@ impl HttpProxyService {
}; };
self.upstream_selector.connection_ended(&upstream_key); self.upstream_selector.connection_ended(&upstream_key);
self.metrics.backend_connection_closed(&upstream_key); self.metrics.backend_connection_closed(&upstream_key);
result result
} }
@@ -1668,6 +1754,19 @@ impl HttpProxyService {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let (resp_parts, resp_body) = upstream_response.into_parts(); let (resp_parts, resp_body) = upstream_response.into_parts();
// Check for Alt-Svc in the backend's ORIGINAL response headers BEFORE
// ResponseFilter::apply_headers runs — the filter may inject our own Alt-Svc
// for client-facing HTTP/3 advertisement, which must not be confused with
// backend-originated Alt-Svc.
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
}
}
}
let mut response = Response::builder() let mut response = Response::builder()
.status(resp_parts.status); .status(resp_parts.status);
@@ -2393,6 +2492,252 @@ impl HttpProxyService {
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Arc::new(config) Arc::new(config)
} }
/// Create a shared QUIC client endpoint for outbound H3 backend connections.
fn create_quinn_client_endpoint() -> quinn::Endpoint {
let _ = rustls::crypto::ring::default_provider().install_default();
let mut tls_config = rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
.with_no_client_auth();
tls_config.alpn_protocols = vec![b"h3".to_vec()];
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
.expect("Failed to create QUIC client crypto config");
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
.expect("Failed to create QUIC client endpoint");
endpoint.set_default_client_config(client_config);
endpoint
}
/// Connect to a backend via QUIC (H3).
async fn connect_quic_backend(
&self,
host: &str,
port: u16,
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
let addr = tokio::net::lookup_host(format!("{}:{}", host, port))
.await?
.next()
.ok_or("DNS resolution returned no addresses")?;
let server_name = host.to_string();
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
.map_err(|_| "QUIC connect timeout (3s)")??;
debug!("QUIC backend connection established to {}:{}", host, port);
Ok(connection)
}
/// Forward request to backend via HTTP/3 over QUIC.
async fn forward_h3(
&self,
quic_conn: quinn::Connection,
parts: hyper::http::request::Parts,
body: Incoming,
upstream_headers: hyper::HeaderMap,
upstream_path: &str,
route: &rustproxy_config::RouteConfig,
route_id: Option<&str>,
source_ip: &str,
pool_key: &crate::connection_pool::PoolKey,
domain: &str,
conn_activity: &ConnActivity,
backend_key: &str,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
Ok(pair) => pair,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
self.metrics.backend_handshake_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
}
};
// Spawn the h3 connection driver
let driver_pool = Arc::clone(&self.connection_pool);
let driver_pool_key = pool_key.clone();
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
let driver_gen = Arc::clone(&gen_holder);
tokio::spawn(async move {
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
debug!("H3 connection driver closed: {:?}", close_err);
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
if g != u64::MAX {
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
}
});
// Build the H3 request
let uri = hyper::Uri::builder()
.scheme("https")
.authority(domain)
.path_and_query(upstream_path)
.build()
.unwrap_or_else(|_| upstream_path.parse().unwrap_or_default());
let mut h3_req = hyper::Request::builder()
.method(parts.method.clone())
.uri(uri);
if let Some(headers) = h3_req.headers_mut() {
*headers = upstream_headers;
}
let h3_req = h3_req.body(()).unwrap();
// Send the request
let mut stream = match send_request.send_request(h3_req).await {
Ok(s) => s,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
self.metrics.backend_request_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
}
};
// Stream request body
let rid: Option<Arc<str>> = route_id.map(Arc::from);
let sip: Arc<str> = Arc::from(source_ip);
{
use http_body_util::BodyExt;
let mut body = body;
while let Some(frame) = body.frame().await {
match frame {
Ok(frame) => {
if let Some(data) = frame.data_ref() {
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
error!(backend = %backend_key, error = %e, "H3 send_data failed");
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
}
}
}
Err(e) => {
warn!(backend = %backend_key, error = %e, "Client body read error during H3 forward");
break;
}
}
}
// Signal end of body
stream.finish().await.ok();
}
// Read response
let h3_response = match stream.recv_response().await {
Ok(resp) => resp,
Err(e) => {
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
self.metrics.backend_request_error(backend_key);
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 response failed"));
}
};
// Build the response for the client
let status = h3_response.status();
let mut response = Response::builder().status(status);
if let Some(headers) = response.headers_mut() {
for (name, value) in h3_response.headers() {
let n = name.as_str();
// Skip hop-by-hop headers
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" {
continue;
}
headers.insert(name.clone(), value.clone());
}
ResponseFilter::apply_headers(route, headers, None);
}
// Stream response body back via an adapter
let h3_body = H3ClientResponseBody { stream };
let counting_body = CountingBody::new(
h3_body,
Arc::clone(&self.metrics),
rid,
Some(sip),
Direction::Out,
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
counting_body.with_active_requests(Arc::clone(ar))
} else {
counting_body
};
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
// Register connection in pool on success
if status != StatusCode::BAD_GATEWAY {
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
}
self.metrics.set_backend_protocol(backend_key, "h3");
Ok(response.body(body).unwrap())
}
}
/// Parse an Alt-Svc header value to extract the H3 port.
/// Handles formats like `h3=":443"; ma=86400` and `h3=":8443", h2=":443"`.
fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
for directive in header_value.split(',') {
let directive = directive.trim();
// Match h3=":<port>" or h3-29=":<port>" etc.
if directive.starts_with("h3=") || directive.starts_with("h3-") {
// Find the port in ":<port>"
if let Some(start) = directive.find("\":") {
let rest = &directive[start + 2..];
if let Some(end) = rest.find('"') {
if let Ok(port) = rest[..end].parse::<u16>() {
return Some(port);
}
}
}
}
}
None
}
/// Response body adapter for H3 client responses.
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
struct H3ClientResponseBody {
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl http_body::Body for H3ClientResponseBody {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// h3's recv_data is async, so we need to poll it manually.
// Use a small future to poll the recv_data call.
use std::future::Future;
let mut fut = Box::pin(self.stream.recv_data());
match fut.as_mut().poll(_cx) {
Poll::Ready(Ok(Some(mut buf))) => {
use bytes::Buf;
let data = Bytes::copy_from_slice(buf.chunk());
buf.advance(buf.remaining());
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
}
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => {
warn!("H3 response body recv error: {}", e);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
} }
/// Insecure certificate verifier for backend TLS connections (fallback only). /// Insecure certificate verifier for backend TLS connections (fallback only).
@@ -2463,6 +2808,7 @@ impl Default for HttpProxyService {
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT, http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT, ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME, ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
} }
} }
} }

View File

@@ -3,13 +3,21 @@
//! Manages QUIC endpoints (via quinn), accepts connections, and either: //! Manages QUIC endpoints (via quinn), accepts connections, and either:
//! - Forwards streams bidirectionally to TCP backends (QUIC termination) //! - Forwards streams bidirectionally to TCP backends (QUIC termination)
//! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5) //! - Dispatches to H3ProxyService for HTTP/3 handling (Phase 5)
//!
//! When `proxy_ips` is configured, a UDP relay layer intercepts PROXY protocol v2
//! headers before they reach quinn, extracting real client IPs for attribution.
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use dashmap::DashMap;
use quinn::{Endpoint, ServerConfig as QuinnServerConfig}; use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
use rustls::ServerConfig as RustlsServerConfig; use rustls::ServerConfig as RustlsServerConfig;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -19,8 +27,9 @@ use rustproxy_config::{RouteConfig, TransportProtocol};
use rustproxy_metrics::MetricsCollector; use rustproxy_metrics::MetricsCollector;
use rustproxy_routing::{MatchContext, RouteManager}; use rustproxy_routing::{MatchContext, RouteManager};
use rustproxy_http::h3_service::H3ProxyService;
use crate::connection_tracker::ConnectionTracker; use crate::connection_tracker::ConnectionTracker;
use crate::forwarder::ForwardMetricsCtx;
/// Create a QUIC server endpoint on the given port with the provided TLS config. /// Create a QUIC server endpoint on the given port with the provided TLS config.
/// ///
@@ -46,9 +55,274 @@ pub fn create_quic_endpoint(
Ok(endpoint) Ok(endpoint)
} }
// ===== PROXY protocol relay for QUIC =====
/// Result of creating a QUIC endpoint with a PROXY protocol relay layer.
///
/// Returned by `create_quic_endpoint_with_proxy_relay`. The caller owns all
/// three pieces; `relay_task` keeps the datagram relay alive and should be
/// aborted (or its cancel token fired) on shutdown.
pub struct QuicProxyRelay {
    /// The quinn endpoint (bound to 127.0.0.1:ephemeral).
    pub endpoint: Endpoint,
    /// The relay recv loop task handle.
    pub relay_task: JoinHandle<()>,
    /// Maps relay socket local addr → real client SocketAddr (from PROXY v2).
    /// Consulted by `quic_accept_loop` to resolve real client IPs.
    pub real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
}
/// A single relay session for forwarding datagrams between an external source
/// and the internal quinn endpoint.
struct RelaySession {
    /// Relay-side UDP socket, `connect()`ed to quinn's internal address.
    socket: Arc<UdpSocket>,
    /// Milliseconds since the relay loop's epoch at the last forwarded
    /// datagram; read by the periodic stale-session sweep.
    last_activity: AtomicU64,
    /// Return-path task (quinn → external client); aborted on cleanup.
    return_task: JoinHandle<()>,
    /// Cancels the return-path task for this session.
    cancel: CancellationToken,
}
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
///
/// Instead of giving the external socket to quinn, we:
/// 1. Bind a raw UDP socket on 0.0.0.0:port (external)
/// 2. Bind quinn on 127.0.0.1:0 (internal, ephemeral)
/// 3. Run a relay loop that filters PROXY v2 headers and forwards datagrams
///
/// Only used when `proxy_ips` is non-empty.
///
/// Must be called from within a tokio runtime: `UdpSocket::from_std` and
/// `tokio::spawn` both require an active runtime context.
pub fn create_quic_endpoint_with_proxy_relay(
    port: u16,
    tls_config: Arc<RustlsServerConfig>,
    proxy_ips: Arc<Vec<IpAddr>>,
    cancel: CancellationToken,
) -> anyhow::Result<QuicProxyRelay> {
    // Bind external socket on the real port.
    // NOTE(review): binds 0.0.0.0 (IPv4 only) — confirm IPv6 listeners are
    // intentionally out of scope for the relay path.
    let external_socket = std::net::UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], port)))?;
    // Must be non-blocking before wrapping in tokio's UdpSocket.
    external_socket.set_nonblocking(true)?;
    let external_socket = Arc::new(
        UdpSocket::from_std(external_socket)
            .map_err(|e| anyhow::anyhow!("Failed to wrap external socket: {}", e))?,
    );
    // Bind quinn on localhost ephemeral port; the relay loop forwards
    // filtered datagrams here via per-session sockets.
    let internal_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
    let quinn_internal_addr = internal_socket.local_addr()?;
    let quic_crypto = quinn::crypto::rustls::QuicServerConfig::try_from(tls_config)
        .map_err(|e| anyhow::anyhow!("Failed to create QUIC crypto config: {}", e))?;
    let server_config = QuinnServerConfig::with_crypto(Arc::new(quic_crypto));
    let endpoint = Endpoint::new(
        quinn::EndpointConfig::default(),
        Some(server_config),
        internal_socket,
        quinn::default_runtime()
            .ok_or_else(|| anyhow::anyhow!("No async runtime for quinn"))?,
    )?;
    // Shared with quic_accept_loop so it can translate relay-socket peer
    // addresses back to real client addresses.
    let real_client_map = Arc::new(DashMap::new());
    let relay_task = tokio::spawn(quic_proxy_relay_loop(
        external_socket,
        quinn_internal_addr,
        proxy_ips,
        Arc::clone(&real_client_map),
        cancel,
    ));
    info!("QUIC endpoint with PROXY relay on port {} (quinn internal: {})", port, quinn_internal_addr);
    Ok(QuicProxyRelay { endpoint, relay_task, real_client_map })
}
/// Main relay loop: reads datagrams from the external socket, filters PROXY v2
/// headers from trusted proxy IPs, and forwards everything else to quinn via
/// per-session relay sockets.
///
/// Runs until `cancel` fires; on shutdown all relay sessions are cancelled.
/// Stale-session cleanup is inline (driven by incoming traffic) rather than
/// timer-based, so fully idle relays are only reaped on the next received
/// datagram.
async fn quic_proxy_relay_loop(
    external_socket: Arc<UdpSocket>,
    quinn_internal_addr: SocketAddr,
    proxy_ips: Arc<Vec<IpAddr>>,
    real_client_map: Arc<DashMap<SocketAddr, SocketAddr>>,
    cancel: CancellationToken,
) {
    // Maps external source addr → real client addr (from PROXY v2 headers)
    let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
    // Maps external source addr → relay session
    let relay_sessions: DashMap<SocketAddr, Arc<RelaySession>> = DashMap::new();
    // Monotonic base for the millisecond activity timestamps below.
    let epoch = Instant::now();
    // Max UDP payload size; any QUIC datagram fits.
    let mut buf = vec![0u8; 65535];
    // Inline cleanup: periodically scan relay_sessions for stale entries
    let mut last_cleanup = Instant::now();
    let cleanup_interval = std::time::Duration::from_secs(30);
    let session_timeout_ms: u64 = 120_000;
    loop {
        let (len, src_addr) = tokio::select! {
            _ = cancel.cancelled() => {
                debug!("QUIC proxy relay loop cancelled");
                break;
            }
            result = external_socket.recv_from(&mut buf) => {
                match result {
                    Ok(r) => r,
                    Err(e) => {
                        // Transient recv errors (e.g. ICMP-induced) — keep serving.
                        warn!("QUIC proxy relay recv error: {}", e);
                        continue;
                    }
                }
            }
        };
        let datagram = &buf[..len];
        // PROXY v2 handling: only on first datagram from a trusted proxy IP
        // (before a relay session exists for this source)
        if proxy_ips.contains(&src_addr.ip()) && relay_sessions.get(&src_addr).is_none() {
            if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
                match crate::proxy_protocol::parse_v2(datagram) {
                    Ok((header, _consumed)) => {
                        debug!("QUIC PROXY v2 from {}: real client {}", src_addr, header.source_addr);
                        proxy_addr_map.insert(src_addr, header.source_addr);
                        // NOTE(review): the whole datagram is consumed here — any
                        // payload bytes after `_consumed` would be dropped. Confirm
                        // the sender emits the PROXY v2 header as a standalone
                        // datagram with no trailing payload.
                        continue; // consume the PROXY v2 datagram
                    }
                    Err(e) => {
                        // Malformed header from a trusted proxy: fall through and
                        // treat the datagram as ordinary QUIC traffic.
                        debug!("QUIC proxy relay: failed to parse PROXY v2 from {}: {}", src_addr, e);
                    }
                }
            }
        }
        // Determine real client address (falls back to the wire source addr
        // when no PROXY v2 header was seen for this source).
        let real_client = proxy_addr_map.get(&src_addr)
            .map(|r| *r)
            .unwrap_or(src_addr);
        // Get or create relay session for this external source
        let session = match relay_sessions.get(&src_addr) {
            Some(s) => {
                s.last_activity.store(epoch.elapsed().as_millis() as u64, Ordering::Relaxed);
                Arc::clone(s.value())
            }
            None => {
                // Create new relay socket connected to quinn's internal address.
                // Any failure below drops this datagram; the client's QUIC
                // retransmit will retry session creation.
                let relay_socket = match UdpSocket::bind("127.0.0.1:0").await {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("QUIC relay: failed to bind relay socket: {}", e);
                        continue;
                    }
                };
                if let Err(e) = relay_socket.connect(quinn_internal_addr).await {
                    warn!("QUIC relay: failed to connect relay socket to {}: {}", quinn_internal_addr, e);
                    continue;
                }
                let relay_local_addr = match relay_socket.local_addr() {
                    Ok(a) => a,
                    Err(e) => {
                        warn!("QUIC relay: failed to get relay socket local addr: {}", e);
                        continue;
                    }
                };
                let relay_socket = Arc::new(relay_socket);
                // Store the real client mapping for the QUIC accept loop —
                // quinn sees the relay socket's addr as the peer, so the map is
                // keyed by relay_local_addr.
                real_client_map.insert(relay_local_addr, real_client);
                // Spawn return-path relay: quinn -> external socket -> original source
                let session_cancel = cancel.child_token();
                let return_task = tokio::spawn(relay_return_path(
                    Arc::clone(&relay_socket),
                    Arc::clone(&external_socket),
                    src_addr,
                    session_cancel.child_token(),
                ));
                let session = Arc::new(RelaySession {
                    socket: relay_socket,
                    last_activity: AtomicU64::new(epoch.elapsed().as_millis() as u64),
                    return_task,
                    cancel: session_cancel,
                });
                relay_sessions.insert(src_addr, Arc::clone(&session));
                debug!("QUIC relay: new session for {} (relay {}), real client {}",
                    src_addr, relay_local_addr, real_client);
                session
            }
        };
        // Forward datagram to quinn via the relay socket
        if let Err(e) = session.socket.send(datagram).await {
            debug!("QUIC relay: forward error to quinn for {}: {}", src_addr, e);
        }
        // Periodic cleanup of stale relay sessions
        if last_cleanup.elapsed() >= cleanup_interval {
            last_cleanup = Instant::now();
            let now_ms = epoch.elapsed().as_millis() as u64;
            // Collect keys first, then remove — avoids removing while iterating
            // the DashMap.
            let stale_keys: Vec<SocketAddr> = relay_sessions.iter()
                .filter(|entry| {
                    let age = now_ms.saturating_sub(entry.value().last_activity.load(Ordering::Relaxed));
                    age > session_timeout_ms
                })
                .map(|entry| *entry.key())
                .collect();
            for key in stale_keys {
                if let Some((_, session)) = relay_sessions.remove(&key) {
                    session.cancel.cancel();
                    session.return_task.abort();
                    // Clean up real_client_map entry
                    if let Ok(addr) = session.socket.local_addr() {
                        real_client_map.remove(&addr);
                    }
                    proxy_addr_map.remove(&key);
                    debug!("QUIC relay: cleaned up stale session for {}", key);
                }
            }
        }
    }
    // Shutdown: cancel all relay sessions
    for entry in relay_sessions.iter() {
        entry.value().cancel.cancel();
        entry.value().return_task.abort();
    }
}
/// Return-path relay: receives datagrams from quinn (via the relay socket)
/// and forwards them back to the external client through the external socket.
async fn relay_return_path(
relay_socket: Arc<UdpSocket>,
external_socket: Arc<UdpSocket>,
external_src_addr: SocketAddr,
cancel: CancellationToken,
) {
let mut buf = vec![0u8; 65535];
loop {
let len = tokio::select! {
_ = cancel.cancelled() => break,
result = relay_socket.recv(&mut buf) => {
match result {
Ok(len) => len,
Err(e) => {
debug!("QUIC relay return recv error for {}: {}", external_src_addr, e);
break;
}
}
}
};
if let Err(e) = external_socket.send_to(&buf[..len], external_src_addr).await {
debug!("QUIC relay return send error to {}: {}", external_src_addr, e);
break;
}
}
}
// ===== QUIC accept loop =====
/// Run the QUIC accept loop for a single endpoint. /// Run the QUIC accept loop for a single endpoint.
/// ///
/// Accepts incoming QUIC connections and spawns a task per connection. /// Accepts incoming QUIC connections and spawns a task per connection.
/// When `real_client_map` is provided, it is consulted to resolve real client
/// IPs from PROXY protocol v2 headers (relay socket addr → real client addr).
pub async fn quic_accept_loop( pub async fn quic_accept_loop(
endpoint: Endpoint, endpoint: Endpoint,
port: u16, port: u16,
@@ -56,6 +330,8 @@ pub async fn quic_accept_loop(
metrics: Arc<MetricsCollector>, metrics: Arc<MetricsCollector>,
conn_tracker: Arc<ConnectionTracker>, conn_tracker: Arc<ConnectionTracker>,
cancel: CancellationToken, cancel: CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
) { ) {
loop { loop {
let incoming = tokio::select! { let incoming = tokio::select! {
@@ -75,11 +351,16 @@ pub async fn quic_accept_loop(
}; };
let remote_addr = incoming.remote_address(); let remote_addr = incoming.remote_address();
let ip = remote_addr.ip();
// Resolve real client IP from PROXY protocol map if available
let real_addr = real_client_map.as_ref()
.and_then(|map| map.get(&remote_addr).map(|r| *r))
.unwrap_or(remote_addr);
let ip = real_addr.ip();
// Per-IP rate limiting // Per-IP rate limiting
if !conn_tracker.try_accept(&ip) { if !conn_tracker.try_accept(&ip) {
debug!("QUIC connection rejected from {} (rate limit)", remote_addr); debug!("QUIC connection rejected from {} (rate limit)", real_addr);
// Drop `incoming` to refuse the connection // Drop `incoming` to refuse the connection
continue; continue;
} }
@@ -102,7 +383,7 @@ pub async fn quic_accept_loop(
let route = match rm.find_route(&ctx) { let route = match rm.find_route(&ctx) {
Some(m) => m.route.clone(), Some(m) => m.route.clone(),
None => { None => {
debug!("No QUIC route matched for port {} from {}", port, remote_addr); debug!("No QUIC route matched for port {} from {}", port, real_addr);
continue; continue;
} }
}; };
@@ -114,11 +395,13 @@ pub async fn quic_accept_loop(
let metrics = Arc::clone(&metrics); let metrics = Arc::clone(&metrics);
let conn_tracker = Arc::clone(&conn_tracker); let conn_tracker = Arc::clone(&conn_tracker);
let cancel = cancel.child_token(); let cancel = cancel.child_token();
let h3_svc = h3_service.clone();
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
tokio::spawn(async move { tokio::spawn(async move {
match handle_quic_connection(incoming, route, port, &metrics, &cancel).await { match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
Ok(()) => debug!("QUIC connection from {} completed", remote_addr), Ok(()) => debug!("QUIC connection from {} completed", real_addr),
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e), Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
} }
// Cleanup // Cleanup
@@ -138,12 +421,14 @@ async fn handle_quic_connection(
incoming: quinn::Incoming, incoming: quinn::Incoming,
route: RouteConfig, route: RouteConfig,
port: u16, port: u16,
metrics: &MetricsCollector, metrics: Arc<MetricsCollector>,
cancel: &CancellationToken, cancel: &CancellationToken,
h3_service: Option<Arc<H3ProxyService>>,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let connection = incoming.await?; let connection = incoming.await?;
let remote_addr = connection.remote_address(); let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
debug!("QUIC connection established from {}", remote_addr); debug!("QUIC connection established from {}", effective_addr);
// Check if this route has HTTP/3 enabled // Check if this route has HTTP/3 enabled
let enable_http3 = route.action.udp.as_ref() let enable_http3 = route.action.udp.as_ref()
@@ -152,13 +437,23 @@ async fn handle_quic_connection(
.unwrap_or(false); .unwrap_or(false);
if enable_http3 { if enable_http3 {
// Phase 5: dispatch to H3ProxyService if let Some(ref h3_svc) = h3_service {
// For now, log and accept streams for basic handling debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name); h3_svc.handle_connection(connection, &route, port, real_client_addr).await
handle_h3_connection(connection, route, port, metrics, cancel).await } else {
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
// Keep connection alive until cancelled
tokio::select! {
_ = cancel.cancelled() => {}
reason = connection.closed() => {
debug!("HTTP/3 connection closed (no service): {}", reason);
}
}
Ok(())
}
} else { } else {
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend // Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await handle_quic_stream_forwarding(connection, route, port, metrics, cancel, real_client_addr).await
} }
} }
@@ -171,11 +466,13 @@ async fn handle_quic_stream_forwarding(
connection: quinn::Connection, connection: quinn::Connection,
route: RouteConfig, route: RouteConfig,
port: u16, port: u16,
_metrics: &MetricsCollector, metrics: Arc<MetricsCollector>,
cancel: &CancellationToken, cancel: &CancellationToken,
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let remote_addr = connection.remote_address(); let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref()); let route_id = route.name.as_deref().or(route.id.as_deref());
let metrics_arc = metrics;
// Resolve backend target // Resolve backend target
let target = route.action.targets.as_ref() let target = route.action.targets.as_ref()
@@ -194,7 +491,7 @@ async fn handle_quic_stream_forwarding(
Err(quinn::ConnectionError::ApplicationClosed(_)) => break, Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(quinn::ConnectionError::LocallyClosed) => break, Err(quinn::ConnectionError::LocallyClosed) => break,
Err(e) => { Err(e) => {
debug!("QUIC stream accept error from {}: {}", remote_addr, e); debug!("QUIC stream accept error from {}: {}", effective_addr, e);
break; break;
} }
} }
@@ -202,12 +499,9 @@ async fn handle_quic_stream_forwarding(
}; };
let backend_addr = backend_addr.clone(); let backend_addr = backend_addr.clone();
let ip_str = remote_addr.ip().to_string(); let ip_str = effective_addr.ip().to_string();
let _fwd_ctx = ForwardMetricsCtx { let stream_metrics = Arc::clone(&metrics_arc);
collector: Arc::new(MetricsCollector::new()), // TODO: share real metrics let stream_route_id = route_id.map(|s| s.to_string());
route_id: route_id.map(|s| s.to_string()),
source_ip: Some(ip_str),
};
// Spawn a task for each QUIC stream → TCP bidirectional forwarding // Spawn a task for each QUIC stream → TCP bidirectional forwarding
tokio::spawn(async move { tokio::spawn(async move {
@@ -217,6 +511,11 @@ async fn handle_quic_stream_forwarding(
&backend_addr, &backend_addr,
).await { ).await {
Ok((bytes_in, bytes_out)) => { Ok((bytes_in, bytes_out)) => {
stream_metrics.record_bytes(
bytes_in, bytes_out,
stream_route_id.as_deref(),
Some(&ip_str),
);
debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out); debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out);
} }
Err(e) => { Err(e) => {
@@ -255,29 +554,6 @@ async fn forward_quic_stream_to_tcp(
Ok((bytes_in, bytes_out)) Ok((bytes_in, bytes_out))
} }
/// Placeholder for HTTP/3 connection handling (Phase 5).
///
/// Once h3_service is implemented, this will delegate to it.
async fn handle_h3_connection(
connection: quinn::Connection,
_route: RouteConfig,
_port: u16,
_metrics: &MetricsCollector,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
warn!("HTTP/3 handling not yet fully implemented — accepting connection but no request processing");
// Keep the connection alive until cancelled or closed
tokio::select! {
_ = cancel.cancelled() => {}
reason = connection.closed() => {
debug!("HTTP/3 connection closed: {}", reason);
}
}
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -2,18 +2,23 @@
//! //!
//! Binds UDP sockets on configured ports, receives datagrams, matches routes, //! Binds UDP sockets on configured ports, receives datagrams, matches routes,
//! tracks sessions (flows), and forwards datagrams to backend UDP sockets. //! tracks sessions (flows), and forwards datagrams to backend UDP sockets.
//!
//! Supports PROXY protocol v2 on both raw UDP and QUIC paths when `proxy_ips`
//! is configured. For QUIC, a relay layer intercepts datagrams before they
//! reach the quinn endpoint.
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use dashmap::DashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::sync::RwLock; use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
@@ -21,13 +26,15 @@ use rustproxy_config::{RouteActionType, TransportProtocol};
use rustproxy_metrics::MetricsCollector; use rustproxy_metrics::MetricsCollector;
use rustproxy_routing::{MatchContext, RouteManager}; use rustproxy_routing::{MatchContext, RouteManager};
use rustproxy_http::h3_service::H3ProxyService;
use crate::connection_tracker::ConnectionTracker; use crate::connection_tracker::ConnectionTracker;
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable}; use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
/// Manages UDP listeners across all configured ports. /// Manages UDP listeners across all configured ports.
pub struct UdpListenerManager { pub struct UdpListenerManager {
/// Port → recv loop task handle /// Port → (recv loop task handle, optional QUIC endpoint for TLS updates)
listeners: HashMap<u16, JoinHandle<()>>, listeners: HashMap<u16, (JoinHandle<()>, Option<quinn::Endpoint>)>,
/// Hot-reloadable route table /// Hot-reloadable route table
route_manager: Arc<ArcSwap<RouteManager>>, route_manager: Arc<ArcSwap<RouteManager>>,
/// Shared metrics collector /// Shared metrics collector
@@ -40,6 +47,27 @@ pub struct UdpListenerManager {
cancel_token: CancellationToken, cancel_token: CancellationToken,
/// Unix socket path for datagram handler relay /// Unix socket path for datagram handler relay
datagram_handler_relay: Arc<RwLock<Option<String>>>, datagram_handler_relay: Arc<RwLock<Option<String>>>,
/// Persistent write half of the relay connection
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
/// Cancel token for the current relay reply reader task
relay_reader_cancel: Option<CancellationToken>,
/// H3 proxy service for HTTP/3 request handling
h3_service: Option<Arc<H3ProxyService>>,
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
proxy_ips: Arc<Vec<IpAddr>>,
}
impl Drop for UdpListenerManager {
    // Safety net: stop background tasks and close QUIC endpoints even if the
    // async `stop()` was never awaited.
    fn drop(&mut self) {
        self.cancel_token.cancel();
        for (_, (handle, endpoint)) in self.listeners.drain() {
            handle.abort();
            if let Some(ep) = endpoint {
                // Error code 0 / reason "shutdown" — mirrors stop() and remove_port().
                ep.close(quinn::VarInt::from_u32(0), b"shutdown");
            }
        }
    }
}
impl UdpListenerManager { impl UdpListenerManager {
@@ -57,9 +85,26 @@ impl UdpListenerManager {
session_table: Arc::new(UdpSessionTable::new()), session_table: Arc::new(UdpSessionTable::new()),
cancel_token, cancel_token,
datagram_handler_relay: Arc::new(RwLock::new(None)), datagram_handler_relay: Arc::new(RwLock::new(None)),
relay_writer: Arc::new(Mutex::new(None)),
relay_reader_cancel: None,
h3_service: None,
proxy_ips: Arc::new(Vec::new()),
} }
} }
/// Set the trusted proxy IPs for PROXY protocol v2 detection.
///
/// An empty list disables PROXY v2 handling (and the QUIC relay path).
pub fn set_proxy_ips(&mut self, ips: Vec<IpAddr>) {
    let count = ips.len();
    if count > 0 {
        info!("UDP/QUIC PROXY protocol v2 enabled for {} trusted IPs", count);
    }
    self.proxy_ips = Arc::new(ips);
}
/// Set the H3 proxy service for HTTP/3 request handling.
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
    // Store (or overwrite) the service handed to new QUIC accept loops.
    let _ = self.h3_service.replace(svc);
}
/// Update the route manager (for hot-reload). /// Update the route manager (for hot-reload).
pub fn update_routes(&self, route_manager: Arc<RouteManager>) { pub fn update_routes(&self, route_manager: Arc<RouteManager>) {
self.route_manager.store(route_manager); self.route_manager.store(route_manager);
@@ -94,18 +139,44 @@ impl UdpListenerManager {
if has_quic { if has_quic {
if let Some(tls) = tls_config { if let Some(tls) = tls_config {
// Create QUIC endpoint if self.proxy_ips.is_empty() {
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?; // Direct path: quinn owns the external socket (zero overhead)
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop( let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
endpoint, let endpoint_for_updates = endpoint.clone();
port, let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
Arc::clone(&self.route_manager), endpoint,
Arc::clone(&self.metrics), port,
Arc::clone(&self.conn_tracker), Arc::clone(&self.route_manager),
self.cancel_token.child_token(), Arc::clone(&self.metrics),
)); Arc::clone(&self.conn_tracker),
self.listeners.insert(port, handle); self.cancel_token.child_token(),
info!("QUIC endpoint started on port {}", port); self.h3_service.clone(),
None,
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint started on port {}", port);
} else {
// Proxy relay path: we own external socket, quinn on localhost
let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
port,
tls,
Arc::clone(&self.proxy_ips),
self.cancel_token.child_token(),
)?;
let endpoint_for_updates = relay.endpoint.clone();
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
relay.endpoint,
port,
Arc::clone(&self.route_manager),
Arc::clone(&self.metrics),
Arc::clone(&self.conn_tracker),
self.cancel_token.child_token(),
self.h3_service.clone(),
Some(relay.real_client_map),
));
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
info!("QUIC endpoint with PROXY relay started on port {}", port);
}
return Ok(()); return Ok(());
} else { } else {
warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port); warn!("QUIC routes on port {} but no TLS config provided, falling back to raw UDP", port);
@@ -126,10 +197,12 @@ impl UdpListenerManager {
Arc::clone(&self.conn_tracker), Arc::clone(&self.conn_tracker),
Arc::clone(&self.session_table), Arc::clone(&self.session_table),
Arc::clone(&self.datagram_handler_relay), Arc::clone(&self.datagram_handler_relay),
Arc::clone(&self.relay_writer),
self.cancel_token.child_token(), self.cancel_token.child_token(),
Arc::clone(&self.proxy_ips),
)); ));
self.listeners.insert(port, handle); self.listeners.insert(port, (handle, None));
// Start the session cleanup task if this is the first port // Start the session cleanup task if this is the first port
if self.listeners.len() == 1 { if self.listeners.len() == 1 {
@@ -141,8 +214,11 @@ impl UdpListenerManager {
/// Stop listening on a UDP port. /// Stop listening on a UDP port.
pub fn remove_port(&mut self, port: u16) { pub fn remove_port(&mut self, port: u16) {
if let Some(handle) = self.listeners.remove(&port) { if let Some((handle, endpoint)) = self.listeners.remove(&port) {
handle.abort(); handle.abort();
if let Some(ep) = endpoint {
ep.close(quinn::VarInt::from_u32(0), b"port removed");
}
info!("UDP listener removed from port {}", port); info!("UDP listener removed from port {}", port);
} }
} }
@@ -157,24 +233,223 @@ impl UdpListenerManager {
/// Stop all listeners and clean up. /// Stop all listeners and clean up.
pub async fn stop(&mut self) { pub async fn stop(&mut self) {
self.cancel_token.cancel(); self.cancel_token.cancel();
for (port, handle) in self.listeners.drain() { for (port, (handle, endpoint)) in self.listeners.drain() {
handle.abort(); handle.abort();
if let Some(ep) = endpoint {
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
}
debug!("UDP listener stopped on port {}", port); debug!("UDP listener stopped on port {}", port);
} }
info!("All UDP listeners stopped, {} sessions remaining", info!("All UDP listeners stopped, {} sessions remaining",
self.session_table.session_count()); self.session_table.session_count());
} }
/// Set the datagram handler relay socket path. /// Update TLS config on all active QUIC endpoints (cert refresh).
pub async fn set_datagram_handler_relay(&self, path: String) { /// Only affects new incoming connections — existing connections are undisturbed.
let mut relay = self.datagram_handler_relay.write().await; /// Uses quinn's Endpoint::set_server_config() for zero-downtime hot-swap.
*relay = Some(path); pub fn update_quic_tls(&self, tls_config: Arc<rustls::ServerConfig>) {
for (port, (_handle, endpoint)) in &self.listeners {
if let Some(ep) = endpoint {
match quinn::crypto::rustls::QuicServerConfig::try_from(Arc::clone(&tls_config)) {
Ok(quic_crypto) => {
let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
ep.set_server_config(Some(server_config));
info!("Updated QUIC TLS config on port {}", port);
}
Err(e) => {
warn!("Failed to update QUIC TLS config on port {}: {}", port, e);
}
}
}
}
}
/// Upgrade raw UDP fallback listeners to QUIC endpoints.
///
/// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP.
/// When certs become available later (via loadCertificate IPC or ACME), this method
/// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint.
///
/// This is idempotent — ports that already have QUIC endpoints are skipped.
pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc<rustls::ServerConfig>) {
// Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes
let rm = self.route_manager.load();
let upgrade_ports: Vec<u16> = self.listeners.iter()
.filter(|(_, (_, endpoint))| endpoint.is_none())
.filter(|(port, _)| {
rm.routes_for_port(**port).iter().any(|r| {
r.action.udp.as_ref()
.and_then(|u| u.quic.as_ref())
.is_some()
})
})
.map(|(port, _)| *port)
.collect();
for port in upgrade_ports {
info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port);
// Stop the raw UDP listener task and drain sessions to release the socket
if let Some((handle, _)) = self.listeners.remove(&port) {
handle.abort();
}
let drained = self.session_table.drain_port(
port, &self.metrics, &self.conn_tracker,
);
if drained > 0 {
debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port);
}
// Brief yield to let aborted tasks drop their socket references
tokio::task::yield_now().await;
// Create QUIC endpoint on the now-free port
let create_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match create_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
}
Err(e) => {
// Port may still be held — retry once after a brief delay
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let retry_result = if self.proxy_ips.is_empty() {
self.create_quic_direct(port, Arc::clone(&tls_config))
} else {
self.create_quic_with_relay(port, Arc::clone(&tls_config))
};
match retry_result {
Ok(()) => {
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
}
Err(e2) => {
error!("Failed to upgrade port {} to QUIC after retry: {}. \
Rebinding as raw UDP.", port, e2);
// Fallback: rebind as raw UDP so the port isn't dead
if let Ok(()) = self.rebind_raw_udp(port).await {
warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port);
}
}
}
}
}
}
}
/// Create a direct QUIC endpoint (quinn owns the socket).
///
/// Registers the accept-loop task plus an endpoint clone (for later TLS
/// hot-swaps) in the listener table.
fn create_quic_direct(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
    let endpoint = crate::quic_handler::create_quic_endpoint(port, tls_config)?;
    // quinn Endpoints are cheap shared handles: one clone drives the accept
    // loop, the other stays in the table for config updates.
    let accept_ep = endpoint.clone();
    let task = tokio::spawn(crate::quic_handler::quic_accept_loop(
        accept_ep,
        port,
        Arc::clone(&self.route_manager),
        Arc::clone(&self.metrics),
        Arc::clone(&self.conn_tracker),
        self.cancel_token.child_token(),
        self.h3_service.clone(),
        None,
    ));
    self.listeners.insert(port, (task, Some(endpoint)));
    Ok(())
}
/// Create a QUIC endpoint with PROXY protocol relay.
///
/// Like `create_quic_direct`, but the external socket is owned by the relay
/// layer, which strips PROXY v2 headers and supplies `real_client_map` so the
/// accept loop can attribute connections to the real client IPs.
fn create_quic_with_relay(&mut self, port: u16, tls_config: Arc<rustls::ServerConfig>) -> anyhow::Result<()> {
    let relay = crate::quic_handler::create_quic_endpoint_with_proxy_relay(
        port,
        tls_config,
        Arc::clone(&self.proxy_ips),
        self.cancel_token.child_token(),
    )?;
    // Keep a handle for TLS hot-swaps; move the other into the accept loop.
    let ep_handle = relay.endpoint.clone();
    let task = tokio::spawn(crate::quic_handler::quic_accept_loop(
        relay.endpoint,
        port,
        Arc::clone(&self.route_manager),
        Arc::clone(&self.metrics),
        Arc::clone(&self.conn_tracker),
        self.cancel_token.child_token(),
        self.h3_service.clone(),
        Some(relay.real_client_map),
    ));
    self.listeners.insert(port, (task, Some(ep_handle)));
    Ok(())
}
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
    // Bind on all IPv4 interfaces, matching the original listener setup.
    let sock = Arc::new(
        UdpSocket::bind(std::net::SocketAddr::from(([0, 0, 0, 0], port))).await?,
    );
    let task = tokio::spawn(Self::recv_loop(
        sock,
        port,
        Arc::clone(&self.route_manager),
        Arc::clone(&self.metrics),
        Arc::clone(&self.conn_tracker),
        Arc::clone(&self.session_table),
        Arc::clone(&self.datagram_handler_relay),
        Arc::clone(&self.relay_writer),
        self.cancel_token.child_token(),
        Arc::clone(&self.proxy_ips),
    ));
    // No endpoint handle: this entry is a raw UDP fallback, not QUIC.
    self.listeners.insert(port, (task, None));
    Ok(())
}
/// Set the datagram handler relay socket path and establish connection.
///
/// Connects to the TypeScript datagram-handler process over a Unix domain
/// socket at `path`. On success the write half is stored in
/// `self.relay_writer` (used to forward inbound datagrams to TS) and a
/// background task is spawned to read length-prefixed JSON reply frames from
/// the read half. Any previously running reply-reader task is cancelled
/// first. On connection failure the path is still recorded, but no relay is
/// active; the error is only logged.
pub async fn set_datagram_handler_relay(&mut self, path: String) {
// Cancel previous relay reader task if any
if let Some(old_cancel) = self.relay_reader_cancel.take() {
old_cancel.cancel();
}
// Store the path (shared with recv loops via RwLock)
{
let mut relay = self.datagram_handler_relay.write().await;
*relay = Some(path.clone());
}
// Connect to the Unix socket
match tokio::net::UnixStream::connect(&path).await {
Ok(stream) => {
let (read_half, write_half) = stream.into_split();
// Store write half for sending datagrams
{
let mut writer = self.relay_writer.lock().await;
*writer = Some(write_half);
}
// Spawn reply reader — reads length-prefixed JSON replies from TS
// and sends them back to the originating clients via UDP
let cancel = self.cancel_token.child_token();
self.relay_reader_cancel = Some(cancel.clone());
tokio::spawn(Self::relay_reply_reader(read_half, cancel));
info!("Datagram handler relay connected to {}", path);
}
Err(e) => {
error!("Failed to connect datagram handler relay to {}: {}", path, e);
}
}
} }
/// Start periodic session cleanup task. /// Start periodic session cleanup task.
fn start_cleanup_task(&self) { fn start_cleanup_task(&self) {
let session_table = Arc::clone(&self.session_table); let session_table = Arc::clone(&self.session_table);
let metrics = Arc::clone(&self.metrics); let metrics = Arc::clone(&self.metrics);
let conn_tracker = Arc::clone(&self.conn_tracker);
let cancel = self.cancel_token.child_token(); let cancel = self.cancel_token.child_token();
let route_manager = Arc::clone(&self.route_manager); let route_manager = Arc::clone(&self.route_manager);
@@ -188,7 +463,7 @@ impl UdpListenerManager {
// or default 60s if none configured) // or default 60s if none configured)
let rm = route_manager.load(); let rm = route_manager.load();
let timeout_ms = Self::get_min_session_timeout(&rm); let timeout_ms = Self::get_min_session_timeout(&rm);
let removed = session_table.cleanup_idle(timeout_ms, &metrics); let removed = session_table.cleanup_idle(timeout_ms, &metrics, &conn_tracker);
if removed > 0 { if removed > 0 {
debug!("UDP session cleanup: removed {} idle sessions, {} remaining", debug!("UDP session cleanup: removed {} idle sessions, {} remaining",
removed, session_table.session_count()); removed, session_table.session_count());
@@ -206,6 +481,10 @@ impl UdpListenerManager {
} }
/// Main receive loop for a UDP port. /// Main receive loop for a UDP port.
///
/// When `proxy_ips` is non-empty, the first datagram from a trusted proxy IP
/// is checked for PROXY protocol v2. If found, the real client IP is extracted
/// and used for all subsequent session handling for that source address.
async fn recv_loop( async fn recv_loop(
socket: Arc<UdpSocket>, socket: Arc<UdpSocket>,
port: u16, port: u16,
@@ -213,12 +492,18 @@ impl UdpListenerManager {
metrics: Arc<MetricsCollector>, metrics: Arc<MetricsCollector>,
conn_tracker: Arc<ConnectionTracker>, conn_tracker: Arc<ConnectionTracker>,
session_table: Arc<UdpSessionTable>, session_table: Arc<UdpSessionTable>,
datagram_handler_relay: Arc<RwLock<Option<String>>>, _datagram_handler_relay: Arc<RwLock<Option<String>>>,
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
cancel: CancellationToken, cancel: CancellationToken,
proxy_ips: Arc<Vec<IpAddr>>,
) { ) {
// Use a reasonably large buffer; actual max is per-route but we need a single buffer // Use a reasonably large buffer; actual max is per-route but we need a single buffer
let mut buf = vec![0u8; 65535]; let mut buf = vec![0u8; 65535];
// Maps proxy source addr → real client addr (from PROXY v2 headers).
// Only populated when proxy_ips is non-empty.
let proxy_addr_map: DashMap<SocketAddr, SocketAddr> = DashMap::new();
loop { loop {
let (len, client_addr) = tokio::select! { let (len, client_addr) = tokio::select! {
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
@@ -238,9 +523,39 @@ impl UdpListenerManager {
let datagram = &buf[..len]; let datagram = &buf[..len];
// Route matching // PROXY protocol v2 detection for datagrams from trusted proxy IPs
let effective_client_ip = if !proxy_ips.is_empty() && proxy_ips.contains(&client_addr.ip()) {
let session_key: SessionKey = (client_addr, port);
if session_table.get(&session_key).is_none() && !proxy_addr_map.contains_key(&client_addr) {
// No session and no prior PROXY header — check for PROXY v2
if crate::proxy_protocol::is_proxy_protocol_v2(datagram) {
match crate::proxy_protocol::parse_v2(datagram) {
Ok((header, _consumed)) => {
debug!("UDP PROXY v2 from {}: real client {}", client_addr, header.source_addr);
proxy_addr_map.insert(client_addr, header.source_addr);
continue; // discard the PROXY v2 datagram
}
Err(e) => {
debug!("UDP PROXY v2 parse error from {}: {}", client_addr, e);
client_addr.ip()
}
}
} else {
client_addr.ip()
}
} else {
// Use real client IP if we've previously seen a PROXY v2 header
proxy_addr_map.get(&client_addr)
.map(|r| r.ip())
.unwrap_or_else(|| client_addr.ip())
}
} else {
client_addr.ip()
};
// Route matching — use effective (real) client IP
let rm = route_manager.load(); let rm = route_manager.load();
let ip_str = client_addr.ip().to_string(); let ip_str = effective_client_ip.to_string();
let ctx = MatchContext { let ctx = MatchContext {
port, port,
domain: None, domain: None,
@@ -264,21 +579,16 @@ impl UdpListenerManager {
let route = route_match.route; let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref()); let route_id = route.name.as_deref().or(route.id.as_deref());
// Socket handler routes → relay datagram to TS via Unix socket // Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler { if route.action.action_type == RouteActionType::SocketHandler {
let relay_path = datagram_handler_relay.read().await; if let Err(e) = Self::relay_datagram_via_writer(
if let Some(ref path) = *relay_path { &relay_writer,
if let Err(e) = Self::relay_datagram_to_ts( route_id.unwrap_or("unknown"),
path, &client_addr,
route_id.unwrap_or("unknown"), port,
&client_addr, datagram,
port, ).await {
datagram, debug!("Failed to relay UDP datagram to TS: {}", e);
).await {
debug!("Failed to relay UDP datagram to TS: {}", e);
}
} else {
debug!("UDP datagram handler relay not configured for route {:?}", route_id);
} }
continue; continue;
} }
@@ -294,20 +604,21 @@ impl UdpListenerManager {
} }
// Session lookup or create // Session lookup or create
// Session key uses the proxy's source addr for correct return-path routing
let session_key: SessionKey = (client_addr, port); let session_key: SessionKey = (client_addr, port);
let session = match session_table.get(&session_key) { let session = match session_table.get(&session_key) {
Some(s) => s, Some(s) => s,
None => { None => {
// New session — check per-IP limits // New session — check per-IP limits using the real client IP
if !conn_tracker.try_accept(&client_addr.ip()) { if !conn_tracker.try_accept(&effective_client_ip) {
debug!("UDP session rejected for {} (rate limit)", client_addr); debug!("UDP session rejected for {} (rate limit)", effective_client_ip);
continue; continue;
} }
if !session_table.can_create_session( if !session_table.can_create_session(
&client_addr.ip(), &effective_client_ip,
udp_config.max_sessions_per_ip, udp_config.max_sessions_per_ip,
) { ) {
debug!("UDP session rejected for {} (per-IP session limit)", client_addr); debug!("UDP session rejected for {} (per-IP session limit)", effective_client_ip);
continue; continue;
} }
@@ -340,8 +651,8 @@ impl UdpListenerManager {
} }
let backend_socket = Arc::new(backend_socket); let backend_socket = Arc::new(backend_socket);
debug!("New UDP session: {} -> {} (via port {})", debug!("New UDP session: {} -> {} (via port {}, real client {})",
client_addr, backend_addr, port); client_addr, backend_addr, port, effective_client_ip);
// Spawn return-path relay task // Spawn return-path relay task
let session_cancel = CancellationToken::new(); let session_cancel = CancellationToken::new();
@@ -361,7 +672,7 @@ impl UdpListenerManager {
last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()), last_activity: std::sync::atomic::AtomicU64::new(session_table.elapsed_ms()),
created_at: std::time::Instant::now(), created_at: std::time::Instant::now(),
route_id: route_id.map(|s| s.to_string()), route_id: route_id.map(|s| s.to_string()),
source_ip: client_addr.ip(), source_ip: effective_client_ip,
client_addr, client_addr,
return_task, return_task,
cancel: session_cancel, cancel: session_cancel,
@@ -372,8 +683,8 @@ impl UdpListenerManager {
continue; continue;
} }
// Track in metrics // Track in metrics using the real client IP
conn_tracker.connection_opened(&client_addr.ip()); conn_tracker.connection_opened(&effective_client_ip);
metrics.connection_opened(route_id, Some(&ip_str)); metrics.connection_opened(route_id, Some(&ip_str));
metrics.udp_session_opened(); metrics.udp_session_opened();
@@ -441,10 +752,9 @@ impl UdpListenerManager {
} }
} }
/// Relay a UDP datagram to the TypeScript handler via Unix socket. /// Send a datagram to TS via the persistent relay writer.
/// Uses length-prefixed JSON framing: [4-byte BE length][JSON payload] async fn relay_datagram_via_writer(
async fn relay_datagram_to_ts( writer: &Mutex<Option<tokio::net::unix::OwnedWriteHalf>>,
relay_path: &str,
route_key: &str, route_key: &str,
client_addr: &SocketAddr, client_addr: &SocketAddr,
dest_port: u16, dest_port: u16,
@@ -463,8 +773,9 @@ impl UdpListenerManager {
}); });
let json = serde_json::to_vec(&msg)?; let json = serde_json::to_vec(&msg)?;
// Connect to relay (one-shot for now; persistent connection optimization deferred) let mut guard = writer.lock().await;
let mut stream = tokio::net::UnixStream::connect(relay_path).await?; let stream = guard.as_mut()
.ok_or_else(|| anyhow::anyhow!("Datagram relay not connected"))?;
// Length-prefixed frame // Length-prefixed frame
let len_bytes = (json.len() as u32).to_be_bytes(); let len_bytes = (json.len() as u32).to_be_bytes();
@@ -474,4 +785,101 @@ impl UdpListenerManager {
Ok(()) Ok(())
} }
/// Background task reading reply frames from the TS datagram handler.
/// Parses replies and sends them back to the original client via UDP.
async fn relay_reply_reader(
mut reader: tokio::net::unix::OwnedReadHalf,
cancel: CancellationToken,
) {
use base64::Engine;
let mut len_buf = [0u8; 4];
loop {
// Read length prefix
let read_result = tokio::select! {
_ = cancel.cancelled() => break,
result = reader.read_exact(&mut len_buf) => result,
};
match read_result {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader closed: {}", e);
break;
}
}
let frame_len = u32::from_be_bytes(len_buf) as usize;
if frame_len > 10 * 1024 * 1024 {
error!("Datagram relay frame too large: {} bytes", frame_len);
break;
}
let mut frame_buf = vec![0u8; frame_len];
match reader.read_exact(&mut frame_buf).await {
Ok(_) => {}
Err(e) => {
debug!("Datagram relay reader frame error: {}", e);
break;
}
}
// Parse the reply JSON
let reply: serde_json::Value = match serde_json::from_slice(&frame_buf) {
Ok(v) => v,
Err(e) => {
debug!("Datagram relay reply parse error: {}", e);
continue;
}
};
if reply.get("type").and_then(|v| v.as_str()) != Some("reply") {
continue;
}
let source_ip = reply.get("sourceIp").and_then(|v| v.as_str()).unwrap_or("");
let source_port = reply.get("sourcePort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let dest_port = reply.get("destPort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
let payload_b64 = reply.get("payloadBase64").and_then(|v| v.as_str()).unwrap_or("");
let payload = match base64::engine::general_purpose::STANDARD.decode(payload_b64) {
Ok(p) => p,
Err(e) => {
debug!("Datagram relay reply base64 decode error: {}", e);
continue;
}
};
let client_addr: SocketAddr = match format!("{}:{}", source_ip, source_port).parse() {
Ok(a) => a,
Err(e) => {
debug!("Datagram relay reply address parse error: {}", e);
continue;
}
};
// Send the reply back to the client via a temporary UDP socket bound to the dest_port
// We need the listener socket for this port. For simplicity, use a fresh socket.
let reply_socket = match UdpSocket::bind(format!("0.0.0.0:{}", dest_port)).await {
Ok(s) => s,
Err(_) => {
// Port already bound by the listener — use unbound socket
match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
debug!("Failed to create reply socket: {}", e);
continue;
}
}
}
};
if let Err(e) = reply_socket.send_to(&payload, client_addr).await {
debug!("Failed to send datagram reply to {}: {}", client_addr, e);
}
}
debug!("Datagram relay reply reader stopped");
}
} }

View File

@@ -18,6 +18,8 @@ use tracing::debug;
use rustproxy_metrics::MetricsCollector; use rustproxy_metrics::MetricsCollector;
use crate::connection_tracker::ConnectionTracker;
/// A single UDP session (flow). /// A single UDP session (flow).
pub struct UdpSession { pub struct UdpSession {
/// Socket bound to ephemeral port, connected to backend /// Socket bound to ephemeral port, connected to backend
@@ -165,6 +167,7 @@ impl UdpSessionTable {
&self, &self,
timeout_ms: u64, timeout_ms: u64,
metrics: &MetricsCollector, metrics: &MetricsCollector,
conn_tracker: &ConnectionTracker,
) -> usize { ) -> usize {
let now_ms = self.elapsed_ms(); let now_ms = self.elapsed_ms();
let mut removed = 0; let mut removed = 0;
@@ -185,6 +188,7 @@ impl UdpSessionTable {
session.client_addr, key.1, session.client_addr, key.1,
now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed)) now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed))
); );
conn_tracker.connection_closed(&session.source_ip);
metrics.connection_closed( metrics.connection_closed(
session.route_id.as_deref(), session.route_id.as_deref(),
Some(&session.source_ip.to_string()), Some(&session.source_ip.to_string()),
@@ -197,6 +201,36 @@ impl UdpSessionTable {
removed removed
} }
/// Drain all sessions on a given listening port, releasing socket references.
/// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's
/// Arc refcount must drop to zero so the port can be rebound.
pub fn drain_port(
&self,
port: u16,
metrics: &MetricsCollector,
conn_tracker: &ConnectionTracker,
) -> usize {
let keys: Vec<SessionKey> = self.sessions.iter()
.filter(|entry| entry.key().1 == port)
.map(|entry| *entry.key())
.collect();
let mut removed = 0;
for key in keys {
if let Some(session) = self.remove(&key) {
session.cancel.cancel();
conn_tracker.connection_closed(&session.source_ip);
metrics.connection_closed(
session.route_id.as_deref(),
Some(&session.source_ip.to_string()),
);
metrics.udp_session_closed();
removed += 1;
}
}
removed
}
/// Total number of active sessions. /// Total number of active sessions.
pub fn session_count(&self) -> usize { pub fn session_count(&self) -> usize {
self.sessions.len() self.sessions.len()

View File

@@ -264,6 +264,8 @@ impl RustProxy {
conn_config.socket_timeout_ms, conn_config.socket_timeout_ms,
conn_config.max_connection_lifetime_ms, conn_config.max_connection_lifetime_ms,
); );
// Clone proxy_ips before conn_config is moved into the TCP listener
let udp_proxy_ips = conn_config.proxy_ips.clone();
listener.set_connection_config(conn_config); listener.set_connection_config(conn_config);
// Share the socket-handler relay path with the listener // Share the socket-handler relay path with the listener
@@ -339,6 +341,18 @@ impl RustProxy {
conn_tracker, conn_tracker,
self.cancel_token.clone(), self.cancel_token.clone(),
); );
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
// Construct H3ProxyService for HTTP/3 request handling
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
Arc::new(ArcSwap::from(Arc::clone(&*self.route_table.load()))),
Arc::clone(&self.metrics),
Arc::new(rustproxy_http::connection_pool::ConnectionPool::new()),
Arc::new(rustproxy_http::protocol_cache::ProtocolCache::new()),
rustproxy_passthrough::tls_handler::shared_backend_tls_config(),
std::time::Duration::from_secs(30),
);
udp_mgr.set_h3_service(Arc::new(h3_svc));
for port in &udp_ports { for port in &udp_ports {
udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?; udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?;
@@ -763,22 +777,31 @@ impl RustProxy {
if self.udp_listener_manager.is_none() { if self.udp_listener_manager.is_none() {
if let Some(ref listener) = self.listener_manager { if let Some(ref listener) = self.listener_manager {
let conn_tracker = listener.conn_tracker().clone(); let conn_tracker = listener.conn_tracker().clone();
self.udp_listener_manager = Some(UdpListenerManager::new( let conn_config = Self::build_connection_config(&self.options);
let mut udp_mgr = UdpListenerManager::new(
Arc::clone(&new_manager), Arc::clone(&new_manager),
Arc::clone(&self.metrics), Arc::clone(&self.metrics),
conn_tracker, conn_tracker,
self.cancel_token.clone(), self.cancel_token.clone(),
)); );
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
self.udp_listener_manager = Some(udp_mgr);
} }
} }
// Build TLS config for QUIC (needed for new ports and upgrading existing raw UDP)
let quic_tls = {
let tls_configs = self.current_tls_configs().await;
Self::build_quic_tls_config(&tls_configs)
};
if let Some(ref mut udp_mgr) = self.udp_listener_manager { if let Some(ref mut udp_mgr) = self.udp_listener_manager {
udp_mgr.update_routes(Arc::clone(&new_manager)); udp_mgr.update_routes(Arc::clone(&new_manager));
// Add new UDP ports // Add new UDP ports (with TLS for QUIC)
for port in &new_udp_ports { for port in &new_udp_ports {
if !old_udp_ports.contains(port) { if !old_udp_ports.contains(port) {
udp_mgr.add_port(*port).await?; udp_mgr.add_port_with_tls(*port, quic_tls.clone()).await?;
} }
} }
// Remove old UDP ports // Remove old UDP ports
@@ -787,6 +810,12 @@ impl RustProxy {
udp_mgr.remove_port(*port); udp_mgr.remove_port(*port);
} }
} }
// Upgrade existing raw UDP fallback listeners to QUIC if TLS is now available
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
} }
} else if self.udp_listener_manager.is_some() { } else if self.udp_listener_manager.is_some() {
// All UDP routes removed — shut down UDP manager // All UDP routes removed — shut down UDP manager
@@ -843,12 +872,12 @@ impl RustProxy {
.map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?; .map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?;
// Hot-swap into TLS configs // Hot-swap into TLS configs
if let Some(ref mut listener) = self.listener_manager { let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
let mut tls_configs = Self::extract_tls_configs(&self.options.routes); tls_configs.insert(domain.clone(), TlsCertConfig {
tls_configs.insert(domain.clone(), TlsCertConfig { cert_pem: bundle.cert_pem.clone(),
cert_pem: bundle.cert_pem.clone(), key_pem: bundle.key_pem.clone(),
key_pem: bundle.key_pem.clone(), });
}); {
let cm = cm_arc.lock().await; let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() { for (d, b) in cm.store().iter() {
if !tls_configs.contains_key(d) { if !tls_configs.contains_key(d) {
@@ -858,9 +887,22 @@ impl RustProxy {
}); });
} }
} }
}
let quic_tls = Self::build_quic_tls_config(&tls_configs);
if let Some(ref listener) = self.listener_manager {
listener.set_tls_configs(tls_configs); listener.set_tls_configs(tls_configs);
} }
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
}
info!("Certificate provisioned and loaded for route '{}'", route_name); info!("Certificate provisioned and loaded for route '{}'", route_name);
Ok(()) Ok(())
} }
@@ -1005,10 +1047,37 @@ impl RustProxy {
Some(Arc::new(tls_config)) Some(Arc::new(tls_config))
} }
/// Build the current full TLS config map from all sources (route configs, loaded certs, cert manager).
async fn current_tls_configs(&self) -> HashMap<String, TlsCertConfig> {
let mut configs = Self::extract_tls_configs(&self.options.routes);
// Merge dynamically loaded certs (from loadCertificate IPC)
for (d, c) in &self.loaded_certs {
if !configs.contains_key(d) {
configs.insert(d.clone(), c.clone());
}
}
// Merge certs from cert manager store
if let Some(ref cm_arc) = self.cert_manager {
let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() {
if !configs.contains_key(d) {
configs.insert(d.clone(), TlsCertConfig {
cert_pem: b.cert_pem.clone(),
key_pem: b.key_pem.clone(),
});
}
}
}
configs
}
/// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks. /// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks.
pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) { pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) {
info!("Datagram handler relay path set to: {:?}", path); info!("Datagram handler relay path set to: {:?}", path);
if let Some(ref udp_mgr) = self.udp_listener_manager { if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref p) = path { if let Some(ref p) = path {
udp_mgr.set_datagram_handler_relay(p.clone()).await; udp_mgr.set_datagram_handler_relay(p.clone()).await;
} }
@@ -1055,39 +1124,24 @@ impl RustProxy {
key_pem: key_pem.clone(), key_pem: key_pem.clone(),
}); });
// Hot-swap TLS config on the listener // Hot-swap TLS config on TCP and QUIC listeners
if let Some(ref mut listener) = self.listener_manager { let tls_configs = self.current_tls_configs().await;
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
// Add the new cert // Build QUIC TLS config before TCP consumes the map
tls_configs.insert(domain.to_string(), TlsCertConfig { let quic_tls = Self::build_quic_tls_config(&tls_configs);
cert_pem: cert_pem.clone(),
key_pem: key_pem.clone(),
});
// Also include all existing certs from cert manager
if let Some(ref cm_arc) = self.cert_manager {
let cm = cm_arc.lock().await;
for (d, b) in cm.store().iter() {
if !tls_configs.contains_key(d) {
tls_configs.insert(d.clone(), TlsCertConfig {
cert_pem: b.cert_pem.clone(),
key_pem: b.key_pem.clone(),
});
}
}
}
// Merge dynamically loaded certs from previous loadCertificate calls
for (d, c) in &self.loaded_certs {
if !tls_configs.contains_key(d) {
tls_configs.insert(d.clone(), c.clone());
}
}
if let Some(ref listener) = self.listener_manager {
listener.set_tls_configs(tls_configs); listener.set_tls_configs(tls_configs);
} }
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
if let Some(ref quic_config) = quic_tls {
udp_mgr.update_quic_tls(Arc::clone(quic_config));
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
}
}
info!("Certificate loaded and TLS config updated for {}", domain); info!("Certificate loaded and TLS config updated for {}", domain);
Ok(()) Ok(())
} }

View File

@@ -0,0 +1,125 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import type { TDatagramHandler, IDatagramInfo } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let PROXY_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
return new Promise((resolve, reject) => {
const client = dgram.createSocket('udp4');
const timeout = setTimeout(() => {
client.close();
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
}, timeoutMs);
client.send(Buffer.from(msg), port, '127.0.0.1');
client.on('message', (data) => {
clearTimeout(timeout);
client.close();
resolve(data.toString());
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
}
// Start a SmartProxy whose UDP route is handled entirely in TypeScript via a
// datagramHandler callback (socket-handler action, no backend forwarding).
tap.test('setup: start SmartProxy with datagramHandler', async () => {
[PROXY_PORT] = await findFreePorts(1);
// Synchronous handler: echo the datagram back with a "Handled: " prefix.
const handler: TDatagramHandler = (datagram, info, reply) => {
reply(Buffer.from(`Handled: ${datagram.toString()}`));
};
smartProxy = new SmartProxy({
routes: [
{
name: 'dgram-handler-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'socket-handler',
datagramHandler: handler,
},
},
],
defaults: {
security: {
// Loopback only — IPv4, IPv6, and IPv4-mapped-IPv6 forms.
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
});
// Round-trip through the synchronous handler registered in setup.
tap.test('datagram handler: receives and replies to datagram', async () => {
const response = await sendDatagram(PROXY_PORT, 'Hello Handler');
expect(response).toEqual('Handled: Hello Handler');
});
// Restarts the proxy on a fresh port with an async handler. Subsequent tests
// in this file therefore hit the "Async: " handler, not the sync one.
tap.test('datagram handler: async handler works', async () => {
// Stop and restart with async handler
await smartProxy.stop();
[PROXY_PORT] = await findFreePorts(1);
const asyncHandler: TDatagramHandler = async (datagram, info, reply) => {
// Simulate async work
await new Promise<void>((resolve) => setTimeout(resolve, 10));
reply(Buffer.from(`Async: ${datagram.toString()}`));
};
smartProxy = new SmartProxy({
routes: [
{
name: 'dgram-async-handler',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'socket-handler',
datagramHandler: asyncHandler,
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
const response = await sendDatagram(PROXY_PORT, 'Test Async');
expect(response).toEqual('Async: Test Async');
});
// Five concurrent datagrams; replies may complete in any order, so set
// membership is asserted rather than positional equality.
tap.test('datagram handler: multiple rapid datagrams', async () => {
const promises: Promise<string>[] = [];
for (let i = 0; i < 5; i++) {
promises.push(sendDatagram(PROXY_PORT, `msg-${i}`));
}
const responses = await Promise.all(promises);
for (let i = 0; i < 5; i++) {
expect(responses).toContain(`Async: msg-${i}`);
}
});
// Teardown: stopping the proxy must release its UDP port.
tap.test('cleanup: stop SmartProxy', async () => {
await smartProxy.stop();
await assertPortsFree([PROXY_PORT]);
});
export default tap.start();

View File

@@ -174,7 +174,7 @@ tap.test('Route Validation - validateRouteAction', async () => {
const invalidSocketResult = validateRouteAction(invalidSocketAction); const invalidSocketResult = validateRouteAction(invalidSocketAction);
expect(invalidSocketResult.valid).toBeFalse(); expect(invalidSocketResult.valid).toBeFalse();
expect(invalidSocketResult.errors.length).toBeGreaterThan(0); expect(invalidSocketResult.errors.length).toBeGreaterThan(0);
expect(invalidSocketResult.errors[0]).toInclude('Socket handler function is required'); expect(invalidSocketResult.errors[0]).toInclude('handler function is required');
}); });
tap.test('Route Validation - validateRouteConfig', async () => { tap.test('Route Validation - validateRouteConfig', async () => {

142
test/test.udp-forwarding.ts Normal file
View File

@@ -0,0 +1,142 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let backendServer: dgram.Socket;
let PROXY_PORT: number;
let BACKEND_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
  return new Promise((resolve, reject) => {
    const client = dgram.createSocket('udp4');
    // Close-once guard: without it, a 'message' or 'error' event arriving
    // after the timeout has already closed the socket would call close()
    // a second time and throw ERR_SOCKET_DGRAM_NOT_RUNNING.
    let settled = false;
    const finish = (err: Error | null, data?: string) => {
      if (settled) return;
      settled = true;
      clearTimeout(timeout);
      client.close();
      if (err) reject(err);
      else resolve(data as string);
    };
    const timeout = setTimeout(() => {
      finish(new Error(`UDP response timeout after ${timeoutMs}ms`));
    }, timeoutMs);
    // Register listeners before sending so a fast reply is never dropped.
    client.on('message', (data) => finish(null, data.toString()));
    client.on('error', (err) => finish(err));
    client.send(Buffer.from(msg), port, '127.0.0.1');
  });
}
// Helper: create a UDP echo server on 127.0.0.1 that prefixes replies with
// "Echo: ". Rejects the promise if the bind fails (e.g. port in use) —
// the previous resolve-only version would hang the test forever and let the
// 'error' event crash the process.
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
  return new Promise((resolve, reject) => {
    const server = dgram.createSocket('udp4');
    server.once('error', reject);
    server.on('message', (msg, rinfo) => {
      server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
    });
    server.bind(port, '127.0.0.1', () => resolve(server));
  });
}
// Bring up a backend UDP echo server and a SmartProxy route that forwards
// UDP datagrams to it.
tap.test('setup: start UDP echo server and SmartProxy', async () => {
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
// Start backend UDP echo server
backendServer = await createUdpEchoServer(BACKEND_PORT);
// Start SmartProxy with a UDP forwarding route
smartProxy = new SmartProxy({
routes: [
{
name: 'udp-forward-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
udp: {
// Idle sessions are reaped after 5s of inactivity.
sessionTimeout: 5000,
},
},
},
],
defaults: {
security: {
// Loopback only — IPv4, IPv6, and IPv4-mapped-IPv6 forms.
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
});
await smartProxy.start();
});
// Single datagram through the proxy to the echo backend and back.
tap.test('UDP forwarding: basic datagram round-trip', async () => {
const response = await sendDatagram(PROXY_PORT, 'Hello UDP');
expect(response).toEqual('Echo: Hello UDP');
});
// Three datagrams from one client socket, exercising reuse of a single
// proxy session (same source address/port tuple).
tap.test('UDP forwarding: multiple datagrams same session', async () => {
// Use a single client socket for session reuse
const client = dgram.createSocket('udp4');
const responses: string[] = [];
const done = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
client.close();
reject(new Error('Timeout waiting for 3 responses'));
}, 5000);
client.on('message', (data) => {
responses.push(data.toString());
if (responses.length === 3) {
clearTimeout(timeout);
client.close();
resolve();
}
});
client.on('error', (err) => {
clearTimeout(timeout);
client.close();
reject(err);
});
});
client.send(Buffer.from('msg1'), PROXY_PORT, '127.0.0.1');
client.send(Buffer.from('msg2'), PROXY_PORT, '127.0.0.1');
client.send(Buffer.from('msg3'), PROXY_PORT, '127.0.0.1');
await done;
// UDP gives no ordering guarantee, so assert membership, not order.
expect(responses).toContain('Echo: msg1');
expect(responses).toContain('Echo: msg2');
expect(responses).toContain('Echo: msg3');
});
// Two concurrent clients must get their own replies (distinct sessions).
tap.test('UDP forwarding: multiple clients', async () => {
const [resp1, resp2] = await Promise.all([
sendDatagram(PROXY_PORT, 'client1'),
sendDatagram(PROXY_PORT, 'client2'),
]);
expect(resp1).toEqual('Echo: client1');
expect(resp2).toEqual('Echo: client2');
});
// Near-MTU-sized payload must survive the round trip unfragmented.
tap.test('UDP forwarding: large datagram (1400 bytes)', async () => {
const payload = 'X'.repeat(1400);
const response = await sendDatagram(PROXY_PORT, payload);
expect(response).toEqual(`Echo: ${payload}`);
});
// Teardown: both proxy and backend must release their ports.
tap.test('cleanup: stop SmartProxy and backend', async () => {
await smartProxy.stop();
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
export default tap.start();

114
test/test.udp-metrics.ts Normal file
View File

@@ -0,0 +1,114 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as dgram from 'dgram';
import { SmartProxy } from '../ts/index.js';
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
let smartProxy: SmartProxy;
let backendServer: dgram.Socket;
let PROXY_PORT: number;
let BACKEND_PORT: number;
// Helper: send a single UDP datagram and wait for a response
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
  return new Promise((resolve, reject) => {
    const client = dgram.createSocket('udp4');
    // Settle-once guard: prevents a second close() (which throws
    // ERR_SOCKET_DGRAM_NOT_RUNNING) when timeout and 'message'/'error' race.
    let settled = false;
    const finish = (err: Error | null, data?: string) => {
      if (settled) return;
      settled = true;
      clearTimeout(timeout);
      client.close();
      if (err) reject(err);
      else resolve(data as string);
    };
    const timeout = setTimeout(() => {
      finish(new Error(`UDP response timeout after ${timeoutMs}ms`));
    }, timeoutMs);
    // Listeners first, then send — a fast reply is never dropped.
    client.on('message', (data) => finish(null, data.toString()));
    client.on('error', (err) => finish(err));
    client.send(Buffer.from(msg), port, '127.0.0.1');
  });
}
// Helper: create a UDP echo server on 127.0.0.1 prefixing replies with
// "Echo: ". Rejects on bind failure instead of hanging forever with an
// unhandled 'error' event.
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
  return new Promise((resolve, reject) => {
    const server = dgram.createSocket('udp4');
    server.once('error', reject);
    server.on('message', (msg, rinfo) => {
      server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
    });
    server.bind(port, '127.0.0.1', () => resolve(server));
  });
}
tap.test('setup: start UDP echo server and SmartProxy with metrics', async () => {
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
backendServer = await createUdpEchoServer(BACKEND_PORT);
smartProxy = new SmartProxy({
routes: [
{
name: 'udp-metrics-test',
match: {
ports: PROXY_PORT,
transport: 'udp' as const,
},
action: {
type: 'forward',
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
udp: {
sessionTimeout: 10000,
},
},
},
],
defaults: {
security: {
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
},
},
metrics: {
enabled: true,
sampleIntervalMs: 1000,
retentionSeconds: 60,
},
});
await smartProxy.start();
});
tap.test('UDP metrics: counters increase after traffic', async () => {
  // Push a couple of datagrams through the proxy, verifying each echo.
  for (const label of ['metrics-test-1', 'metrics-test-2']) {
    const reply = await sendDatagram(PROXY_PORT, label);
    expect(reply).toEqual(`Echo: ${label}`);
  }
  // Give the sampler time to pick up the new counters and refresh the cache.
  await new Promise<void>((resolve) => setTimeout(resolve, 2000));
  // The udp property of the metrics adapter reads from the Rust JSON snapshot.
  const metrics = smartProxy.getMetrics();
  expect(metrics.udp).toBeDefined();
  const totalSessions = metrics.udp.totalSessions();
  const datagramsIn = metrics.udp.datagramsIn();
  const datagramsOut = metrics.udp.datagramsOut();
  console.log(`UDP metrics: sessions=${totalSessions}, in=${datagramsIn}, out=${datagramsOut}`);
  expect(totalSessions).toBeGreaterThan(0);
  expect(datagramsIn).toBeGreaterThan(0);
  expect(datagramsOut).toBeGreaterThan(0);
});
tap.test('cleanup: stop SmartProxy and backend', async () => {
  // Tear down the proxy first, then the echo backend, and confirm both
  // ports have been released back to the OS.
  await smartProxy.stop();
  await new Promise<void>((done) => {
    backendServer.close(() => done());
  });
  await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
});
export default tap.start();

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartproxy', name: '@push.rocks/smartproxy',
version: '25.13.0', version: '25.17.0',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.' description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
} }

View File

@@ -303,6 +303,21 @@ export class SmartProxy extends plugins.EventEmitter {
this.socketHandlerServer = null; this.socketHandlerServer = null;
} }
// Update datagram handler relay if datagram handler routes changed
const hasDatagramHandlers = newRoutes.some(
(r) => r.action.type === 'socket-handler' && r.action.datagramHandler
);
if (hasDatagramHandlers && !this.datagramHandlerServer) {
const dgPath = `/tmp/smartproxy-dgram-relay-${process.pid}.sock`;
this.datagramHandlerServer = new DatagramHandlerServer(dgPath, this.preprocessor);
await this.datagramHandlerServer.start();
await this.bridge.setDatagramHandlerRelay(this.datagramHandlerServer.getSocketPath());
} else if (!hasDatagramHandlers && this.datagramHandlerServer) {
await this.datagramHandlerServer.stop();
this.datagramHandlerServer = null;
}
// Update stored routes // Update stored routes
this.settings.routes = newRoutes; this.settings.routes = newRoutes;

View File

@@ -7,7 +7,7 @@ import type { IRouteConfig, IRouteMatch, IRouteAction, TPortRange } from '../mod
export class RouteValidator { export class RouteValidator {
private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt']; private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt'];
private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler']; private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler'];
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss']; private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss', 'udp', 'quic', 'http3'];
private static readonly MAX_PORTS = 100; private static readonly MAX_PORTS = 100;
private static readonly MAX_DOMAINS = 1000; private static readonly MAX_DOMAINS = 1000;
private static readonly MAX_HEADER_SIZE = 8192; private static readonly MAX_HEADER_SIZE = 8192;
@@ -123,10 +123,10 @@ export class RouteValidator {
errors.push(`Invalid action type: ${route.action.type}. Must be one of: ${this.VALID_ACTION_TYPES.join(', ')}`); errors.push(`Invalid action type: ${route.action.type}. Must be one of: ${this.VALID_ACTION_TYPES.join(', ')}`);
} }
// Validate socket-handler // Validate socket-handler (TCP socketHandler or UDP datagramHandler)
if (route.action.type === 'socket-handler') { if (route.action.type === 'socket-handler') {
if (typeof route.action.socketHandler !== 'function') { if (typeof route.action.socketHandler !== 'function' && typeof route.action.datagramHandler !== 'function') {
errors.push('socket-handler action requires a socketHandler function'); errors.push('socket-handler action requires a socketHandler or datagramHandler function');
} }
} }
@@ -173,6 +173,22 @@ export class RouteValidator {
} }
} }
} }
// QUIC routes require TLS with termination (QUIC mandates TLS 1.3)
if (route.action.udp?.quic && route.action.type === 'forward') {
if (!route.action.tls) {
errors.push('QUIC routes require TLS configuration (action.tls) — QUIC mandates TLS 1.3');
} else if (route.action.tls.mode === 'passthrough') {
errors.push('QUIC routes cannot use TLS mode "passthrough" — use "terminate" or "terminate-and-reencrypt"');
}
}
// Protocol quic/http3 requires transport udp or all
if (route.match?.protocol && ['quic', 'http3'].includes(route.match.protocol)) {
if (route.match.transport && route.match.transport !== 'udp' && route.match.transport !== 'all') {
errors.push(`Protocol "${route.match.protocol}" requires transport "udp" or "all"`);
}
}
} }
// Validate security settings // Validate security settings
@@ -619,11 +635,22 @@ export function validateRouteAction(action: IRouteAction): { valid: boolean; err
} }
} }
// QUIC routes require TLS with termination
if (action.udp?.quic && action.type === 'forward') {
if (!action.tls) {
errors.push('QUIC routes require TLS configuration — QUIC mandates TLS 1.3');
} else if (action.tls.mode === 'passthrough') {
errors.push('QUIC routes cannot use TLS mode "passthrough"');
}
}
if (action.type === 'socket-handler') { if (action.type === 'socket-handler') {
if (!action.socketHandler) { if (!action.socketHandler && !action.datagramHandler) {
errors.push('Socket handler function is required for socket-handler action'); errors.push('Socket handler or datagram handler function is required for socket-handler action');
} else if (typeof action.socketHandler !== 'function') { } else if (action.socketHandler && typeof action.socketHandler !== 'function') {
errors.push('Socket handler must be a function'); errors.push('Socket handler must be a function');
} else if (action.datagramHandler && typeof action.datagramHandler !== 'function') {
errors.push('Datagram handler must be a function');
} }
} }
@@ -714,7 +741,8 @@ export function hasRequiredPropertiesForAction(route: IRouteConfig, actionType:
route.action.targets.length > 0 && route.action.targets.length > 0 &&
route.action.targets.every(t => t.host && t.port !== undefined); route.action.targets.every(t => t.host && t.port !== undefined);
case 'socket-handler': case 'socket-handler':
return !!route.action.socketHandler && typeof route.action.socketHandler === 'function'; return (!!route.action.socketHandler && typeof route.action.socketHandler === 'function') ||
(!!route.action.datagramHandler && typeof route.action.datagramHandler === 'function');
default: default:
return false; return false;
} }