Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 15d0a721d5 | |||
| af970c447e | |||
| 9e1103e7a7 | |||
| 2b990527ac | |||
| 9595f0a9fc | |||
| 0fb3988123 | |||
| 53938df8db | |||
| e890bda8fc |
30
changelog.md
30
changelog.md
@@ -1,5 +1,35 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-19 - 25.16.0 - feat(quic,http3)
|
||||
add HTTP/3 proxy handling and hot-reload QUIC TLS configuration
|
||||
|
||||
- initialize and wire H3ProxyService into QUIC listeners so HTTP/3 requests are handled instead of being kept as placeholder connections
|
||||
- add backend HTTP/3 support with protocol caching that stores Alt-Svc advertised H3 ports for auto-detection
|
||||
- hot-swap TLS certificates across active QUIC endpoints and require terminating TLS for QUIC route validation
|
||||
- document QUIC route setup with required TLS and ACME configuration
|
||||
|
||||
## 2026-03-19 - 25.15.0 - feat(readme)
|
||||
document UDP, QUIC, and HTTP/3 support in the README
|
||||
|
||||
- Adds README examples for UDP datagram handlers, QUIC/HTTP3 forwarding, and dual-stack TCP/UDP routes
|
||||
- Expands configuration and API reference sections to cover transport matching, UDP/QUIC options, backend transport selection, and UDP metrics
|
||||
- Updates architecture and feature descriptions to reflect UDP, QUIC, HTTP/3, and datagram handler capabilities
|
||||
|
||||
## 2026-03-19 - 25.14.1 - fix(deps)
|
||||
update build and runtime dependencies and align route validation test expectations
|
||||
|
||||
- split the test preparation step into a dedicated test:before script while keeping test execution separate
|
||||
- bump development tooling and runtime package versions in package.json
|
||||
- adjust the route validation test to match the current generic handler error message
|
||||
|
||||
## 2026-03-19 - 25.14.0 - feat(udp,http3)
|
||||
add UDP datagram handler relay support and stream HTTP/3 request bodies to backends
|
||||
|
||||
- establish a persistent Unix socket relay for UDP datagram handlers and process handler replies back to clients
|
||||
- update route validation and smart proxy route reload logic to support datagramHandler routes
|
||||
- record UDP, QUIC, and HTTP/3 byte metrics more accurately, including request bytes in and UDP session cleanup connection tracking
|
||||
- add integration tests for UDP forwarding, datagram handlers, and UDP metrics
|
||||
|
||||
## 2026-03-19 - 25.13.0 - feat(smart-proxy)
|
||||
add UDP transport support with QUIC/HTTP3 routing and datagram handler relay
|
||||
|
||||
|
||||
19
package.json
19
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "25.13.0",
|
||||
"version": "25.16.0",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -9,27 +9,28 @@
|
||||
"author": "Lossless GmbH",
|
||||
"license": "MIT",
|
||||
"scripts": {
|
||||
"test": "(tsrust) && (tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
|
||||
"test:before": "(tsrust)",
|
||||
"test": "(tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
|
||||
"build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)",
|
||||
"format": "(gitzone format)",
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsbuild": "^4.3.0",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tsrust": "^1.3.0",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@git.zone/tstest": "^3.5.0",
|
||||
"@push.rocks/smartserve": "^2.0.1",
|
||||
"@types/node": "^25.2.3",
|
||||
"@types/node": "^25.5.0",
|
||||
"typescript": "^5.9.3",
|
||||
"why-is-node-running": "^3.2.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/smartcrypto": "^2.0.4",
|
||||
"@push.rocks/smartlog": "^3.1.10",
|
||||
"@push.rocks/smartrust": "^1.2.1",
|
||||
"@tsclass/tsclass": "^9.3.0",
|
||||
"minimatch": "^10.2.0"
|
||||
"@push.rocks/smartlog": "^3.2.1",
|
||||
"@push.rocks/smartrust": "^1.3.2",
|
||||
"@tsclass/tsclass": "^9.5.0",
|
||||
"minimatch": "^10.2.4"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
|
||||
4777
pnpm-lock.yaml
generated
4777
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
218
readme.md
218
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/smartproxy 🚀
|
||||
|
||||
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
|
||||
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, UDP/QUIC/HTTP3, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
|
||||
|
||||
## 📦 Installation
|
||||
|
||||
@@ -16,9 +16,9 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
||||
|
||||
## 🎯 What is SmartProxy?
|
||||
|
||||
SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, TLS, HTTP reverse proxy, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety.
|
||||
SmartProxy is a production-ready proxy solution that takes the complexity out of traffic management. Under the hood, all networking — TCP, UDP, TLS, HTTP reverse proxy, QUIC/HTTP3, connection tracking, security enforcement, and NFTables — is handled by a **Rust engine** for maximum performance, while you configure everything through a clean TypeScript API with full type safety.
|
||||
|
||||
Whether you're building microservices, deploying edge infrastructure, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered.
|
||||
Whether you're building microservices, deploying edge infrastructure, proxying UDP-based protocols, or need a battle-tested reverse proxy with automatic Let's Encrypt certificates, SmartProxy has you covered.
|
||||
|
||||
### ⚡ Key Features
|
||||
|
||||
@@ -29,11 +29,12 @@ Whether you're building microservices, deploying edge infrastructure, or need a
|
||||
| 🔒 **Automatic SSL/TLS** | Zero-config HTTPS with Let's Encrypt ACME integration |
|
||||
| 🎯 **Flexible Matching** | Route by port, domain, path, protocol, client IP, TLS version, headers, or custom logic |
|
||||
| 🚄 **High-Performance** | Choose between user-space or kernel-level (NFTables) forwarding |
|
||||
| 📡 **UDP & QUIC/HTTP3** | First-class UDP transport, datagram handlers, QUIC tunneling, and HTTP/3 support |
|
||||
| ⚖️ **Load Balancing** | Round-robin, least-connections, IP-hash with health checks |
|
||||
| 🛡️ **Enterprise Security** | IP filtering, rate limiting, basic auth, JWT auth, connection limits |
|
||||
| 🔌 **WebSocket Support** | First-class WebSocket proxying with ping/pong keep-alive |
|
||||
| 🎮 **Custom Protocols** | Socket handlers for implementing any protocol in TypeScript |
|
||||
| 📊 **Live Metrics** | Real-time throughput, connection counts, and performance data |
|
||||
| 🎮 **Custom Protocols** | Socket and datagram handlers for implementing any protocol in TypeScript |
|
||||
| 📊 **Live Metrics** | Real-time throughput, connection counts, UDP sessions, and performance data |
|
||||
| 🔧 **Dynamic Management** | Add/remove ports and routes at runtime without restarts |
|
||||
| 🔄 **PROXY Protocol** | Full PROXY protocol v1/v2 support for preserving client information |
|
||||
| 💾 **Consumer Cert Storage** | Bring your own persistence — SmartProxy never writes certs to disk |
|
||||
@@ -89,7 +90,7 @@ SmartProxy uses a powerful **match/action** pattern that makes routing predictab
|
||||
```
|
||||
|
||||
Every route consists of:
|
||||
- **Match** — What traffic to capture (ports, domains, paths, protocol, IPs, headers)
|
||||
- **Match** — What traffic to capture (ports, domains, paths, transport, protocol, IPs, headers)
|
||||
- **Action** — What to do with it (`forward` or `socket-handler`)
|
||||
- **Security** (optional) — IP allow/block lists, rate limits, authentication
|
||||
- **Headers** (optional) — Request/response header manipulation with template variables
|
||||
@@ -197,7 +198,7 @@ apiRoute = addRateLimiting(apiRoute, {
|
||||
const proxy = new SmartProxy({ routes: [apiRoute] });
|
||||
```
|
||||
|
||||
### 🎮 Custom Protocol Handler
|
||||
### 🎮 Custom Protocol Handler (TCP)
|
||||
|
||||
SmartProxy lets you implement any protocol with full socket control. Routes with JavaScript socket handlers are automatically relayed from the Rust engine back to your TypeScript code:
|
||||
|
||||
@@ -247,6 +248,105 @@ const proxy = new SmartProxy({ routes: [echoRoute, customRoute] });
|
||||
| `SocketHandlers.httpBlock(status, message)` | HTTP block response |
|
||||
| `SocketHandlers.block(message)` | Block with optional message |
|
||||
|
||||
### 📡 UDP Datagram Handler
|
||||
|
||||
Handle raw UDP datagrams with custom TypeScript logic — perfect for DNS, game servers, IoT protocols, or any UDP-based service:
|
||||
|
||||
```typescript
|
||||
import { SmartProxy } from '@push.rocks/smartproxy';
|
||||
import type { IRouteConfig, TDatagramHandler, IDatagramInfo } from '@push.rocks/smartproxy';
|
||||
|
||||
// Custom UDP echo handler
|
||||
const udpHandler: TDatagramHandler = (datagram, info, reply) => {
|
||||
console.log(`UDP from ${info.sourceIp}:${info.sourcePort} on port ${info.destPort}`);
|
||||
reply(datagram); // Echo it back
|
||||
};
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'udp-echo',
|
||||
match: {
|
||||
ports: 5353,
|
||||
transport: 'udp' // 👈 Listen for UDP datagrams
|
||||
},
|
||||
action: {
|
||||
type: 'socket-handler',
|
||||
datagramHandler: udpHandler, // 👈 Process each datagram
|
||||
udp: {
|
||||
sessionTimeout: 60000, // Session idle timeout (ms)
|
||||
maxSessionsPerIP: 100,
|
||||
maxDatagramSize: 65535
|
||||
}
|
||||
}
|
||||
}]
|
||||
});
|
||||
|
||||
await proxy.start();
|
||||
```
|
||||
|
||||
### 📡 QUIC / HTTP3 Forwarding
|
||||
|
||||
Forward QUIC traffic to backends with optional protocol translation (e.g., receive QUIC, forward as TCP/HTTP1):
|
||||
|
||||
```typescript
|
||||
import { SmartProxy } from '@push.rocks/smartproxy';
|
||||
import type { IRouteConfig } from '@push.rocks/smartproxy';
|
||||
|
||||
const quicRoute: IRouteConfig = {
|
||||
name: 'quic-to-backend',
|
||||
match: {
|
||||
ports: 443,
|
||||
transport: 'udp',
|
||||
protocol: 'quic' // 👈 Match QUIC protocol
|
||||
},
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{
|
||||
host: 'backend-server',
|
||||
port: 8443,
|
||||
backendTransport: 'tcp' // 👈 Translate QUIC → TCP for backend
|
||||
}],
|
||||
tls: {
|
||||
mode: 'terminate',
|
||||
certificate: 'auto' // 👈 QUIC requires TLS 1.3
|
||||
},
|
||||
udp: {
|
||||
quic: {
|
||||
enableHttp3: true,
|
||||
maxIdleTimeout: 30000,
|
||||
maxConcurrentBidiStreams: 100,
|
||||
altSvcPort: 443, // Advertise in Alt-Svc header
|
||||
altSvcMaxAge: 86400
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
acme: { email: 'ssl@example.com' },
|
||||
routes: [quicRoute]
|
||||
});
|
||||
```
|
||||
|
||||
### 🔁 Dual-Stack TCP + UDP Route
|
||||
|
||||
Listen on both TCP and UDP with a single route — handle each transport with its own handler:
|
||||
|
||||
```typescript
|
||||
const dualStackRoute: IRouteConfig = {
|
||||
name: 'dual-stack-dns',
|
||||
match: {
|
||||
ports: 53,
|
||||
transport: 'all' // 👈 Listen on both TCP and UDP
|
||||
},
|
||||
action: {
|
||||
type: 'socket-handler',
|
||||
socketHandler: handleTcpDns, // 👈 TCP connections
|
||||
datagramHandler: handleUdpDns, // 👈 UDP datagrams
|
||||
}
|
||||
};
|
||||
```
|
||||
|
||||
### ⚡ High-Performance NFTables Forwarding
|
||||
|
||||
For ultra-low latency on Linux, use kernel-level forwarding (requires root):
|
||||
@@ -419,6 +519,10 @@ console.log(`Bytes in: ${metrics.totals.bytesIn()}`);
|
||||
console.log(`Requests/sec: ${metrics.requests.perSecond()}`);
|
||||
console.log(`Throughput in: ${metrics.throughput.instant().in} bytes/sec`);
|
||||
|
||||
// UDP metrics
|
||||
console.log(`UDP sessions: ${metrics.udp.activeSessions()}`);
|
||||
console.log(`Datagrams in: ${metrics.udp.datagramsIn()}`);
|
||||
|
||||
// Get detailed statistics from the Rust engine
|
||||
const stats = await proxy.getStatistics();
|
||||
|
||||
@@ -545,7 +649,7 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
|
||||
```
|
||||
┌─────────────────────────────────────────────────────┐
|
||||
│ Your Application │
|
||||
│ (TypeScript — routes, config, socket handlers) │
|
||||
│ (TypeScript — routes, config, handlers) │
|
||||
└──────────────────┬──────────────────────────────────┘
|
||||
│ IPC (JSON over stdin/stdout)
|
||||
┌──────────────────▼──────────────────────────────────┐
|
||||
@@ -556,22 +660,23 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
|
||||
│ │ │ │ Proxy │ │ │ │ │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
|
||||
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
|
||||
│ │ Security│ │ Metrics │ │ Connec- │ │ NFTables │ │
|
||||
│ │ Enforce │ │ Collect │ │ tion │ │ Mgr │ │
|
||||
│ │ │ │ │ │ Tracker │ │ │ │
|
||||
│ │ UDP │ │ Security│ │ Metrics │ │ NFTables │ │
|
||||
│ │ QUIC │ │ Enforce │ │ Collect │ │ Mgr │ │
|
||||
│ │ HTTP/3 │ │ │ │ │ │ │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
|
||||
└──────────────────┬──────────────────────────────────┘
|
||||
│ Unix Socket Relay
|
||||
┌──────────────────▼──────────────────────────────────┐
|
||||
│ TypeScript Socket Handler Server │
|
||||
│ (for JS-defined socket handlers & dynamic routes) │
|
||||
│ TypeScript Socket & Datagram Handler Servers │
|
||||
│ (for JS socket handlers, datagram handlers, │
|
||||
│ and dynamic routes) │
|
||||
└─────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
- **Rust Engine** handles all networking, TLS, HTTP proxying, connection management, security, and metrics
|
||||
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and socket handler callbacks
|
||||
- **Rust Engine** handles all networking: TCP, UDP, TLS, QUIC, HTTP proxying, connection management, security, and metrics
|
||||
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and handler callbacks
|
||||
- **IPC** — The TypeScript wrapper uses JSON commands/events over stdin/stdout to communicate with the Rust binary
|
||||
- **Socket Relay** — A Unix domain socket server for routes requiring TypeScript-side handling (socket handlers, dynamic host/port functions)
|
||||
- **Socket/Datagram Relay** — Unix domain socket servers for routes requiring TypeScript-side handling (socket handlers, datagram handlers, dynamic host/port functions)
|
||||
|
||||
## 🎯 Route Configuration Reference
|
||||
|
||||
@@ -579,22 +684,26 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
|
||||
|
||||
```typescript
|
||||
interface IRouteMatch {
|
||||
ports: number | number[] | Array<{ from: number; to: number }>; // Required — port(s) to listen on
|
||||
domains?: string | string[]; // 'example.com', '*.example.com'
|
||||
path?: string; // '/api/*', '/users/:id'
|
||||
clientIp?: string[]; // ['10.0.0.0/8', '192.168.*']
|
||||
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3']
|
||||
ports: TPortRange; // Required — port(s) to listen on
|
||||
transport?: 'tcp' | 'udp' | 'all'; // Transport protocol (default: 'tcp')
|
||||
domains?: string | string[]; // 'example.com', '*.example.com'
|
||||
path?: string; // '/api/*', '/users/:id'
|
||||
clientIp?: string[]; // ['10.0.0.0/8', '192.168.*']
|
||||
tlsVersion?: string[]; // ['TLSv1.2', 'TLSv1.3']
|
||||
headers?: Record<string, string | RegExp>; // Match by HTTP headers
|
||||
protocol?: 'http' | 'tcp'; // Match specific protocol ('http' includes h2 + WebSocket upgrades)
|
||||
protocol?: 'http' | 'tcp' | 'udp' | 'quic' | 'http3'; // Application-layer protocol
|
||||
}
|
||||
|
||||
// Port range supports single numbers, arrays, and ranges
|
||||
type TPortRange = number | Array<number | { from: number; to: number }>;
|
||||
```
|
||||
|
||||
### Action Types
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing) |
|
||||
| `socket-handler` | Custom socket handling function in TypeScript |
|
||||
| `forward` | Proxy to one or more backend targets (with optional TLS, WebSocket, load balancing, UDP/QUIC) |
|
||||
| `socket-handler` | Custom socket/datagram handling function in TypeScript |
|
||||
|
||||
### Target Options
|
||||
|
||||
@@ -602,14 +711,15 @@ interface IRouteMatch {
|
||||
interface IRouteTarget {
|
||||
host: string | string[] | ((context: IRouteContext) => string | string[]);
|
||||
port: number | 'preserve' | ((context: IRouteContext) => number);
|
||||
tls?: IRouteTls; // Per-target TLS override
|
||||
priority?: number; // Target priority
|
||||
match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method)
|
||||
tls?: IRouteTls; // Per-target TLS override
|
||||
priority?: number; // Target priority
|
||||
match?: ITargetMatch; // Sub-match within a route (by port, path, headers, method)
|
||||
websocket?: IRouteWebSocket;
|
||||
loadBalancing?: IRouteLoadBalancing;
|
||||
sendProxyProtocol?: boolean;
|
||||
headers?: IRouteHeaders;
|
||||
advanced?: IRouteAdvanced;
|
||||
backendTransport?: 'tcp' | 'udp'; // Backend transport (e.g., receive QUIC, forward as TCP)
|
||||
}
|
||||
```
|
||||
|
||||
@@ -666,6 +776,27 @@ interface IRouteLoadBalancing {
|
||||
}
|
||||
```
|
||||
|
||||
### UDP & QUIC Options
|
||||
|
||||
```typescript
|
||||
interface IRouteUdp {
|
||||
sessionTimeout?: number; // Idle timeout per UDP session (ms, default: 60000)
|
||||
maxSessionsPerIP?: number; // Max concurrent sessions per IP (default: 1000)
|
||||
maxDatagramSize?: number; // Max datagram size in bytes (default: 65535)
|
||||
quic?: IRouteQuic;
|
||||
}
|
||||
|
||||
interface IRouteQuic {
|
||||
maxIdleTimeout?: number; // QUIC idle timeout (ms, default: 30000)
|
||||
maxConcurrentBidiStreams?: number; // Max bidi streams (default: 100)
|
||||
maxConcurrentUniStreams?: number; // Max uni streams (default: 100)
|
||||
enableHttp3?: boolean; // Enable HTTP/3 (default: false)
|
||||
altSvcPort?: number; // Port for Alt-Svc header
|
||||
altSvcMaxAge?: number; // Alt-Svc max age in seconds (default: 86400)
|
||||
initialCongestionWindow?: number; // Initial congestion window (bytes)
|
||||
}
|
||||
```
|
||||
|
||||
## 🛠️ Helper Functions Reference
|
||||
|
||||
All helpers are fully typed and return `IRouteConfig` or `IRouteConfig[]`:
|
||||
@@ -689,7 +820,7 @@ import {
|
||||
createWebSocketRoute, // WebSocket-enabled route
|
||||
|
||||
// Custom Protocols
|
||||
createSocketHandlerRoute, // Custom socket handler
|
||||
createSocketHandlerRoute, // Custom TCP socket handler
|
||||
SocketHandlers, // Pre-built handlers (echo, proxy, block, etc.)
|
||||
|
||||
// NFTables (Linux, requires root)
|
||||
@@ -718,6 +849,8 @@ import {
|
||||
} from '@push.rocks/smartproxy';
|
||||
```
|
||||
|
||||
> **Tip:** For UDP datagram handler routes or QUIC/HTTP3 routes, construct `IRouteConfig` objects directly — there are no helper functions for these yet. See the [UDP Datagram Handler](#-udp-datagram-handler) and [QUIC / HTTP3 Forwarding](#-quic--http3-forwarding) examples above.
|
||||
|
||||
## 📖 API Documentation
|
||||
|
||||
### SmartProxy Class
|
||||
@@ -753,6 +886,8 @@ class SmartProxy extends EventEmitter {
|
||||
|
||||
// Events
|
||||
on(event: 'error', handler: (err: Error) => void): this;
|
||||
on(event: 'certificate-issued', handler: (ev: ICertificateIssuedEvent) => void): this;
|
||||
on(event: 'certificate-failed', handler: (ev: ICertificateFailedEvent) => void): this;
|
||||
}
|
||||
```
|
||||
|
||||
@@ -775,6 +910,8 @@ interface ISmartProxyOptions {
|
||||
// Custom certificate provisioning
|
||||
certProvisionFunction?: (domain: string) => Promise<ICert | 'http01'>;
|
||||
certProvisionFallbackToAcme?: boolean; // Fall back to ACME on failure (default: true)
|
||||
certProvisionTimeout?: number; // Timeout per provision call (ms)
|
||||
certProvisionConcurrency?: number; // Max concurrent provisions
|
||||
|
||||
// Consumer-managed certificate persistence (see "Consumer-Managed Certificate Storage")
|
||||
certStore?: ISmartProxyCertStore;
|
||||
@@ -782,6 +919,9 @@ interface ISmartProxyOptions {
|
||||
// Self-signed fallback
|
||||
disableDefaultCert?: boolean; // Disable '*' self-signed fallback (default: false)
|
||||
|
||||
// Rust binary path override
|
||||
rustBinaryPath?: string; // Custom path to the Rust proxy binary
|
||||
|
||||
// Global defaults
|
||||
defaults?: {
|
||||
target?: { host: string; port: number };
|
||||
@@ -868,11 +1008,22 @@ metrics.requests.perSecond(); // Requests per second
|
||||
metrics.requests.perMinute(); // Requests per minute
|
||||
metrics.requests.total(); // Total requests
|
||||
|
||||
// UDP metrics
|
||||
metrics.udp.activeSessions(); // Current active UDP sessions
|
||||
metrics.udp.totalSessions(); // Total UDP sessions since start
|
||||
metrics.udp.datagramsIn(); // Datagrams received
|
||||
metrics.udp.datagramsOut(); // Datagrams sent
|
||||
|
||||
// Cumulative totals
|
||||
metrics.totals.bytesIn(); // Total bytes received
|
||||
metrics.totals.bytesOut(); // Total bytes sent
|
||||
metrics.totals.connections(); // Total connections
|
||||
|
||||
// Backend metrics
|
||||
metrics.backends.byBackend(); // Map<backend, IBackendMetrics>
|
||||
metrics.backends.protocols(); // Map<backend, protocol>
|
||||
metrics.backends.topByErrors(10); // Top N error-prone backends
|
||||
|
||||
// Percentiles
|
||||
metrics.percentiles.connectionDuration(); // { p50, p95, p99 }
|
||||
metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p50, p95, p99 } }
|
||||
@@ -896,11 +1047,12 @@ metrics.percentiles.bytesTransferred(); // { in: { p50, p95, p99 }, out: { p5
|
||||
### Rust Binary Not Found
|
||||
|
||||
SmartProxy searches for the Rust binary in this order:
|
||||
1. `SMARTPROXY_RUST_BINARY` environment variable
|
||||
2. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
|
||||
3. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
|
||||
4. Local dev build (`./rust/target/release/rustproxy`)
|
||||
5. System PATH (`rustproxy`)
|
||||
1. `rustBinaryPath` option in `ISmartProxyOptions`
|
||||
2. `SMARTPROXY_RUST_BINARY` environment variable
|
||||
3. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
|
||||
4. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
|
||||
5. Local dev build (`./rust/target/release/rustproxy`)
|
||||
6. System PATH (`rustproxy`)
|
||||
|
||||
### Performance Tuning
|
||||
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)
|
||||
|
||||
@@ -4,11 +4,14 @@
|
||||
//! and forwards them to backends using the same routing and pool infrastructure
|
||||
//! as the HTTP/1+2 proxy.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use bytes::{Buf, Bytes};
|
||||
use http_body::Frame;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use rustproxy_config::{RouteConfig, TransportProtocol};
|
||||
@@ -165,15 +168,6 @@ async fn handle_h3_request(
|
||||
let backend_port = target.port.resolve(port);
|
||||
let backend_addr = format!("{}:{}", backend_host, backend_port);
|
||||
|
||||
// Read request body
|
||||
let mut body_data = Vec::new();
|
||||
while let Some(mut chunk) = stream.recv_data().await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to read H3 request body: {}", e))?
|
||||
{
|
||||
body_data.extend_from_slice(chunk.chunk());
|
||||
chunk.advance(chunk.remaining());
|
||||
}
|
||||
|
||||
// Connect to backend via TCP HTTP/1.1 with timeout
|
||||
let tcp_stream = tokio::time::timeout(
|
||||
connect_timeout,
|
||||
@@ -194,11 +188,37 @@ async fn handle_h3_request(
|
||||
}
|
||||
});
|
||||
|
||||
let body = http_body_util::Full::new(Bytes::from(body_data));
|
||||
// Stream request body from H3 client to backend via an mpsc channel.
|
||||
// This avoids buffering the entire request body in memory.
|
||||
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
||||
let total_bytes_in = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let total_bytes_in_writer = Arc::clone(&total_bytes_in);
|
||||
|
||||
// Spawn the H3 body reader task
|
||||
let body_reader = tokio::spawn(async move {
|
||||
while let Ok(Some(mut chunk)) = stream.recv_data().await {
|
||||
let data = Bytes::copy_from_slice(chunk.chunk());
|
||||
total_bytes_in_writer.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
|
||||
chunk.advance(chunk.remaining());
|
||||
if body_tx.send(data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
stream
|
||||
});
|
||||
|
||||
// Create a body that polls from the mpsc receiver
|
||||
let body = H3RequestBody { receiver: body_rx };
|
||||
let backend_req = build_backend_request(&method, &backend_addr, &path, &host, &request, body)?;
|
||||
|
||||
let response = sender.send_request(backend_req).await
|
||||
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
|
||||
|
||||
// Await the body reader to get the stream back
|
||||
let mut stream = body_reader.await
|
||||
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
|
||||
let total_bytes_in = total_bytes_in.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
// Build H3 response
|
||||
let status = response.status();
|
||||
let mut h3_response = hyper::Response::builder().status(status);
|
||||
@@ -252,7 +272,7 @@ async fn handle_h3_request(
|
||||
|
||||
// Record metrics
|
||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||
metrics.record_bytes(0, total_bytes_out, route_id, Some(client_ip));
|
||||
metrics.record_bytes(total_bytes_in, total_bytes_out, route_id, Some(client_ip));
|
||||
|
||||
// Finish the stream
|
||||
stream.finish().await
|
||||
@@ -262,14 +282,14 @@ async fn handle_h3_request(
|
||||
}
|
||||
|
||||
/// Build an HTTP/1.1 backend request from the H3 frontend request.
|
||||
fn build_backend_request(
|
||||
fn build_backend_request<B>(
|
||||
method: &hyper::Method,
|
||||
backend_addr: &str,
|
||||
path: &str,
|
||||
host: &str,
|
||||
original_request: &hyper::Request<()>,
|
||||
body: http_body_util::Full<Bytes>,
|
||||
) -> anyhow::Result<hyper::Request<http_body_util::Full<Bytes>>> {
|
||||
body: B,
|
||||
) -> anyhow::Result<hyper::Request<B>> {
|
||||
let mut req = hyper::Request::builder()
|
||||
.method(method)
|
||||
.uri(format!("http://{}{}", backend_addr, path))
|
||||
@@ -286,3 +306,27 @@ fn build_backend_request(
|
||||
req.body(body)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to build backend request: {}", e))
|
||||
}
|
||||
|
||||
/// A streaming request body backed by an mpsc channel receiver.
|
||||
///
|
||||
/// Implements `http_body::Body` so hyper can poll chunks as they arrive
|
||||
/// from the H3 client, avoiding buffering the entire request body in memory.
|
||||
struct H3RequestBody {
|
||||
receiver: tokio::sync::mpsc::Receiver<Bytes>,
|
||||
}
|
||||
|
||||
impl http_body::Body for H3RequestBody {
|
||||
type Data = Bytes;
|
||||
type Error = hyper::Error;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
||||
match self.receiver.poll_recv(cx) {
|
||||
Poll::Ready(Some(data)) => Poll::Ready(Some(Ok(Frame::data(data)))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
//! Bounded, TTL-based protocol detection cache for HTTP/2 auto-detection.
|
||||
//! Bounded, TTL-based protocol detection cache for backend protocol auto-detection.
|
||||
//!
|
||||
//! Caches the ALPN-negotiated protocol (H1 or H2) per backend endpoint and requested
|
||||
//! Caches the detected protocol (H1, H2, or H3) per backend endpoint and requested
|
||||
//! domain (host:port + requested_host). This prevents cache oscillation when multiple
|
||||
//! frontend domains share the same backend but differ in HTTP/2 support.
|
||||
//! frontend domains share the same backend but differ in protocol support.
|
||||
//!
|
||||
//! H3 detection uses the browser model: Alt-Svc headers from H1/H2 responses are
|
||||
//! parsed and cached, including the advertised H3 port (which may differ from TCP).
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -29,6 +32,14 @@ pub enum DetectedProtocol {
|
||||
H3,
|
||||
}
|
||||
|
||||
/// Result of a protocol cache lookup.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct CachedProtocol {
|
||||
pub protocol: DetectedProtocol,
|
||||
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||
pub h3_port: Option<u16>,
|
||||
}
|
||||
|
||||
/// Key for the protocol cache: (host, port, requested_host).
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
pub struct ProtocolCacheKey {
|
||||
@@ -43,6 +54,8 @@ pub struct ProtocolCacheKey {
|
||||
struct CachedEntry {
|
||||
protocol: DetectedProtocol,
|
||||
detected_at: Instant,
|
||||
/// For H3: the port advertised by Alt-Svc (may differ from TCP port).
|
||||
h3_port: Option<u16>,
|
||||
}
|
||||
|
||||
/// Bounded, TTL-based protocol detection cache.
|
||||
@@ -75,11 +88,14 @@ impl ProtocolCache {
|
||||
|
||||
/// Look up the cached protocol for a backend endpoint.
|
||||
/// Returns `None` if not cached or expired (caller should probe via ALPN).
|
||||
pub fn get(&self, key: &ProtocolCacheKey) -> Option<DetectedProtocol> {
|
||||
pub fn get(&self, key: &ProtocolCacheKey) -> Option<CachedProtocol> {
|
||||
let entry = self.cache.get(key)?;
|
||||
if entry.detected_at.elapsed() < PROTOCOL_CACHE_TTL {
|
||||
debug!("Protocol cache hit: {:?} for {}:{} (requested: {:?})", entry.protocol, key.host, key.port, key.requested_host);
|
||||
Some(entry.protocol)
|
||||
Some(CachedProtocol {
|
||||
protocol: entry.protocol,
|
||||
h3_port: entry.h3_port,
|
||||
})
|
||||
} else {
|
||||
// Expired — remove and return None to trigger re-probe
|
||||
drop(entry); // release DashMap ref before remove
|
||||
@@ -91,6 +107,16 @@ impl ProtocolCache {
|
||||
/// Insert a detected protocol into the cache.
|
||||
/// If the cache is at capacity, evict the oldest entry first.
|
||||
pub fn insert(&self, key: ProtocolCacheKey, protocol: DetectedProtocol) {
|
||||
self.insert_with_h3_port(key, protocol, None);
|
||||
}
|
||||
|
||||
/// Insert an H3 detection result with the Alt-Svc advertised port.
|
||||
pub fn insert_h3(&self, key: ProtocolCacheKey, h3_port: u16) {
|
||||
self.insert_with_h3_port(key, DetectedProtocol::H3, Some(h3_port));
|
||||
}
|
||||
|
||||
/// Insert a protocol detection result with an optional H3 port.
|
||||
fn insert_with_h3_port(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>) {
|
||||
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
|
||||
// Evict the oldest entry to stay within bounds
|
||||
let oldest = self.cache.iter()
|
||||
@@ -103,6 +129,7 @@ impl ProtocolCache {
|
||||
self.cache.insert(key, CachedEntry {
|
||||
protocol,
|
||||
detected_at: Instant::now(),
|
||||
h3_port,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,18 @@ const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::
|
||||
/// Default WebSocket max lifetime (24 hours).
|
||||
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
|
||||
|
||||
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
|
||||
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||
|
||||
/// Protocol decision for backend connection.
|
||||
#[derive(Debug)]
|
||||
enum ProtocolDecision {
|
||||
H1,
|
||||
H2,
|
||||
H3 { port: u16 },
|
||||
AlpnProbe,
|
||||
}
|
||||
|
||||
/// RAII guard that decrements the active request counter on drop.
|
||||
/// Ensures the counter is correct even if the request handler panics.
|
||||
struct ActiveRequestGuard {
|
||||
@@ -190,6 +202,9 @@ pub struct HttpProxyService {
|
||||
ws_inactivity_timeout: std::time::Duration,
|
||||
/// WebSocket maximum connection lifetime.
|
||||
ws_max_lifetime: std::time::Duration,
|
||||
/// Shared QUIC client endpoint for outbound H3 backend connections.
|
||||
/// Lazily initialized on first H3 backend attempt.
|
||||
quinn_client_endpoint: Arc<quinn::Endpoint>,
|
||||
}
|
||||
|
||||
impl HttpProxyService {
|
||||
@@ -209,6 +224,7 @@ impl HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -233,6 +249,7 @@ impl HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -645,37 +662,94 @@ impl HttpProxyService {
|
||||
|
||||
// --- Resolve protocol decision based on backend protocol mode ---
|
||||
let is_auto_detect_mode = matches!(backend_protocol_mode, rustproxy_config::BackendProtocol::Auto);
|
||||
let (use_h2, needs_alpn_probe) = match backend_protocol_mode {
|
||||
rustproxy_config::BackendProtocol::Http1 => (false, false),
|
||||
rustproxy_config::BackendProtocol::Http2 => (true, false),
|
||||
rustproxy_config::BackendProtocol::Http3 => {
|
||||
// HTTP/3 (QUIC) backend connections not yet implemented — fall back to H1
|
||||
warn!("backendProtocol 'http3' not yet implemented, falling back to http1");
|
||||
(false, false)
|
||||
}
|
||||
let protocol_cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||
host: upstream.host.clone(),
|
||||
port: upstream.port,
|
||||
requested_host: host.clone(),
|
||||
};
|
||||
let protocol_decision = match backend_protocol_mode {
|
||||
rustproxy_config::BackendProtocol::Http1 => ProtocolDecision::H1,
|
||||
rustproxy_config::BackendProtocol::Http2 => ProtocolDecision::H2,
|
||||
rustproxy_config::BackendProtocol::Http3 => ProtocolDecision::H3 { port: upstream.port },
|
||||
rustproxy_config::BackendProtocol::Auto => {
|
||||
if !upstream.use_tls {
|
||||
// No ALPN without TLS — default to H1
|
||||
(false, false)
|
||||
// No ALPN without TLS, no QUIC without TLS — default to H1
|
||||
ProtocolDecision::H1
|
||||
} else {
|
||||
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
||||
host: upstream.host.clone(),
|
||||
port: upstream.port,
|
||||
requested_host: host.clone(),
|
||||
};
|
||||
match self.protocol_cache.get(&cache_key) {
|
||||
Some(crate::protocol_cache::DetectedProtocol::H2) => (true, false),
|
||||
Some(crate::protocol_cache::DetectedProtocol::H1) => (false, false),
|
||||
Some(crate::protocol_cache::DetectedProtocol::H3) => {
|
||||
// H3 cached but we're on TCP — fall back to H2 probe
|
||||
(false, true)
|
||||
}
|
||||
None => (false, true), // needs ALPN probe
|
||||
match self.protocol_cache.get(&protocol_cache_key) {
|
||||
Some(cached) => match cached.protocol {
|
||||
crate::protocol_cache::DetectedProtocol::H3 => {
|
||||
if let Some(h3_port) = cached.h3_port {
|
||||
ProtocolDecision::H3 { port: h3_port }
|
||||
} else {
|
||||
// H3 cached but no port — fall back to ALPN probe
|
||||
ProtocolDecision::AlpnProbe
|
||||
}
|
||||
}
|
||||
crate::protocol_cache::DetectedProtocol::H2 => ProtocolDecision::H2,
|
||||
crate::protocol_cache::DetectedProtocol::H1 => ProtocolDecision::H1,
|
||||
},
|
||||
None => ProtocolDecision::AlpnProbe,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Derive legacy flags for the existing H1/H2 connection path
|
||||
let (use_h2, needs_alpn_probe) = match &protocol_decision {
|
||||
ProtocolDecision::H1 => (false, false),
|
||||
ProtocolDecision::H2 => (true, false),
|
||||
ProtocolDecision::H3 { .. } => (false, false), // H3 path handled separately below
|
||||
ProtocolDecision::AlpnProbe => (false, true),
|
||||
};
|
||||
|
||||
// --- H3 path: try QUIC connection before TCP ---
|
||||
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
|
||||
let h3_pool_key = crate::connection_pool::PoolKey {
|
||||
host: upstream.host.clone(),
|
||||
port: h3_port,
|
||||
use_tls: true,
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
|
||||
// Try H3 pool checkout first
|
||||
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
|
||||
self.metrics.backend_pool_hit(&upstream_key);
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
}
|
||||
|
||||
// Try fresh QUIC connection
|
||||
match self.connect_quic_backend(&upstream.host, h3_port).await {
|
||||
Ok(quic_conn) => {
|
||||
self.metrics.backend_pool_miss(&upstream_key);
|
||||
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
return result;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(backend = %upstream_key, error = %e,
|
||||
"H3 backend connect failed, falling back to H2/H1");
|
||||
// Invalidate H3 from cache — next request will ALPN probe for H2/H1
|
||||
if is_auto_detect_mode {
|
||||
self.protocol_cache.insert(
|
||||
protocol_cache_key.clone(),
|
||||
crate::protocol_cache::DetectedProtocol::H1,
|
||||
);
|
||||
}
|
||||
// Fall through to TCP path (ALPN probe for auto, or H1 for explicit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Connection pooling: try reusing an existing connection first ---
|
||||
// For ALPN probe mode, skip pool checkout (we don't know the protocol yet)
|
||||
if !needs_alpn_probe {
|
||||
@@ -870,6 +944,19 @@ impl HttpProxyService {
|
||||
};
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.backend_connection_closed(&upstream_key);
|
||||
|
||||
// --- Alt-Svc discovery: check if backend advertises H3 ---
|
||||
if is_auto_detect_mode {
|
||||
if let Ok(ref resp) = result {
|
||||
if let Some(alt_svc) = resp.headers().get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(backend = %upstream_key, h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(protocol_cache_key, h3_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
@@ -2393,6 +2480,252 @@ impl HttpProxyService {
|
||||
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
Arc::new(config)
|
||||
}
|
||||
|
||||
/// Create a shared QUIC client endpoint for outbound H3 backend connections.
|
||||
fn create_quinn_client_endpoint() -> quinn::Endpoint {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let mut tls_config = rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(Arc::new(InsecureBackendVerifier))
|
||||
.with_no_client_auth();
|
||||
tls_config.alpn_protocols = vec![b"h3".to_vec()];
|
||||
|
||||
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
|
||||
.expect("Failed to create QUIC client crypto config");
|
||||
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
|
||||
|
||||
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
|
||||
.expect("Failed to create QUIC client endpoint");
|
||||
endpoint.set_default_client_config(client_config);
|
||||
endpoint
|
||||
}
|
||||
|
||||
/// Connect to a backend via QUIC (H3).
|
||||
async fn connect_quic_backend(
|
||||
&self,
|
||||
host: &str,
|
||||
port: u16,
|
||||
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let addr = tokio::net::lookup_host(format!("{}:{}", host, port))
|
||||
.await?
|
||||
.next()
|
||||
.ok_or("DNS resolution returned no addresses")?;
|
||||
|
||||
let server_name = host.to_string();
|
||||
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
||||
|
||||
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
||||
.map_err(|_| "QUIC connect timeout (3s)")??;
|
||||
|
||||
debug!("QUIC backend connection established to {}:{}", host, port);
|
||||
Ok(connection)
|
||||
}
|
||||
|
||||
/// Forward request to backend via HTTP/3 over QUIC.
|
||||
async fn forward_h3(
|
||||
&self,
|
||||
quic_conn: quinn::Connection,
|
||||
parts: hyper::http::request::Parts,
|
||||
body: Incoming,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
upstream_path: &str,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
pool_key: &crate::connection_pool::PoolKey,
|
||||
domain: &str,
|
||||
conn_activity: &ConnActivity,
|
||||
backend_key: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
||||
let (mut driver, mut send_request) = match h3::client::new(h3_quinn_conn).await {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 client handshake failed");
|
||||
self.metrics.backend_handshake_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 handshake failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the h3 connection driver
|
||||
let driver_pool = Arc::clone(&self.connection_pool);
|
||||
let driver_pool_key = pool_key.clone();
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
let driver_gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
||||
debug!("H3 connection driver closed: {:?}", close_err);
|
||||
let g = driver_gen.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if g != u64::MAX {
|
||||
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
|
||||
}
|
||||
});
|
||||
|
||||
// Build the H3 request
|
||||
let uri = hyper::Uri::builder()
|
||||
.scheme("https")
|
||||
.authority(domain)
|
||||
.path_and_query(upstream_path)
|
||||
.build()
|
||||
.unwrap_or_else(|_| upstream_path.parse().unwrap_or_default());
|
||||
|
||||
let mut h3_req = hyper::Request::builder()
|
||||
.method(parts.method.clone())
|
||||
.uri(uri);
|
||||
|
||||
if let Some(headers) = h3_req.headers_mut() {
|
||||
*headers = upstream_headers;
|
||||
}
|
||||
|
||||
let h3_req = h3_req.body(()).unwrap();
|
||||
|
||||
// Send the request
|
||||
let mut stream = match send_request.send_request(h3_req).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 send_request failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 request failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Stream request body
|
||||
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
||||
let sip: Arc<str> = Arc::from(source_ip);
|
||||
|
||||
{
|
||||
use http_body_util::BodyExt;
|
||||
let mut body = body;
|
||||
while let Some(frame) = body.frame().await {
|
||||
match frame {
|
||||
Ok(frame) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
|
||||
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
|
||||
error!(backend = %backend_key, error = %e, "H3 send_data failed");
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(backend = %backend_key, error = %e, "Client body read error during H3 forward");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Signal end of body
|
||||
stream.finish().await.ok();
|
||||
}
|
||||
|
||||
// Read response
|
||||
let h3_response = match stream.recv_response().await {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!(backend = %backend_key, domain = %domain, error = %e, "H3 recv_response failed");
|
||||
self.metrics.backend_request_error(backend_key);
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 response failed"));
|
||||
}
|
||||
};
|
||||
|
||||
// Build the response for the client
|
||||
let status = h3_response.status();
|
||||
let mut response = Response::builder().status(status);
|
||||
|
||||
if let Some(headers) = response.headers_mut() {
|
||||
for (name, value) in h3_response.headers() {
|
||||
let n = name.as_str();
|
||||
// Skip hop-by-hop headers
|
||||
if n == "transfer-encoding" || n == "connection" || n == "keep-alive" {
|
||||
continue;
|
||||
}
|
||||
headers.insert(name.clone(), value.clone());
|
||||
}
|
||||
ResponseFilter::apply_headers(route, headers, None);
|
||||
}
|
||||
|
||||
// Stream response body back via an adapter
|
||||
let h3_body = H3ClientResponseBody { stream };
|
||||
let counting_body = CountingBody::new(
|
||||
h3_body,
|
||||
Arc::clone(&self.metrics),
|
||||
rid,
|
||||
Some(sip),
|
||||
Direction::Out,
|
||||
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
||||
|
||||
let counting_body = if let Some(ref ar) = conn_activity.active_requests {
|
||||
counting_body.with_active_requests(Arc::clone(ar))
|
||||
} else {
|
||||
counting_body
|
||||
};
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
// Register connection in pool on success
|
||||
if status != StatusCode::BAD_GATEWAY {
|
||||
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
self.metrics.set_backend_protocol(backend_key, "h3");
|
||||
Ok(response.body(body).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse an Alt-Svc header value to extract the H3 port.
|
||||
/// Handles formats like `h3=":443"; ma=86400` and `h3=":8443", h2=":443"`.
|
||||
fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
|
||||
for directive in header_value.split(',') {
|
||||
let directive = directive.trim();
|
||||
// Match h3=":<port>" or h3-29=":<port>" etc.
|
||||
if directive.starts_with("h3=") || directive.starts_with("h3-") {
|
||||
// Find the port in ":<port>"
|
||||
if let Some(start) = directive.find("\":") {
|
||||
let rest = &directive[start + 2..];
|
||||
if let Some(end) = rest.find('"') {
|
||||
if let Ok(port) = rest[..end].parse::<u16>() {
|
||||
return Some(port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Response body adapter for H3 client responses.
|
||||
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
|
||||
struct H3ClientResponseBody {
|
||||
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||
}
|
||||
|
||||
impl http_body::Body for H3ClientResponseBody {
|
||||
type Data = Bytes;
|
||||
type Error = hyper::Error;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||
// h3's recv_data is async, so we need to poll it manually.
|
||||
// Use a small future to poll the recv_data call.
|
||||
use std::future::Future;
|
||||
let mut fut = Box::pin(self.stream.recv_data());
|
||||
match fut.as_mut().poll(_cx) {
|
||||
Poll::Ready(Ok(Some(mut buf))) => {
|
||||
use bytes::Buf;
|
||||
let data = Bytes::copy_from_slice(buf.chunk());
|
||||
buf.advance(buf.remaining());
|
||||
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||
}
|
||||
Poll::Ready(Ok(None)) => Poll::Ready(None),
|
||||
Poll::Ready(Err(e)) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insecure certificate verifier for backend TLS connections (fallback only).
|
||||
@@ -2463,6 +2796,7 @@ impl Default for HttpProxyService {
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,8 +19,9 @@ use rustproxy_config::{RouteConfig, TransportProtocol};
|
||||
use rustproxy_metrics::MetricsCollector;
|
||||
use rustproxy_routing::{MatchContext, RouteManager};
|
||||
|
||||
use rustproxy_http::h3_service::H3ProxyService;
|
||||
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
use crate::forwarder::ForwardMetricsCtx;
|
||||
|
||||
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
||||
///
|
||||
@@ -56,6 +57,7 @@ pub async fn quic_accept_loop(
|
||||
metrics: Arc<MetricsCollector>,
|
||||
conn_tracker: Arc<ConnectionTracker>,
|
||||
cancel: CancellationToken,
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
) {
|
||||
loop {
|
||||
let incoming = tokio::select! {
|
||||
@@ -114,9 +116,10 @@ pub async fn quic_accept_loop(
|
||||
let metrics = Arc::clone(&metrics);
|
||||
let conn_tracker = Arc::clone(&conn_tracker);
|
||||
let cancel = cancel.child_token();
|
||||
let h3_svc = h3_service.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match handle_quic_connection(incoming, route, port, &metrics, &cancel).await {
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc).await {
|
||||
Ok(()) => debug!("QUIC connection from {} completed", remote_addr),
|
||||
Err(e) => debug!("QUIC connection from {} error: {}", remote_addr, e),
|
||||
}
|
||||
@@ -138,8 +141,9 @@ async fn handle_quic_connection(
|
||||
incoming: quinn::Incoming,
|
||||
route: RouteConfig,
|
||||
port: u16,
|
||||
metrics: &MetricsCollector,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
cancel: &CancellationToken,
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let connection = incoming.await?;
|
||||
let remote_addr = connection.remote_address();
|
||||
@@ -152,10 +156,20 @@ async fn handle_quic_connection(
|
||||
.unwrap_or(false);
|
||||
|
||||
if enable_http3 {
|
||||
// Phase 5: dispatch to H3ProxyService
|
||||
// For now, log and accept streams for basic handling
|
||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3 handler", route.name);
|
||||
handle_h3_connection(connection, route, port, metrics, cancel).await
|
||||
if let Some(ref h3_svc) = h3_service {
|
||||
debug!("HTTP/3 enabled for route {:?}, dispatching to H3ProxyService", route.name);
|
||||
h3_svc.handle_connection(connection, &route, port).await
|
||||
} else {
|
||||
warn!("HTTP/3 enabled for route {:?} but H3ProxyService not initialized", route.name);
|
||||
// Keep connection alive until cancelled
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {}
|
||||
reason = connection.closed() => {
|
||||
debug!("HTTP/3 connection closed (no service): {}", reason);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
// Non-HTTP3 QUIC: bidirectional stream forwarding to TCP backend
|
||||
handle_quic_stream_forwarding(connection, route, port, metrics, cancel).await
|
||||
@@ -171,11 +185,12 @@ async fn handle_quic_stream_forwarding(
|
||||
connection: quinn::Connection,
|
||||
route: RouteConfig,
|
||||
port: u16,
|
||||
_metrics: &MetricsCollector,
|
||||
metrics: Arc<MetricsCollector>,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let remote_addr = connection.remote_address();
|
||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||
let metrics_arc = metrics;
|
||||
|
||||
// Resolve backend target
|
||||
let target = route.action.targets.as_ref()
|
||||
@@ -203,11 +218,8 @@ async fn handle_quic_stream_forwarding(
|
||||
|
||||
let backend_addr = backend_addr.clone();
|
||||
let ip_str = remote_addr.ip().to_string();
|
||||
let _fwd_ctx = ForwardMetricsCtx {
|
||||
collector: Arc::new(MetricsCollector::new()), // TODO: share real metrics
|
||||
route_id: route_id.map(|s| s.to_string()),
|
||||
source_ip: Some(ip_str),
|
||||
};
|
||||
let stream_metrics = Arc::clone(&metrics_arc);
|
||||
let stream_route_id = route_id.map(|s| s.to_string());
|
||||
|
||||
// Spawn a task for each QUIC stream → TCP bidirectional forwarding
|
||||
tokio::spawn(async move {
|
||||
@@ -217,6 +229,11 @@ async fn handle_quic_stream_forwarding(
|
||||
&backend_addr,
|
||||
).await {
|
||||
Ok((bytes_in, bytes_out)) => {
|
||||
stream_metrics.record_bytes(
|
||||
bytes_in, bytes_out,
|
||||
stream_route_id.as_deref(),
|
||||
Some(&ip_str),
|
||||
);
|
||||
debug!("QUIC stream forwarded: {}B in, {}B out", bytes_in, bytes_out);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -255,29 +272,6 @@ async fn forward_quic_stream_to_tcp(
|
||||
Ok((bytes_in, bytes_out))
|
||||
}
|
||||
|
||||
/// Placeholder for HTTP/3 connection handling (Phase 5).
|
||||
///
|
||||
/// Once h3_service is implemented, this will delegate to it.
|
||||
async fn handle_h3_connection(
|
||||
connection: quinn::Connection,
|
||||
_route: RouteConfig,
|
||||
_port: u16,
|
||||
_metrics: &MetricsCollector,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
warn!("HTTP/3 handling not yet fully implemented — accepting connection but no request processing");
|
||||
|
||||
// Keep the connection alive until cancelled or closed
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {}
|
||||
reason = connection.closed() => {
|
||||
debug!("HTTP/3 connection closed: {}", reason);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -8,12 +8,12 @@ use std::net::SocketAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
@@ -21,13 +21,15 @@ use rustproxy_config::{RouteActionType, TransportProtocol};
|
||||
use rustproxy_metrics::MetricsCollector;
|
||||
use rustproxy_routing::{MatchContext, RouteManager};
|
||||
|
||||
use rustproxy_http::h3_service::H3ProxyService;
|
||||
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
||||
|
||||
/// Manages UDP listeners across all configured ports.
|
||||
pub struct UdpListenerManager {
|
||||
/// Port → recv loop task handle
|
||||
listeners: HashMap<u16, JoinHandle<()>>,
|
||||
/// Port → (recv loop task handle, optional QUIC endpoint for TLS updates)
|
||||
listeners: HashMap<u16, (JoinHandle<()>, Option<quinn::Endpoint>)>,
|
||||
/// Hot-reloadable route table
|
||||
route_manager: Arc<ArcSwap<RouteManager>>,
|
||||
/// Shared metrics collector
|
||||
@@ -40,6 +42,24 @@ pub struct UdpListenerManager {
|
||||
cancel_token: CancellationToken,
|
||||
/// Unix socket path for datagram handler relay
|
||||
datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
||||
/// Persistent write half of the relay connection
|
||||
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
||||
/// Cancel token for the current relay reply reader task
|
||||
relay_reader_cancel: Option<CancellationToken>,
|
||||
/// H3 proxy service for HTTP/3 request handling
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
}
|
||||
|
||||
impl Drop for UdpListenerManager {
|
||||
fn drop(&mut self) {
|
||||
self.cancel_token.cancel();
|
||||
for (_, (handle, endpoint)) in self.listeners.drain() {
|
||||
handle.abort();
|
||||
if let Some(ep) = endpoint {
|
||||
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UdpListenerManager {
|
||||
@@ -57,9 +77,17 @@ impl UdpListenerManager {
|
||||
session_table: Arc::new(UdpSessionTable::new()),
|
||||
cancel_token,
|
||||
datagram_handler_relay: Arc::new(RwLock::new(None)),
|
||||
relay_writer: Arc::new(Mutex::new(None)),
|
||||
relay_reader_cancel: None,
|
||||
h3_service: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the H3 proxy service for HTTP/3 request handling.
|
||||
pub fn set_h3_service(&mut self, svc: Arc<H3ProxyService>) {
|
||||
self.h3_service = Some(svc);
|
||||
}
|
||||
|
||||
/// Update the route manager (for hot-reload).
|
||||
pub fn update_routes(&self, route_manager: Arc<RouteManager>) {
|
||||
self.route_manager.store(route_manager);
|
||||
@@ -94,8 +122,9 @@ impl UdpListenerManager {
|
||||
|
||||
if has_quic {
|
||||
if let Some(tls) = tls_config {
|
||||
// Create QUIC endpoint
|
||||
// Create QUIC endpoint; clone it so we can hot-swap TLS later
|
||||
let endpoint = crate::quic_handler::create_quic_endpoint(port, tls)?;
|
||||
let endpoint_for_updates = endpoint.clone(); // quinn::Endpoint is Arc-based
|
||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||
endpoint,
|
||||
port,
|
||||
@@ -103,8 +132,9 @@ impl UdpListenerManager {
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
));
|
||||
self.listeners.insert(port, handle);
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {}", port);
|
||||
return Ok(());
|
||||
} else {
|
||||
@@ -126,10 +156,11 @@ impl UdpListenerManager {
|
||||
Arc::clone(&self.conn_tracker),
|
||||
Arc::clone(&self.session_table),
|
||||
Arc::clone(&self.datagram_handler_relay),
|
||||
Arc::clone(&self.relay_writer),
|
||||
self.cancel_token.child_token(),
|
||||
));
|
||||
|
||||
self.listeners.insert(port, handle);
|
||||
self.listeners.insert(port, (handle, None));
|
||||
|
||||
// Start the session cleanup task if this is the first port
|
||||
if self.listeners.len() == 1 {
|
||||
@@ -141,8 +172,11 @@ impl UdpListenerManager {
|
||||
|
||||
/// Stop listening on a UDP port.
|
||||
pub fn remove_port(&mut self, port: u16) {
|
||||
if let Some(handle) = self.listeners.remove(&port) {
|
||||
if let Some((handle, endpoint)) = self.listeners.remove(&port) {
|
||||
handle.abort();
|
||||
if let Some(ep) = endpoint {
|
||||
ep.close(quinn::VarInt::from_u32(0), b"port removed");
|
||||
}
|
||||
info!("UDP listener removed from port {}", port);
|
||||
}
|
||||
}
|
||||
@@ -157,24 +191,80 @@ impl UdpListenerManager {
|
||||
/// Stop all listeners and clean up.
|
||||
pub async fn stop(&mut self) {
|
||||
self.cancel_token.cancel();
|
||||
for (port, handle) in self.listeners.drain() {
|
||||
for (port, (handle, endpoint)) in self.listeners.drain() {
|
||||
handle.abort();
|
||||
if let Some(ep) = endpoint {
|
||||
ep.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||
}
|
||||
debug!("UDP listener stopped on port {}", port);
|
||||
}
|
||||
info!("All UDP listeners stopped, {} sessions remaining",
|
||||
self.session_table.session_count());
|
||||
}
|
||||
|
||||
/// Set the datagram handler relay socket path.
|
||||
pub async fn set_datagram_handler_relay(&self, path: String) {
|
||||
let mut relay = self.datagram_handler_relay.write().await;
|
||||
*relay = Some(path);
|
||||
/// Update TLS config on all active QUIC endpoints (cert refresh).
|
||||
/// Only affects new incoming connections — existing connections are undisturbed.
|
||||
/// Uses quinn's Endpoint::set_server_config() for zero-downtime hot-swap.
|
||||
pub fn update_quic_tls(&self, tls_config: Arc<rustls::ServerConfig>) {
|
||||
for (port, (_handle, endpoint)) in &self.listeners {
|
||||
if let Some(ep) = endpoint {
|
||||
match quinn::crypto::rustls::QuicServerConfig::try_from(Arc::clone(&tls_config)) {
|
||||
Ok(quic_crypto) => {
|
||||
let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_crypto));
|
||||
ep.set_server_config(Some(server_config));
|
||||
info!("Updated QUIC TLS config on port {}", port);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to update QUIC TLS config on port {}: {}", port, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the datagram handler relay socket path and establish connection.
|
||||
pub async fn set_datagram_handler_relay(&mut self, path: String) {
|
||||
// Cancel previous relay reader task if any
|
||||
if let Some(old_cancel) = self.relay_reader_cancel.take() {
|
||||
old_cancel.cancel();
|
||||
}
|
||||
|
||||
// Store the path
|
||||
{
|
||||
let mut relay = self.datagram_handler_relay.write().await;
|
||||
*relay = Some(path.clone());
|
||||
}
|
||||
|
||||
// Connect to the Unix socket
|
||||
match tokio::net::UnixStream::connect(&path).await {
|
||||
Ok(stream) => {
|
||||
let (read_half, write_half) = stream.into_split();
|
||||
|
||||
// Store write half for sending datagrams
|
||||
{
|
||||
let mut writer = self.relay_writer.lock().await;
|
||||
*writer = Some(write_half);
|
||||
}
|
||||
|
||||
// Spawn reply reader — reads length-prefixed JSON replies from TS
|
||||
// and sends them back to clients via the listener sockets
|
||||
let cancel = self.cancel_token.child_token();
|
||||
self.relay_reader_cancel = Some(cancel.clone());
|
||||
tokio::spawn(Self::relay_reply_reader(read_half, cancel));
|
||||
|
||||
info!("Datagram handler relay connected to {}", path);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect datagram handler relay to {}: {}", path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start periodic session cleanup task.
|
||||
fn start_cleanup_task(&self) {
|
||||
let session_table = Arc::clone(&self.session_table);
|
||||
let metrics = Arc::clone(&self.metrics);
|
||||
let conn_tracker = Arc::clone(&self.conn_tracker);
|
||||
let cancel = self.cancel_token.child_token();
|
||||
let route_manager = Arc::clone(&self.route_manager);
|
||||
|
||||
@@ -188,7 +278,7 @@ impl UdpListenerManager {
|
||||
// or default 60s if none configured)
|
||||
let rm = route_manager.load();
|
||||
let timeout_ms = Self::get_min_session_timeout(&rm);
|
||||
let removed = session_table.cleanup_idle(timeout_ms, &metrics);
|
||||
let removed = session_table.cleanup_idle(timeout_ms, &metrics, &conn_tracker);
|
||||
if removed > 0 {
|
||||
debug!("UDP session cleanup: removed {} idle sessions, {} remaining",
|
||||
removed, session_table.session_count());
|
||||
@@ -213,7 +303,8 @@ impl UdpListenerManager {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
conn_tracker: Arc<ConnectionTracker>,
|
||||
session_table: Arc<UdpSessionTable>,
|
||||
datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
||||
_datagram_handler_relay: Arc<RwLock<Option<String>>>,
|
||||
relay_writer: Arc<Mutex<Option<tokio::net::unix::OwnedWriteHalf>>>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
// Use a reasonably large buffer; actual max is per-route but we need a single buffer
|
||||
@@ -264,21 +355,16 @@ impl UdpListenerManager {
|
||||
let route = route_match.route;
|
||||
let route_id = route.name.as_deref().or(route.id.as_deref());
|
||||
|
||||
// Socket handler routes → relay datagram to TS via Unix socket
|
||||
// Socket handler routes → relay datagram to TS via persistent Unix socket
|
||||
if route.action.action_type == RouteActionType::SocketHandler {
|
||||
let relay_path = datagram_handler_relay.read().await;
|
||||
if let Some(ref path) = *relay_path {
|
||||
if let Err(e) = Self::relay_datagram_to_ts(
|
||||
path,
|
||||
route_id.unwrap_or("unknown"),
|
||||
&client_addr,
|
||||
port,
|
||||
datagram,
|
||||
).await {
|
||||
debug!("Failed to relay UDP datagram to TS: {}", e);
|
||||
}
|
||||
} else {
|
||||
debug!("UDP datagram handler relay not configured for route {:?}", route_id);
|
||||
if let Err(e) = Self::relay_datagram_via_writer(
|
||||
&relay_writer,
|
||||
route_id.unwrap_or("unknown"),
|
||||
&client_addr,
|
||||
port,
|
||||
datagram,
|
||||
).await {
|
||||
debug!("Failed to relay UDP datagram to TS: {}", e);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -441,10 +527,9 @@ impl UdpListenerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Relay a UDP datagram to the TypeScript handler via Unix socket.
|
||||
/// Uses length-prefixed JSON framing: [4-byte BE length][JSON payload]
|
||||
async fn relay_datagram_to_ts(
|
||||
relay_path: &str,
|
||||
/// Send a datagram to TS via the persistent relay writer.
|
||||
async fn relay_datagram_via_writer(
|
||||
writer: &Mutex<Option<tokio::net::unix::OwnedWriteHalf>>,
|
||||
route_key: &str,
|
||||
client_addr: &SocketAddr,
|
||||
dest_port: u16,
|
||||
@@ -463,8 +548,9 @@ impl UdpListenerManager {
|
||||
});
|
||||
let json = serde_json::to_vec(&msg)?;
|
||||
|
||||
// Connect to relay (one-shot for now; persistent connection optimization deferred)
|
||||
let mut stream = tokio::net::UnixStream::connect(relay_path).await?;
|
||||
let mut guard = writer.lock().await;
|
||||
let stream = guard.as_mut()
|
||||
.ok_or_else(|| anyhow::anyhow!("Datagram relay not connected"))?;
|
||||
|
||||
// Length-prefixed frame
|
||||
let len_bytes = (json.len() as u32).to_be_bytes();
|
||||
@@ -474,4 +560,101 @@ impl UdpListenerManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Background task reading reply frames from the TS datagram handler.
|
||||
/// Parses replies and sends them back to the original client via UDP.
|
||||
async fn relay_reply_reader(
|
||||
mut reader: tokio::net::unix::OwnedReadHalf,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
use base64::Engine;
|
||||
|
||||
let mut len_buf = [0u8; 4];
|
||||
loop {
|
||||
// Read length prefix
|
||||
let read_result = tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
result = reader.read_exact(&mut len_buf) => result,
|
||||
};
|
||||
|
||||
match read_result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
debug!("Datagram relay reader closed: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let frame_len = u32::from_be_bytes(len_buf) as usize;
|
||||
if frame_len > 10 * 1024 * 1024 {
|
||||
error!("Datagram relay frame too large: {} bytes", frame_len);
|
||||
break;
|
||||
}
|
||||
|
||||
let mut frame_buf = vec![0u8; frame_len];
|
||||
match reader.read_exact(&mut frame_buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
debug!("Datagram relay reader frame error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the reply JSON
|
||||
let reply: serde_json::Value = match serde_json::from_slice(&frame_buf) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
debug!("Datagram relay reply parse error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if reply.get("type").and_then(|v| v.as_str()) != Some("reply") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let source_ip = reply.get("sourceIp").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let source_port = reply.get("sourcePort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
|
||||
let dest_port = reply.get("destPort").and_then(|v| v.as_u64()).unwrap_or(0) as u16;
|
||||
let payload_b64 = reply.get("payloadBase64").and_then(|v| v.as_str()).unwrap_or("");
|
||||
|
||||
let payload = match base64::engine::general_purpose::STANDARD.decode(payload_b64) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
debug!("Datagram relay reply base64 decode error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let client_addr: SocketAddr = match format!("{}:{}", source_ip, source_port).parse() {
|
||||
Ok(a) => a,
|
||||
Err(e) => {
|
||||
debug!("Datagram relay reply address parse error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Send the reply back to the client via a temporary UDP socket bound to the dest_port
|
||||
// We need the listener socket for this port. For simplicity, use a fresh socket.
|
||||
let reply_socket = match UdpSocket::bind(format!("0.0.0.0:{}", dest_port)).await {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
// Port already bound by the listener — use unbound socket
|
||||
match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
debug!("Failed to create reply socket: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = reply_socket.send_to(&payload, client_addr).await {
|
||||
debug!("Failed to send datagram reply to {}: {}", client_addr, e);
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Datagram relay reply reader stopped");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ use tracing::debug;
|
||||
|
||||
use rustproxy_metrics::MetricsCollector;
|
||||
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
|
||||
/// A single UDP session (flow).
|
||||
pub struct UdpSession {
|
||||
/// Socket bound to ephemeral port, connected to backend
|
||||
@@ -165,6 +167,7 @@ impl UdpSessionTable {
|
||||
&self,
|
||||
timeout_ms: u64,
|
||||
metrics: &MetricsCollector,
|
||||
conn_tracker: &ConnectionTracker,
|
||||
) -> usize {
|
||||
let now_ms = self.elapsed_ms();
|
||||
let mut removed = 0;
|
||||
@@ -185,6 +188,7 @@ impl UdpSessionTable {
|
||||
session.client_addr, key.1,
|
||||
now_ms.saturating_sub(session.last_activity.load(Ordering::Relaxed))
|
||||
);
|
||||
conn_tracker.connection_closed(&session.source_ip);
|
||||
metrics.connection_closed(
|
||||
session.route_id.as_deref(),
|
||||
Some(&session.source_ip.to_string()),
|
||||
|
||||
@@ -340,6 +340,17 @@ impl RustProxy {
|
||||
self.cancel_token.clone(),
|
||||
);
|
||||
|
||||
// Construct H3ProxyService for HTTP/3 request handling
|
||||
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(
|
||||
Arc::new(ArcSwap::from(Arc::clone(&*self.route_table.load()))),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::new(rustproxy_http::connection_pool::ConnectionPool::new()),
|
||||
Arc::new(rustproxy_http::protocol_cache::ProtocolCache::new()),
|
||||
rustproxy_passthrough::tls_handler::shared_backend_tls_config(),
|
||||
std::time::Duration::from_secs(30),
|
||||
);
|
||||
udp_mgr.set_h3_service(Arc::new(h3_svc));
|
||||
|
||||
for port in &udp_ports {
|
||||
udp_mgr.add_port_with_tls(*port, quic_tls_config.clone()).await?;
|
||||
}
|
||||
@@ -772,13 +783,21 @@ impl RustProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Build TLS config for QUIC before taking mutable borrow on udp_mgr
|
||||
let quic_tls = if new_udp_ports.iter().any(|p| !old_udp_ports.contains(p)) {
|
||||
let tls_configs = self.current_tls_configs().await;
|
||||
Self::build_quic_tls_config(&tls_configs)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||
udp_mgr.update_routes(Arc::clone(&new_manager));
|
||||
|
||||
// Add new UDP ports
|
||||
// Add new UDP ports (with TLS for QUIC)
|
||||
for port in &new_udp_ports {
|
||||
if !old_udp_ports.contains(port) {
|
||||
udp_mgr.add_port(*port).await?;
|
||||
udp_mgr.add_port_with_tls(*port, quic_tls.clone()).await?;
|
||||
}
|
||||
}
|
||||
// Remove old UDP ports
|
||||
@@ -1005,10 +1024,37 @@ impl RustProxy {
|
||||
Some(Arc::new(tls_config))
|
||||
}
|
||||
|
||||
/// Build the current full TLS config map from all sources (route configs, loaded certs, cert manager).
|
||||
async fn current_tls_configs(&self) -> HashMap<String, TlsCertConfig> {
|
||||
let mut configs = Self::extract_tls_configs(&self.options.routes);
|
||||
|
||||
// Merge dynamically loaded certs (from loadCertificate IPC)
|
||||
for (d, c) in &self.loaded_certs {
|
||||
if !configs.contains_key(d) {
|
||||
configs.insert(d.clone(), c.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Merge certs from cert manager store
|
||||
if let Some(ref cm_arc) = self.cert_manager {
|
||||
let cm = cm_arc.lock().await;
|
||||
for (d, b) in cm.store().iter() {
|
||||
if !configs.contains_key(d) {
|
||||
configs.insert(d.clone(), TlsCertConfig {
|
||||
cert_pem: b.cert_pem.clone(),
|
||||
key_pem: b.key_pem.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
configs
|
||||
}
|
||||
|
||||
/// Set the Unix domain socket path for relaying UDP datagrams to TypeScript datagramHandler callbacks.
|
||||
pub async fn set_datagram_handler_relay_path(&mut self, path: Option<String>) {
|
||||
info!("Datagram handler relay path set to: {:?}", path);
|
||||
if let Some(ref udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(ref p) = path {
|
||||
udp_mgr.set_datagram_handler_relay(p.clone()).await;
|
||||
}
|
||||
@@ -1055,37 +1101,21 @@ impl RustProxy {
|
||||
key_pem: key_pem.clone(),
|
||||
});
|
||||
|
||||
// Hot-swap TLS config on the listener
|
||||
if let Some(ref mut listener) = self.listener_manager {
|
||||
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
||||
// Hot-swap TLS config on TCP and QUIC listeners
|
||||
let tls_configs = self.current_tls_configs().await;
|
||||
|
||||
// Add the new cert
|
||||
tls_configs.insert(domain.to_string(), TlsCertConfig {
|
||||
cert_pem: cert_pem.clone(),
|
||||
key_pem: key_pem.clone(),
|
||||
});
|
||||
|
||||
// Also include all existing certs from cert manager
|
||||
if let Some(ref cm_arc) = self.cert_manager {
|
||||
let cm = cm_arc.lock().await;
|
||||
for (d, b) in cm.store().iter() {
|
||||
if !tls_configs.contains_key(d) {
|
||||
tls_configs.insert(d.clone(), TlsCertConfig {
|
||||
cert_pem: b.cert_pem.clone(),
|
||||
key_pem: b.key_pem.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge dynamically loaded certs from previous loadCertificate calls
|
||||
for (d, c) in &self.loaded_certs {
|
||||
if !tls_configs.contains_key(d) {
|
||||
tls_configs.insert(d.clone(), c.clone());
|
||||
}
|
||||
}
|
||||
if let Some(ref listener) = self.listener_manager {
|
||||
// Build QUIC TLS config before TCP consumes the map
|
||||
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||
|
||||
listener.set_tls_configs(tls_configs);
|
||||
|
||||
// Also update QUIC endpoints with the new certs
|
||||
if let Some(ref udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(quic_config) = quic_tls {
|
||||
udp_mgr.update_quic_tls(quic_config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Certificate loaded and TLS config updated for {}", domain);
|
||||
|
||||
125
test/test.datagram-handler.ts
Normal file
125
test/test.datagram-handler.ts
Normal file
@@ -0,0 +1,125 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as dgram from 'dgram';
|
||||
import { SmartProxy } from '../ts/index.js';
|
||||
import type { TDatagramHandler, IDatagramInfo } from '../ts/index.js';
|
||||
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
|
||||
|
||||
let smartProxy: SmartProxy;
|
||||
let PROXY_PORT: number;
|
||||
|
||||
// Helper: send a single UDP datagram and wait for a response
|
||||
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const client = dgram.createSocket('udp4');
|
||||
const timeout = setTimeout(() => {
|
||||
client.close();
|
||||
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
client.send(Buffer.from(msg), port, '127.0.0.1');
|
||||
client.on('message', (data) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
resolve(data.toString());
|
||||
});
|
||||
client.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
tap.test('setup: start SmartProxy with datagramHandler', async () => {
|
||||
[PROXY_PORT] = await findFreePorts(1);
|
||||
|
||||
const handler: TDatagramHandler = (datagram, info, reply) => {
|
||||
reply(Buffer.from(`Handled: ${datagram.toString()}`));
|
||||
};
|
||||
|
||||
smartProxy = new SmartProxy({
|
||||
routes: [
|
||||
{
|
||||
name: 'dgram-handler-test',
|
||||
match: {
|
||||
ports: PROXY_PORT,
|
||||
transport: 'udp' as const,
|
||||
},
|
||||
action: {
|
||||
type: 'socket-handler',
|
||||
datagramHandler: handler,
|
||||
},
|
||||
},
|
||||
],
|
||||
defaults: {
|
||||
security: {
|
||||
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await smartProxy.start();
|
||||
});
|
||||
|
||||
tap.test('datagram handler: receives and replies to datagram', async () => {
|
||||
const response = await sendDatagram(PROXY_PORT, 'Hello Handler');
|
||||
expect(response).toEqual('Handled: Hello Handler');
|
||||
});
|
||||
|
||||
tap.test('datagram handler: async handler works', async () => {
|
||||
// Stop and restart with async handler
|
||||
await smartProxy.stop();
|
||||
|
||||
[PROXY_PORT] = await findFreePorts(1);
|
||||
|
||||
const asyncHandler: TDatagramHandler = async (datagram, info, reply) => {
|
||||
// Simulate async work
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 10));
|
||||
reply(Buffer.from(`Async: ${datagram.toString()}`));
|
||||
};
|
||||
|
||||
smartProxy = new SmartProxy({
|
||||
routes: [
|
||||
{
|
||||
name: 'dgram-async-handler',
|
||||
match: {
|
||||
ports: PROXY_PORT,
|
||||
transport: 'udp' as const,
|
||||
},
|
||||
action: {
|
||||
type: 'socket-handler',
|
||||
datagramHandler: asyncHandler,
|
||||
},
|
||||
},
|
||||
],
|
||||
defaults: {
|
||||
security: {
|
||||
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await smartProxy.start();
|
||||
|
||||
const response = await sendDatagram(PROXY_PORT, 'Test Async');
|
||||
expect(response).toEqual('Async: Test Async');
|
||||
});
|
||||
|
||||
tap.test('datagram handler: multiple rapid datagrams', async () => {
|
||||
const promises: Promise<string>[] = [];
|
||||
for (let i = 0; i < 5; i++) {
|
||||
promises.push(sendDatagram(PROXY_PORT, `msg-${i}`));
|
||||
}
|
||||
|
||||
const responses = await Promise.all(promises);
|
||||
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(responses).toContain(`Async: msg-${i}`);
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('cleanup: stop SmartProxy', async () => {
|
||||
await smartProxy.stop();
|
||||
await assertPortsFree([PROXY_PORT]);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -174,7 +174,7 @@ tap.test('Route Validation - validateRouteAction', async () => {
|
||||
const invalidSocketResult = validateRouteAction(invalidSocketAction);
|
||||
expect(invalidSocketResult.valid).toBeFalse();
|
||||
expect(invalidSocketResult.errors.length).toBeGreaterThan(0);
|
||||
expect(invalidSocketResult.errors[0]).toInclude('Socket handler function is required');
|
||||
expect(invalidSocketResult.errors[0]).toInclude('handler function is required');
|
||||
});
|
||||
|
||||
tap.test('Route Validation - validateRouteConfig', async () => {
|
||||
|
||||
142
test/test.udp-forwarding.ts
Normal file
142
test/test.udp-forwarding.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as dgram from 'dgram';
|
||||
import { SmartProxy } from '../ts/index.js';
|
||||
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
|
||||
|
||||
let smartProxy: SmartProxy;
|
||||
let backendServer: dgram.Socket;
|
||||
let PROXY_PORT: number;
|
||||
let BACKEND_PORT: number;
|
||||
|
||||
// Helper: send a single UDP datagram and wait for a response
|
||||
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const client = dgram.createSocket('udp4');
|
||||
const timeout = setTimeout(() => {
|
||||
client.close();
|
||||
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
client.send(Buffer.from(msg), port, '127.0.0.1');
|
||||
client.on('message', (data) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
resolve(data.toString());
|
||||
});
|
||||
client.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Helper: create a UDP echo server
|
||||
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
|
||||
return new Promise((resolve) => {
|
||||
const server = dgram.createSocket('udp4');
|
||||
server.on('message', (msg, rinfo) => {
|
||||
server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
|
||||
});
|
||||
server.bind(port, '127.0.0.1', () => resolve(server));
|
||||
});
|
||||
}
|
||||
|
||||
tap.test('setup: start UDP echo server and SmartProxy', async () => {
|
||||
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
// Start backend UDP echo server
|
||||
backendServer = await createUdpEchoServer(BACKEND_PORT);
|
||||
|
||||
// Start SmartProxy with a UDP forwarding route
|
||||
smartProxy = new SmartProxy({
|
||||
routes: [
|
||||
{
|
||||
name: 'udp-forward-test',
|
||||
match: {
|
||||
ports: PROXY_PORT,
|
||||
transport: 'udp' as const,
|
||||
},
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
udp: {
|
||||
sessionTimeout: 5000,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
defaults: {
|
||||
security: {
|
||||
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await smartProxy.start();
|
||||
});
|
||||
|
||||
tap.test('UDP forwarding: basic datagram round-trip', async () => {
|
||||
const response = await sendDatagram(PROXY_PORT, 'Hello UDP');
|
||||
expect(response).toEqual('Echo: Hello UDP');
|
||||
});
|
||||
|
||||
tap.test('UDP forwarding: multiple datagrams same session', async () => {
|
||||
// Use a single client socket for session reuse
|
||||
const client = dgram.createSocket('udp4');
|
||||
const responses: string[] = [];
|
||||
|
||||
const done = new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
client.close();
|
||||
reject(new Error('Timeout waiting for 3 responses'));
|
||||
}, 5000);
|
||||
|
||||
client.on('message', (data) => {
|
||||
responses.push(data.toString());
|
||||
if (responses.length === 3) {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
client.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
|
||||
client.send(Buffer.from('msg1'), PROXY_PORT, '127.0.0.1');
|
||||
client.send(Buffer.from('msg2'), PROXY_PORT, '127.0.0.1');
|
||||
client.send(Buffer.from('msg3'), PROXY_PORT, '127.0.0.1');
|
||||
|
||||
await done;
|
||||
|
||||
expect(responses).toContain('Echo: msg1');
|
||||
expect(responses).toContain('Echo: msg2');
|
||||
expect(responses).toContain('Echo: msg3');
|
||||
});
|
||||
|
||||
tap.test('UDP forwarding: multiple clients', async () => {
|
||||
const [resp1, resp2] = await Promise.all([
|
||||
sendDatagram(PROXY_PORT, 'client1'),
|
||||
sendDatagram(PROXY_PORT, 'client2'),
|
||||
]);
|
||||
|
||||
expect(resp1).toEqual('Echo: client1');
|
||||
expect(resp2).toEqual('Echo: client2');
|
||||
});
|
||||
|
||||
tap.test('UDP forwarding: large datagram (1400 bytes)', async () => {
|
||||
const payload = 'X'.repeat(1400);
|
||||
const response = await sendDatagram(PROXY_PORT, payload);
|
||||
expect(response).toEqual(`Echo: ${payload}`);
|
||||
});
|
||||
|
||||
tap.test('cleanup: stop SmartProxy and backend', async () => {
|
||||
await smartProxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
114
test/test.udp-metrics.ts
Normal file
114
test/test.udp-metrics.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as dgram from 'dgram';
|
||||
import { SmartProxy } from '../ts/index.js';
|
||||
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
|
||||
|
||||
let smartProxy: SmartProxy;
|
||||
let backendServer: dgram.Socket;
|
||||
let PROXY_PORT: number;
|
||||
let BACKEND_PORT: number;
|
||||
|
||||
// Helper: send a single UDP datagram and wait for a response
|
||||
function sendDatagram(port: number, msg: string, timeoutMs = 5000): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const client = dgram.createSocket('udp4');
|
||||
const timeout = setTimeout(() => {
|
||||
client.close();
|
||||
reject(new Error(`UDP response timeout after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
client.send(Buffer.from(msg), port, '127.0.0.1');
|
||||
client.on('message', (data) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
resolve(data.toString());
|
||||
});
|
||||
client.on('error', (err) => {
|
||||
clearTimeout(timeout);
|
||||
client.close();
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Helper: create a UDP echo server
|
||||
function createUdpEchoServer(port: number): Promise<dgram.Socket> {
|
||||
return new Promise((resolve) => {
|
||||
const server = dgram.createSocket('udp4');
|
||||
server.on('message', (msg, rinfo) => {
|
||||
server.send(Buffer.from(`Echo: ${msg.toString()}`), rinfo.port, rinfo.address);
|
||||
});
|
||||
server.bind(port, '127.0.0.1', () => resolve(server));
|
||||
});
|
||||
}
|
||||
|
||||
tap.test('setup: start UDP echo server and SmartProxy with metrics', async () => {
|
||||
[PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
backendServer = await createUdpEchoServer(BACKEND_PORT);
|
||||
|
||||
smartProxy = new SmartProxy({
|
||||
routes: [
|
||||
{
|
||||
name: 'udp-metrics-test',
|
||||
match: {
|
||||
ports: PROXY_PORT,
|
||||
transport: 'udp' as const,
|
||||
},
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
udp: {
|
||||
sessionTimeout: 10000,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
defaults: {
|
||||
security: {
|
||||
ipAllowList: ['127.0.0.1', '::1', '::ffff:127.0.0.1'],
|
||||
},
|
||||
},
|
||||
metrics: {
|
||||
enabled: true,
|
||||
sampleIntervalMs: 1000,
|
||||
retentionSeconds: 60,
|
||||
},
|
||||
});
|
||||
|
||||
await smartProxy.start();
|
||||
});
|
||||
|
||||
tap.test('UDP metrics: counters increase after traffic', async () => {
|
||||
// Send a few datagrams
|
||||
const resp1 = await sendDatagram(PROXY_PORT, 'metrics-test-1');
|
||||
expect(resp1).toEqual('Echo: metrics-test-1');
|
||||
|
||||
const resp2 = await sendDatagram(PROXY_PORT, 'metrics-test-2');
|
||||
expect(resp2).toEqual('Echo: metrics-test-2');
|
||||
|
||||
// Wait for metrics to propagate and cache to refresh
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 2000));
|
||||
|
||||
// Get metrics (returns the adapter, need to ensure cache is fresh)
|
||||
const metrics = smartProxy.getMetrics();
|
||||
|
||||
// The udp property reads from the Rust JSON snapshot
|
||||
expect(metrics.udp).toBeDefined();
|
||||
const totalSessions = metrics.udp.totalSessions();
|
||||
const datagramsIn = metrics.udp.datagramsIn();
|
||||
const datagramsOut = metrics.udp.datagramsOut();
|
||||
|
||||
console.log(`UDP metrics: sessions=${totalSessions}, in=${datagramsIn}, out=${datagramsOut}`);
|
||||
|
||||
expect(totalSessions).toBeGreaterThan(0);
|
||||
expect(datagramsIn).toBeGreaterThan(0);
|
||||
expect(datagramsOut).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
tap.test('cleanup: stop SmartProxy and backend', async () => {
|
||||
await smartProxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '25.13.0',
|
||||
version: '25.16.0',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
@@ -303,6 +303,21 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
this.socketHandlerServer = null;
|
||||
}
|
||||
|
||||
// Update datagram handler relay if datagram handler routes changed
|
||||
const hasDatagramHandlers = newRoutes.some(
|
||||
(r) => r.action.type === 'socket-handler' && r.action.datagramHandler
|
||||
);
|
||||
|
||||
if (hasDatagramHandlers && !this.datagramHandlerServer) {
|
||||
const dgPath = `/tmp/smartproxy-dgram-relay-${process.pid}.sock`;
|
||||
this.datagramHandlerServer = new DatagramHandlerServer(dgPath, this.preprocessor);
|
||||
await this.datagramHandlerServer.start();
|
||||
await this.bridge.setDatagramHandlerRelay(this.datagramHandlerServer.getSocketPath());
|
||||
} else if (!hasDatagramHandlers && this.datagramHandlerServer) {
|
||||
await this.datagramHandlerServer.stop();
|
||||
this.datagramHandlerServer = null;
|
||||
}
|
||||
|
||||
// Update stored routes
|
||||
this.settings.routes = newRoutes;
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import type { IRouteConfig, IRouteMatch, IRouteAction, TPortRange } from '../mod
|
||||
export class RouteValidator {
|
||||
private static readonly VALID_TLS_MODES = ['terminate', 'passthrough', 'terminate-and-reencrypt'];
|
||||
private static readonly VALID_ACTION_TYPES = ['forward', 'socket-handler'];
|
||||
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss'];
|
||||
private static readonly VALID_PROTOCOLS = ['tcp', 'http', 'https', 'ws', 'wss', 'udp', 'quic', 'http3'];
|
||||
private static readonly MAX_PORTS = 100;
|
||||
private static readonly MAX_DOMAINS = 1000;
|
||||
private static readonly MAX_HEADER_SIZE = 8192;
|
||||
@@ -123,10 +123,10 @@ export class RouteValidator {
|
||||
errors.push(`Invalid action type: ${route.action.type}. Must be one of: ${this.VALID_ACTION_TYPES.join(', ')}`);
|
||||
}
|
||||
|
||||
// Validate socket-handler
|
||||
// Validate socket-handler (TCP socketHandler or UDP datagramHandler)
|
||||
if (route.action.type === 'socket-handler') {
|
||||
if (typeof route.action.socketHandler !== 'function') {
|
||||
errors.push('socket-handler action requires a socketHandler function');
|
||||
if (typeof route.action.socketHandler !== 'function' && typeof route.action.datagramHandler !== 'function') {
|
||||
errors.push('socket-handler action requires a socketHandler or datagramHandler function');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,6 +173,22 @@ export class RouteValidator {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// QUIC routes require TLS with termination (QUIC mandates TLS 1.3)
|
||||
if (route.action.udp?.quic && route.action.type === 'forward') {
|
||||
if (!route.action.tls) {
|
||||
errors.push('QUIC routes require TLS configuration (action.tls) — QUIC mandates TLS 1.3');
|
||||
} else if (route.action.tls.mode === 'passthrough') {
|
||||
errors.push('QUIC routes cannot use TLS mode "passthrough" — use "terminate" or "terminate-and-reencrypt"');
|
||||
}
|
||||
}
|
||||
|
||||
// Protocol quic/http3 requires transport udp or all
|
||||
if (route.match?.protocol && ['quic', 'http3'].includes(route.match.protocol)) {
|
||||
if (route.match.transport && route.match.transport !== 'udp' && route.match.transport !== 'all') {
|
||||
errors.push(`Protocol "${route.match.protocol}" requires transport "udp" or "all"`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate security settings
|
||||
@@ -619,11 +635,22 @@ export function validateRouteAction(action: IRouteAction): { valid: boolean; err
|
||||
}
|
||||
}
|
||||
|
||||
// QUIC routes require TLS with termination
|
||||
if (action.udp?.quic && action.type === 'forward') {
|
||||
if (!action.tls) {
|
||||
errors.push('QUIC routes require TLS configuration — QUIC mandates TLS 1.3');
|
||||
} else if (action.tls.mode === 'passthrough') {
|
||||
errors.push('QUIC routes cannot use TLS mode "passthrough"');
|
||||
}
|
||||
}
|
||||
|
||||
if (action.type === 'socket-handler') {
|
||||
if (!action.socketHandler) {
|
||||
errors.push('Socket handler function is required for socket-handler action');
|
||||
} else if (typeof action.socketHandler !== 'function') {
|
||||
if (!action.socketHandler && !action.datagramHandler) {
|
||||
errors.push('Socket handler or datagram handler function is required for socket-handler action');
|
||||
} else if (action.socketHandler && typeof action.socketHandler !== 'function') {
|
||||
errors.push('Socket handler must be a function');
|
||||
} else if (action.datagramHandler && typeof action.datagramHandler !== 'function') {
|
||||
errors.push('Datagram handler must be a function');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -714,7 +741,8 @@ export function hasRequiredPropertiesForAction(route: IRouteConfig, actionType:
|
||||
route.action.targets.length > 0 &&
|
||||
route.action.targets.every(t => t.host && t.port !== undefined);
|
||||
case 'socket-handler':
|
||||
return !!route.action.socketHandler && typeof route.action.socketHandler === 'function';
|
||||
return (!!route.action.socketHandler && typeof route.action.socketHandler === 'function') ||
|
||||
(!!route.action.datagramHandler && typeof route.action.datagramHandler === 'function');
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user