Compare commits

..

18 Commits

Author SHA1 Message Date
jkunz cb71f32b90 v27.8.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 12:43:59 +00:00
jkunz 46155ab12c feat(metrics): add per-domain HTTP request rate metrics 2026-04-14 12:43:59 +00:00
jkunz 490a310b54 v27.7.4
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 09:17:55 +00:00
jkunz 6c5180573a fix(rustproxy metrics): use stable route metrics keys across HTTP and passthrough listeners 2026-04-14 09:17:55 +00:00
jkunz 30e5ab308f v27.7.3
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 01:14:33 +00:00
jkunz d2a54b3491 fix(repo): no changes detected 2026-04-14 01:14:33 +00:00
jkunz dc922c97df v27.7.2
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 00:55:25 +00:00
jkunz 8d1bae7604 fix(docs): clarify metrics documentation for domain normalization and saturating gauges 2026-04-14 00:55:25 +00:00
jkunz 200e86e311 v27.7.1
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-14 00:54:12 +00:00
jkunz a53a2c4ca5 fix(rustproxy-http,rustproxy-metrics): fix domain-scoped request host detection and harden connection metrics cleanup 2026-04-14 00:54:12 +00:00
jkunz 6ee7237357 v27.7.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-13 23:21:54 +00:00
jkunz b5b4c608f0 feat(smart-proxy): add typed Rust config serialization and regex header contract coverage 2026-04-13 23:21:54 +00:00
jkunz af132f40fc v27.6.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-13 18:33:28 +00:00
jkunz 781634446a feat(metrics): track per-IP domain request metrics across HTTP and TCP passthrough traffic 2026-04-13 18:33:28 +00:00
jkunz e988d935b6 v27.5.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-06 12:46:09 +00:00
jkunz 99a026627d feat(security): add domain-scoped IP allow list support across HTTP and passthrough filtering 2026-04-06 12:46:09 +00:00
jkunz 572e31587a v27.4.0
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-04 19:25:06 +00:00
jkunz 8587fb997c feat(rustproxy): add HTTP/3 proxy service wiring for QUIC listeners 2026-04-04 19:25:06 +00:00
42 changed files with 4219 additions and 795 deletions
+61
View File
@@ -1,5 +1,66 @@
# Changelog
## 2026-04-14 - 27.8.0 - feat(metrics)
add per-domain HTTP request rate metrics
- Record canonicalized HTTP request rates per domain in the Rust metrics collector and expose per-second and last-minute values in snapshots.
- Add TypeScript metrics interfaces and adapter support for requests.byDomain().
- Cover HTTP domain rate tracking and ensure TLS passthrough SNI traffic does not affect HTTP request rate metrics.
## 2026-04-14 - 27.7.4 - fix(rustproxy metrics)
use stable route metrics keys across HTTP and passthrough listeners
- adds a shared RouteConfig::metrics_key helper that prefers route name and falls back to route id
- updates HTTP, TCP, UDP, and QUIC metrics labeling to use the shared route metrics key consistently
- keeps route cancellation and rate limiter indexing bound to route config ids where required
- adds tests covering metrics key selection behavior
## 2026-04-14 - 27.7.3 - fix(repo)
no changes detected
## 2026-04-14 - 27.7.2 - fix(docs)
clarify metrics documentation for domain normalization and saturating gauges
- Document that per-IP domain keys are normalized to lowercase and have trailing dots stripped before counting.
- Clarify that the saturating close pattern also applies to connection and UDP active gauges.
## 2026-04-14 - 27.7.1 - fix(rustproxy-http,rustproxy-metrics)
fix domain-scoped request host detection and harden connection metrics cleanup
- use a shared request host extractor that falls back to URI authority so domain-scoped IP allow lists work for HTTP/2 and HTTP/3 requests without a Host header
- add request filter and host extraction tests covering domain-scoped ACL behavior
- prevent connection counters from underflowing during close handling and clean up per-IP metrics entries more safely
- normalize tracked domain keys in metrics to reduce duplicate entries caused by case or trailing-dot variations
## 2026-04-13 - 27.7.0 - feat(smart-proxy)
add typed Rust config serialization and regex header contract coverage
- serialize SmartProxy routes and top-level options into explicit Rust-safe types, including header regex literals, UDP field normalization, ACME, defaults, and proxy settings
- support JS-style regex header literals with flags in Rust header matching and add cross-contract tests for route preprocessing and config deserialization
- improve TypeScript safety for Rust bridge and metrics integration by replacing loose any-based payloads with dedicated Rust type definitions
## 2026-04-13 - 27.6.0 - feat(metrics)
track per-IP domain request metrics across HTTP and TCP passthrough traffic
- records domain request counts per frontend IP from HTTP Host headers and TCP SNI
- exposes per-IP domain maps and top IP-domain request pairs through the TypeScript metrics adapter
- bounds per-IP domain tracking and prunes stale entries to limit memory growth
- adds metrics system documentation covering architecture, collected data, and known gaps
## 2026-04-06 - 27.5.0 - feat(security)
add domain-scoped IP allow list support across HTTP and passthrough filtering
- extend route security types to accept IP allow entries scoped to specific domains
- apply domain-aware IP checks using Host headers for HTTP and SNI context for QUIC and passthrough connections
- preserve compatibility for existing plain allow list entries and add validation and tests for scoped matching
## 2026-04-04 - 27.4.0 - feat(rustproxy)
add HTTP/3 proxy service wiring for QUIC listeners
- registers H3ProxyService with the UDP listener manager so QUIC connections can serve HTTP/3
- keeps proxy IP configuration intact while enabling HTTP/3 handling during listener setup
## 2026-04-04 - 27.3.1 - fix(metrics)
correct frontend and backend protocol connection tracking across h1, h2, h3, and websocket traffic
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "27.3.1",
"version": "27.8.0",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
+484
View File
@@ -0,0 +1,484 @@
# SmartProxy Metrics System
## Architecture
Two-tier design separating the data plane from the observation plane:
**Hot path (per-chunk, lock-free):** All recording in the proxy data plane touches only `AtomicU64` counters. No `Mutex` is ever acquired on the forwarding path. `CountingBody` batches flushes every 64KB to reduce DashMap shard contention.
**Cold path (1Hz sampling):** A background tokio task drains pending atomics into `ThroughputTracker` circular buffers (Mutex-guarded), producing per-second throughput history. Same task prunes orphaned entries and cleans up rate limiter state.
**Read path (on-demand):** `snapshot()` reads all atomics and locks ThroughputTrackers to build a serializable `Metrics` struct. TypeScript polls this at 1s intervals via IPC.
```
Data Plane (lock-free) Background (1Hz) Read Path
───────────────────── ────────────────── ─────────
record_bytes() ──> AtomicU64 ──┐
record_http_request() ──> AtomicU64 ──┤
connection_opened/closed() ──> AtomicU64 ──┤ sample_all() snapshot()
backend_*() ──> DashMap<AtomicU64> ──┤────> drain atomics ──────> Metrics struct
protocol_*() ──> AtomicU64 ──┤ feed ThroughputTrackers ──> JSON
datagram_*() ──> AtomicU64 ──┘ prune orphans ──> IPC stdout
──> TS cache
──> IMetrics API
```
### Key Types
| Type | Crate | Purpose |
|---|---|---|
| `MetricsCollector` | `rustproxy-metrics` | Central store. All DashMaps, atomics, and throughput trackers |
| `ThroughputTracker` | `rustproxy-metrics` | Circular buffer of 1Hz samples. Default 3600 capacity (1 hour) |
| `ForwardMetricsCtx` | `rustproxy-passthrough` | Carries `Arc<MetricsCollector>` + route_id + source_ip through TCP forwarding |
| `CountingBody` | `rustproxy-http` | Wraps HTTP bodies, batches byte recording per 64KB, flushes on drop |
| `ProtocolGuard` | `rustproxy-http` | RAII guard for frontend/backend protocol active/total counters |
| `ConnectionGuard` | `rustproxy-passthrough` | RAII guard calling `connection_closed()` on drop |
| `RustMetricsAdapter` | TypeScript | Polls Rust via IPC, implements `IMetrics` interface over cached JSON |
---
## What's Collected
### Global Counters
| Metric | Type | Updated by |
|---|---|---|
| Active connections | AtomicU64 | `connection_opened/closed` |
| Total connections (lifetime) | AtomicU64 | `connection_opened` |
| Total bytes in | AtomicU64 | `record_bytes` |
| Total bytes out | AtomicU64 | `record_bytes` |
| Total HTTP requests | AtomicU64 | `record_http_request` |
| Active UDP sessions | AtomicU64 | `udp_session_opened/closed` |
| Total UDP sessions | AtomicU64 | `udp_session_opened` |
| Total datagrams in | AtomicU64 | `record_datagram_in` |
| Total datagrams out | AtomicU64 | `record_datagram_out` |
### Per-Route Metrics (keyed by route ID string)
| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |
Entries are pruned via `retain_routes()` when routes are removed.
### Per-IP Metrics (keyed by IP string)
| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Bytes in / out | `DashMap<String, AtomicU64>` |
| Pending throughput (in, out) | `DashMap<String, (AtomicU64, AtomicU64)>` |
| Throughput history | `DashMap<String, Mutex<ThroughputTracker>>` |
| Domain requests | `DashMap<String, DashMap<String, AtomicU64>>` (IP → domain → count) |
All seven maps for an IP are evicted when its active connection count drops to 0. Safety-net pruning in `sample_all()` catches entries orphaned by races. Snapshots cap at 100 IPs, sorted by active connections descending.
**Domain request tracking:** Records which domains each frontend IP has requested. Populated from HTTP Host headers (for HTTP/1.1, HTTP/2, HTTP/3 requests) and from SNI (for TLS passthrough connections). Domain keys are normalized to lowercase with any trailing dot stripped so the same hostname does not fragment across multiple counters. Capped at 256 domains per IP (`MAX_DOMAINS_PER_IP`) to prevent subdomain-spray abuse. Inner DashMap uses 2 shards to minimise base memory per IP (~200 bytes). Common case (IP + domain both known) is two DashMap reads + one atomic increment with zero allocation.
### Per-Backend Metrics (keyed by "host:port")
| Metric | Storage |
|---|---|
| Active connections | `DashMap<String, AtomicU64>` |
| Total connections | `DashMap<String, AtomicU64>` |
| Detected protocol (h1/h2/h3) | `DashMap<String, String>` |
| Connect errors | `DashMap<String, AtomicU64>` |
| Handshake errors | `DashMap<String, AtomicU64>` |
| Request errors | `DashMap<String, AtomicU64>` |
| Total connect time (microseconds) | `DashMap<String, AtomicU64>` |
| Connect count | `DashMap<String, AtomicU64>` |
| Pool hits | `DashMap<String, AtomicU64>` |
| Pool misses | `DashMap<String, AtomicU64>` |
| H2 failures (fallback to H1) | `DashMap<String, AtomicU64>` |
All per-backend maps are evicted when active count reaches 0. Pruned via `retain_backends()`.
### Frontend Protocol Distribution
Tracked via `ProtocolGuard` RAII guards and `FrontendProtocolTracker`. Five protocol categories, each with active + total counters (AtomicU64):
| Protocol | Where detected |
|---|---|
| h1 | `FrontendProtocolTracker` on first HTTP/1.x request |
| h2 | `FrontendProtocolTracker` on first HTTP/2 request |
| h3 | `ProtocolGuard::frontend("h3")` in H3ProxyService |
| ws | `ProtocolGuard::frontend("ws")` on WebSocket upgrade |
| other | Fallback (TCP passthrough without HTTP) |
Uses `fetch_update` for saturating decrements to prevent underflow races. The same saturating-close pattern is also used for connection and UDP active gauges.
### Backend Protocol Distribution
Same five categories (h1/h2/h3/ws/other), tracked via `ProtocolGuard::backend()` at connection establishment. Backend h2 failures (fallback to h1) are separately counted.
### Throughput History
`ThroughputTracker` is a circular buffer storing `ThroughputSample { timestamp_ms, bytes_in, bytes_out }` at 1Hz.
- Global tracker: 1 instance, default 3600 capacity
- Per-route trackers: 1 per active route
- Per-IP trackers: 1 per connected IP (evicted with the IP)
- HTTP request tracker: reuses ThroughputTracker with bytes_in = request count, bytes_out = 0
Query methods:
- `instant()` — last 1 second average
- `recent()` — last 10 seconds average
- `throughput(N)` — last N seconds average
- `history(N)` — last N raw samples in chronological order
Snapshots return 60 samples of global throughput history.
### Protocol Detection Cache
Not part of MetricsCollector. Maintained by `HttpProxyService`'s protocol detection system. Injected into the metrics snapshot at read time by `get_metrics()`.
Each entry records: host, port, domain, detected protocol (h1/h2/h3), H3 port, age, last accessed, last probed, suppression flags, cooldown timers, consecutive failure counts.
---
## Instrumentation Points
### TCP Passthrough (`rustproxy-passthrough`)
**Connection lifecycle**`tcp_listener.rs`:
- Accept: `conn_tracker.connection_opened(&ip)` (rate limiter) + `ConnectionTrackerGuard` RAII
- Route match: `metrics.connection_opened(route_id, source_ip)` + `ConnectionGuard` RAII
- Close: Both guards call their respective `_closed()` methods on drop
**Byte recording**`forwarder.rs` (`forward_bidirectional_with_timeouts`):
- Initial peeked data recorded immediately
- Per-chunk in both directions: `record_bytes(n, 0, ...)` / `record_bytes(0, n, ...)`
- Same pattern in `forward_bidirectional_split_with_timeouts` (tcp_listener.rs) for TLS-terminated paths
### HTTP Proxy (`rustproxy-http`)
**Request counting**`proxy_service.rs`:
- `record_http_request()` called once per request after route matching succeeds
**Body byte counting**`counting_body.rs` wrapping:
- Request bodies: `CountingBody::new(body, ..., Direction::In)` — counts client-to-upstream bytes
- Response bodies: `CountingBody::new(body, ..., Direction::Out)` — counts upstream-to-client bytes
- Batched flush every 64KB (`BYTE_FLUSH_THRESHOLD = 65_536`), remainder flushed on drop
- Also updates `connection_activity` atomic (idle watchdog) and `active_requests` counter (streaming detection)
**Backend metrics**`proxy_service.rs`:
- `backend_connection_opened(key, connect_time)` — after TCP/TLS connect succeeds
- `backend_connection_closed(key)` — on teardown
- `backend_connect_error(key)` — TCP/TLS connect failure or timeout
- `backend_handshake_error(key)` — H1/H2 protocol handshake failure
- `backend_request_error(key)` — send_request failure
- `backend_h2_failure(key)` — H2 attempted, fell back to H1
- `backend_pool_hit(key)` / `backend_pool_miss(key)` — connection pool reuse
- `set_backend_protocol(key, proto)` — records detected protocol
**WebSocket**`proxy_service.rs`:
- Does NOT use CountingBody; records bytes directly per-chunk in both directions of the bidirectional copy loop
### QUIC (`rustproxy-passthrough`)
**Connection level**`quic_handler.rs`:
- `connection_opened` / `connection_closed` via `QuicConnGuard` RAII
- `conn_tracker.connection_opened/closed` for rate limiting
**Stream level**:
- For QUIC-to-TCP stream forwarding: `record_bytes(bytes_in, bytes_out, ...)` called once per stream at completion (post-hoc, not per-chunk)
- For HTTP/3: delegates to `HttpProxyService.handle_request()`, so all HTTP proxy metrics apply
**H3 specifics**`h3_service.rs`:
- `ProtocolGuard::frontend("h3")` tracks the H3 connection
- H3 request bodies: `record_bytes(data.len(), 0, ...)` called directly (not CountingBody) since H3 uses `stream.send_data()`
- H3 response bodies: wrapped in CountingBody like HTTP/1 and HTTP/2
### UDP (`rustproxy-passthrough`)
**Session lifecycle**`udp_listener.rs` / `udp_session.rs`:
- `udp_session_opened()` + `connection_opened(route_id, source_ip)` on new session
- `udp_session_closed()` + `connection_closed(route_id, source_ip)` on idle reap or port drain
**Datagram counting**`udp_listener.rs`:
- Inbound: `record_bytes(len, 0, ...)` + `record_datagram_in()`
- Outbound (backend reply): `record_bytes(0, len, ...)` + `record_datagram_out()`
---
## Sampling Loop
`lib.rs` spawns a tokio task at configurable interval (default 1000ms):
```rust
loop {
tokio::select! {
_ = cancel => break,
_ = interval.tick() => {
metrics.sample_all();
conn_tracker.cleanup_stale_timestamps();
http_proxy.cleanup_all_rate_limiters();
}
}
}
```
`sample_all()` performs in one pass:
1. Drains `global_pending_tp_in/out` into global ThroughputTracker, samples
2. Drains per-route pending counters into per-route trackers, samples each
3. Samples idle route trackers (no new data) to advance their window
4. Drains per-IP pending counters into per-IP trackers, samples each
5. Drains `pending_http_requests` into HTTP request throughput tracker
6. Prunes orphaned per-IP entries (bytes/throughput maps with no matching ip_connections key)
7. Prunes orphaned per-backend entries (error/stats maps with no matching active/total key)
---
## Data Flow: Rust to TypeScript
```
MetricsCollector.snapshot()
├── reads all AtomicU64 counters
├── iterates DashMaps (routes, IPs, backends)
├── locks ThroughputTrackers for instant/recent rates + history
└── produces Metrics struct
RustProxy::get_metrics()
├── calls snapshot()
├── enriches with detectedProtocols from HTTP proxy protocol cache
└── returns Metrics
management.rs "getMetrics" IPC command
├── calls get_metrics()
├── serde_json::to_value (camelCase)
└── writes JSON to stdout
RustProxyBridge (TypeScript)
├── reads JSON from Rust process stdout
└── returns parsed object
RustMetricsAdapter
├── setInterval polls bridge.getMetrics() every 1s
├── stores raw JSON in this.cache
└── IMetrics methods read synchronously from cache
SmartProxy.getMetrics()
└── returns the RustMetricsAdapter instance
```
### IPC JSON Shape (Metrics)
```json
{
"activeConnections": 42,
"totalConnections": 1000,
"bytesIn": 123456789,
"bytesOut": 987654321,
"throughputInBytesPerSec": 50000,
"throughputOutBytesPerSec": 80000,
"throughputRecentInBytesPerSec": 45000,
"throughputRecentOutBytesPerSec": 75000,
"routes": {
"<route-id>": {
"activeConnections": 5,
"totalConnections": 100,
"bytesIn": 0, "bytesOut": 0,
"throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
"throughputRecentInBytesPerSec": 0, "throughputRecentOutBytesPerSec": 0
}
},
"ips": {
"<ip>": {
"activeConnections": 2, "totalConnections": 10,
"bytesIn": 0, "bytesOut": 0,
"throughputInBytesPerSec": 0, "throughputOutBytesPerSec": 0,
"domainRequests": {
"example.com": 4821,
"api.example.com": 312
}
}
},
"backends": {
"<host:port>": {
"activeConnections": 3, "totalConnections": 50,
"protocol": "h2",
"connectErrors": 0, "handshakeErrors": 0, "requestErrors": 0,
"totalConnectTimeUs": 150000, "connectCount": 50,
"poolHits": 40, "poolMisses": 10, "h2Failures": 1
}
},
"throughputHistory": [
{ "timestampMs": 1713000000000, "bytesIn": 50000, "bytesOut": 80000 }
],
"totalHttpRequests": 5000,
"httpRequestsPerSec": 100,
"httpRequestsPerSecRecent": 95,
"activeUdpSessions": 0, "totalUdpSessions": 5,
"totalDatagramsIn": 1000, "totalDatagramsOut": 1000,
"frontendProtocols": {
"h1Active": 10, "h1Total": 500,
"h2Active": 5, "h2Total": 200,
"h3Active": 1, "h3Total": 50,
"wsActive": 2, "wsTotal": 30,
"otherActive": 0, "otherTotal": 0
},
"backendProtocols": { "...same shape..." },
"detectedProtocols": [
{
"host": "backend", "port": 443, "domain": "example.com",
"protocol": "h2", "h3Port": 443,
"ageSecs": 120, "lastAccessedSecs": 5, "lastProbedSecs": 120,
"h2Suppressed": false, "h3Suppressed": false,
"h2CooldownRemainingSecs": null, "h3CooldownRemainingSecs": null,
"h2ConsecutiveFailures": null, "h3ConsecutiveFailures": null
}
]
}
```
### IPC JSON Shape (Statistics)
Lightweight administrative summary, fetched on-demand (not polled):
```json
{
"activeConnections": 42,
"totalConnections": 1000,
"routesCount": 5,
"listeningPorts": [80, 443, 8443],
"uptimeSeconds": 86400
}
```
---
## TypeScript Consumer API
`SmartProxy.getMetrics()` returns an `IMetrics` object. All members are synchronous methods reading from the polled cache.
### connections
| Method | Return | Source |
|---|---|---|
| `active()` | `number` | `cache.activeConnections` |
| `total()` | `number` | `cache.totalConnections` |
| `byRoute()` | `Map<string, number>` | `cache.routes[name].activeConnections` |
| `byIP()` | `Map<string, number>` | `cache.ips[ip].activeConnections` |
| `topIPs(limit?)` | `Array<{ip, count}>` | `cache.ips` sorted by active desc, default 10 |
| `domainRequestsByIP()` | `Map<string, Map<string, number>>` | `cache.ips[ip].domainRequests` |
| `topDomainRequests(limit?)` | `Array<{ip, domain, count}>` | Flattened from all IPs, sorted by count desc, default 20 |
| `frontendProtocols()` | `IProtocolDistribution` | `cache.frontendProtocols.*` |
| `backendProtocols()` | `IProtocolDistribution` | `cache.backendProtocols.*` |
### throughput
| Method | Return | Source |
|---|---|---|
| `instant()` | `{in, out}` | `cache.throughputInBytesPerSec/Out` |
| `recent()` | `{in, out}` | `cache.throughputRecentInBytesPerSec/Out` |
| `average()` | `{in, out}` | Falls back to `instant()` (not wired to windowed average) |
| `custom(seconds)` | `{in, out}` | Falls back to `instant()` (not wired) |
| `history(seconds)` | `IThroughputHistoryPoint[]` | `cache.throughputHistory` sliced to last N entries |
| `byRoute(windowSeconds?)` | `Map<string, {in, out}>` | `cache.routes[name].throughputIn/OutBytesPerSec` |
| `byIP(windowSeconds?)` | `Map<string, {in, out}>` | `cache.ips[ip].throughputIn/OutBytesPerSec` |
### requests
| Method | Return | Source |
|---|---|---|
| `perSecond()` | `number` | `cache.httpRequestsPerSec` |
| `perMinute()` | `number` | `cache.httpRequestsPerSecRecent * 60` |
| `total()` | `number` | `cache.totalHttpRequests` (fallback: totalConnections) |
### totals
| Method | Return | Source |
|---|---|---|
| `bytesIn()` | `number` | `cache.bytesIn` |
| `bytesOut()` | `number` | `cache.bytesOut` |
| `connections()` | `number` | `cache.totalConnections` |
### backends
| Method | Return | Source |
|---|---|---|
| `byBackend()` | `Map<string, IBackendMetrics>` | `cache.backends[key].*` with computed `avgConnectTimeMs` and `poolHitRate` |
| `protocols()` | `Map<string, string>` | `cache.backends[key].protocol` |
| `topByErrors(limit?)` | `IBackendMetrics[]` | Sorted by total errors desc |
| `detectedProtocols()` | `IProtocolCacheEntry[]` | `cache.detectedProtocols` passthrough |
`IBackendMetrics`: `{ protocol, activeConnections, totalConnections, connectErrors, handshakeErrors, requestErrors, avgConnectTimeMs, poolHitRate, h2Failures }`
### udp
| Method | Return | Source |
|---|---|---|
| `activeSessions()` | `number` | `cache.activeUdpSessions` |
| `totalSessions()` | `number` | `cache.totalUdpSessions` |
| `datagramsIn()` | `number` | `cache.totalDatagramsIn` |
| `datagramsOut()` | `number` | `cache.totalDatagramsOut` |
### percentiles (stub)
`connectionDuration()` and `bytesTransferred()` always return zeros. Not implemented.
---
## Configuration
```typescript
interface IMetricsConfig {
enabled: boolean; // default true
sampleIntervalMs: number; // default 1000 (1Hz sampling + TS polling)
retentionSeconds: number; // default 3600 (ThroughputTracker capacity)
enableDetailedTracking: boolean;
enablePercentiles: boolean;
cacheResultsMs: number;
prometheusEnabled: boolean; // not wired
prometheusPath: string; // not wired
prometheusPrefix: string; // not wired
}
```
Rust-side config (`MetricsConfig` in `rustproxy-config`):
```rust
pub struct MetricsConfig {
pub enabled: Option<bool>,
pub sample_interval_ms: Option<u64>, // default 1000
pub retention_seconds: Option<u64>, // default 3600
}
```
---
## Design Decisions
**Lock-free hot path.** `record_bytes()` is the most frequently called method (per-chunk in TCP, per-64KB in HTTP). It only touches `AtomicU64` with `Relaxed` ordering and short-circuits zero-byte directions to skip DashMap lookups entirely.
**CountingBody batching.** HTTP body frames are typically 16KB. Flushing to MetricsCollector every 64KB reduces DashMap shard contention by ~4x compared to per-frame recording.
**RAII guards everywhere.** `ConnectionGuard`, `ConnectionTrackerGuard`, `QuicConnGuard`, `ProtocolGuard`, `FrontendProtocolTracker` all use Drop to guarantee counter cleanup on all exit paths including panics.
**Saturating decrements.** Protocol counters use `fetch_update` loops instead of `fetch_sub` to prevent underflow to `u64::MAX` from concurrent close races.
**Bounded memory.** Per-IP entries evicted on last connection close. Per-backend entries evicted on last connection close. Snapshot caps IPs and backends at 100 each. `sample_all()` prunes orphaned entries every second.
**Two-phase throughput.** Pending bytes accumulate in lock-free atomics. The 1Hz cold path drains them into Mutex-guarded ThroughputTrackers. This means the hot path never contends on a Mutex, while the cold path does minimal work (one drain + one sample per tracker).
---
## Known Gaps
| Gap | Status |
|---|---|
| `throughput.average()` / `throughput.custom(seconds)` | Fall back to `instant()`. Not wired to Rust windowed queries. |
| `percentiles.connectionDuration()` / `percentiles.bytesTransferred()` | Stub returning zeros. No histogram in Rust. |
| Prometheus export | Config fields exist but not wired to any exporter. |
| `LogDeduplicator` | Implemented in `rustproxy-metrics` but not connected to any call site. |
| Rate limit hit counters | Rate-limited requests return 429 but no counter is recorded in MetricsCollector. |
| QUIC stream byte counting | Post-hoc (per-stream totals after close), not per-chunk like TCP. |
| Throughput history in snapshot | Capped at 60 samples. TS `history(seconds)` cannot return more than 60 points regardless of `retentionSeconds`. |
| Per-route total connections / bytes | Available in Rust JSON but `IMetrics.connections.byRoute()` only exposes active connections. |
| Per-IP total connections / bytes | Available in Rust JSON but `IMetrics.connections.byIP()` only exposes active connections. |
| IPC response typing | `RustProxyBridge` declares `result: any` for both metrics commands. No type-safe response. |
+246 -23
View File
@@ -129,7 +129,6 @@ pub struct RustProxyOptions {
pub defaults: Option<DefaultConfig>,
// ─── Timeout Settings ────────────────────────────────────────────
/// Timeout for establishing connection to backend (ms), default: 30000
#[serde(skip_serializing_if = "Option::is_none")]
pub connection_timeout: Option<u64>,
@@ -159,7 +158,6 @@ pub struct RustProxyOptions {
pub graceful_shutdown_timeout: Option<u64>,
// ─── Socket Optimization ─────────────────────────────────────────
/// Disable Nagle's algorithm (default: true)
#[serde(skip_serializing_if = "Option::is_none")]
pub no_delay: Option<bool>,
@@ -177,7 +175,6 @@ pub struct RustProxyOptions {
pub max_pending_data_size: Option<u64>,
// ─── Enhanced Features ───────────────────────────────────────────
/// Disable inactivity checking entirely
#[serde(skip_serializing_if = "Option::is_none")]
pub disable_inactivity_check: Option<bool>,
@@ -199,7 +196,6 @@ pub struct RustProxyOptions {
pub enable_randomized_timeouts: Option<bool>,
// ─── Rate Limiting ───────────────────────────────────────────────
/// Maximum simultaneous connections from a single IP
#[serde(skip_serializing_if = "Option::is_none")]
pub max_connections_per_ip: Option<u64>,
@@ -213,7 +209,6 @@ pub struct RustProxyOptions {
pub max_connections: Option<u64>,
// ─── Keep-Alive Settings ─────────────────────────────────────────
/// How to treat keep-alive connections
#[serde(skip_serializing_if = "Option::is_none")]
pub keep_alive_treatment: Option<KeepAliveTreatment>,
@@ -227,7 +222,6 @@ pub struct RustProxyOptions {
pub extended_keep_alive_lifetime: Option<u64>,
// ─── HttpProxy Integration ───────────────────────────────────────
/// Array of ports to forward to HttpProxy
#[serde(skip_serializing_if = "Option::is_none")]
pub use_http_proxy: Option<Vec<u16>>,
@@ -237,13 +231,11 @@ pub struct RustProxyOptions {
pub http_proxy_port: Option<u16>,
// ─── Metrics ─────────────────────────────────────────────────────
/// Metrics configuration
#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<MetricsConfig>,
// ─── ACME ────────────────────────────────────────────────────────
/// Global ACME configuration
#[serde(skip_serializing_if = "Option::is_none")]
pub acme: Option<AcmeOptions>,
@@ -318,7 +310,8 @@ impl RustProxyOptions {
/// Get all unique ports that routes listen on.
pub fn all_listening_ports(&self) -> Vec<u16> {
let mut ports: Vec<u16> = self.routes
let mut ports: Vec<u16> = self
.routes
.iter()
.flat_map(|r| r.listening_ports())
.collect();
@@ -340,7 +333,12 @@ mod tests {
route_match: RouteMatch {
ports: PortRange::Single(listen_port),
domains: Some(DomainSpec::Single(domain.to_string())),
path: None, client_ip: None, transport: None, tls_version: None, headers: None, protocol: None,
path: None,
client_ip: None,
transport: None,
tls_version: None,
headers: None,
protocol: None,
},
action: RouteAction {
action_type: RouteActionType::Forward,
@@ -348,14 +346,30 @@ mod tests {
target_match: None,
host: HostSpec::Single(host.to_string()),
port: PortSpec::Fixed(port),
tls: None, websocket: None, load_balancing: None, send_proxy_protocol: None,
headers: None, advanced: None, backend_transport: None, priority: None,
tls: None,
websocket: None,
load_balancing: None,
send_proxy_protocol: None,
headers: None,
advanced: None,
backend_transport: None,
priority: None,
}]),
tls: None, websocket: None, load_balancing: None, advanced: None,
options: None, send_proxy_protocol: None, udp: None,
tls: None,
websocket: None,
load_balancing: None,
advanced: None,
options: None,
send_proxy_protocol: None,
udp: None,
},
headers: None, security: None, name: None, description: None,
priority: None, tags: None, enabled: None,
headers: None,
security: None,
name: None,
description: None,
priority: None,
tags: None,
enabled: None,
}
}
@@ -363,8 +377,12 @@ mod tests {
let mut route = make_route(domain, host, port, 443);
route.action.tls = Some(RouteTls {
mode: TlsMode::Passthrough,
certificate: None, acme: None, versions: None, ciphers: None,
honor_cipher_order: None, session_timeout: None,
certificate: None,
acme: None,
versions: None,
ciphers: None,
honor_cipher_order: None,
session_timeout: None,
});
route
}
@@ -410,6 +428,209 @@ mod tests {
assert_eq!(parsed.connection_timeout, Some(5000));
}
#[test]
fn test_deserialize_ts_contract_route_shapes() {
let value = serde_json::json!({
"routes": [{
"name": "contract-route",
"match": {
"ports": [443, { "from": 8443, "to": 8444 }],
"domains": ["api.example.com", "*.example.com"],
"transport": "udp",
"protocol": "http3",
"headers": {
"content-type": "/^application\\/json$/i"
}
},
"action": {
"type": "forward",
"targets": [{
"match": {
"ports": [443],
"path": "/api/*",
"method": ["GET"],
"headers": {
"x-env": "/^(prod|stage)$/"
}
},
"host": ["backend-a", "backend-b"],
"port": "preserve",
"sendProxyProtocol": true,
"backendTransport": "tcp"
}],
"tls": {
"mode": "terminate",
"certificate": "auto"
},
"sendProxyProtocol": true,
"udp": {
"maxSessionsPerIp": 321,
"quic": {
"enableHttp3": true
}
}
},
"security": {
"ipAllowList": [{
"ip": "10.0.0.0/8",
"domains": ["api.example.com"]
}]
}
}],
"preserveSourceIp": true,
"proxyIps": ["10.0.0.1"],
"acceptProxyProtocol": true,
"sendProxyProtocol": true,
"noDelay": true,
"keepAlive": true,
"keepAliveInitialDelay": 1500,
"maxPendingDataSize": 4096,
"disableInactivityCheck": true,
"enableKeepAliveProbes": true,
"enableDetailedLogging": true,
"enableTlsDebugLogging": true,
"enableRandomizedTimeouts": true,
"connectionTimeout": 5000,
"initialDataTimeout": 7000,
"socketTimeout": 9000,
"inactivityCheckInterval": 1100,
"maxConnectionLifetime": 13000,
"inactivityTimeout": 15000,
"gracefulShutdownTimeout": 17000,
"maxConnectionsPerIp": 20,
"connectionRateLimitPerMinute": 30,
"keepAliveTreatment": "extended",
"keepAliveInactivityMultiplier": 2.0,
"extendedKeepAliveLifetime": 19000,
"metrics": {
"enabled": true,
"sampleIntervalMs": 250,
"retentionSeconds": 60
},
"acme": {
"enabled": true,
"email": "ops@example.com",
"environment": "staging",
"useProduction": false,
"skipConfiguredCerts": true,
"renewThresholdDays": 14,
"renewCheckIntervalHours": 12,
"autoRenew": true,
"port": 80
}
});
let options: RustProxyOptions = serde_json::from_value(value).unwrap();
assert_eq!(options.routes.len(), 1);
assert_eq!(options.preserve_source_ip, Some(true));
assert_eq!(options.proxy_ips, Some(vec!["10.0.0.1".to_string()]));
assert_eq!(options.accept_proxy_protocol, Some(true));
assert_eq!(options.send_proxy_protocol, Some(true));
assert_eq!(options.no_delay, Some(true));
assert_eq!(options.keep_alive, Some(true));
assert_eq!(options.keep_alive_initial_delay, Some(1500));
assert_eq!(options.max_pending_data_size, Some(4096));
assert_eq!(options.disable_inactivity_check, Some(true));
assert_eq!(options.enable_keep_alive_probes, Some(true));
assert_eq!(options.enable_detailed_logging, Some(true));
assert_eq!(options.enable_tls_debug_logging, Some(true));
assert_eq!(options.enable_randomized_timeouts, Some(true));
assert_eq!(options.connection_timeout, Some(5000));
assert_eq!(options.initial_data_timeout, Some(7000));
assert_eq!(options.socket_timeout, Some(9000));
assert_eq!(options.inactivity_check_interval, Some(1100));
assert_eq!(options.max_connection_lifetime, Some(13000));
assert_eq!(options.inactivity_timeout, Some(15000));
assert_eq!(options.graceful_shutdown_timeout, Some(17000));
assert_eq!(options.max_connections_per_ip, Some(20));
assert_eq!(options.connection_rate_limit_per_minute, Some(30));
assert_eq!(
options.keep_alive_treatment,
Some(KeepAliveTreatment::Extended)
);
assert_eq!(options.keep_alive_inactivity_multiplier, Some(2.0));
assert_eq!(options.extended_keep_alive_lifetime, Some(19000));
let route = &options.routes[0];
assert_eq!(route.route_match.transport, Some(TransportProtocol::Udp));
assert_eq!(route.route_match.protocol.as_deref(), Some("http3"));
assert_eq!(
route
.route_match
.headers
.as_ref()
.unwrap()
.get("content-type")
.unwrap(),
"/^application\\/json$/i"
);
let target = &route.action.targets.as_ref().unwrap()[0];
assert!(matches!(target.host, HostSpec::List(_)));
assert!(matches!(target.port, PortSpec::Special(ref p) if p == "preserve"));
assert_eq!(target.backend_transport, Some(TransportProtocol::Tcp));
assert_eq!(target.send_proxy_protocol, Some(true));
assert_eq!(
target
.target_match
.as_ref()
.unwrap()
.headers
.as_ref()
.unwrap()
.get("x-env")
.unwrap(),
"/^(prod|stage)$/"
);
assert_eq!(route.action.send_proxy_protocol, Some(true));
assert_eq!(
route.action.udp.as_ref().unwrap().max_sessions_per_ip,
Some(321)
);
assert_eq!(
route
.action
.udp
.as_ref()
.unwrap()
.quic
.as_ref()
.unwrap()
.enable_http3,
Some(true)
);
let allow_list = route
.security
.as_ref()
.unwrap()
.ip_allow_list
.as_ref()
.unwrap();
assert!(matches!(
&allow_list[0],
crate::security_types::IpAllowEntry::DomainScoped { ip, domains }
if ip == "10.0.0.0/8" && domains == &vec!["api.example.com".to_string()]
));
let metrics = options.metrics.as_ref().unwrap();
assert_eq!(metrics.enabled, Some(true));
assert_eq!(metrics.sample_interval_ms, Some(250));
assert_eq!(metrics.retention_seconds, Some(60));
let acme = options.acme.as_ref().unwrap();
assert_eq!(acme.enabled, Some(true));
assert_eq!(acme.email.as_deref(), Some("ops@example.com"));
assert_eq!(acme.environment, Some(AcmeEnvironment::Staging));
assert_eq!(acme.use_production, Some(false));
assert_eq!(acme.skip_configured_certs, Some(true));
assert_eq!(acme.renew_threshold_days, Some(14));
assert_eq!(acme.renew_check_interval_hours, Some(12));
assert_eq!(acme.auto_renew, Some(true));
assert_eq!(acme.port, Some(80));
}
#[test]
fn test_default_timeouts() {
let options = RustProxyOptions::default();
@@ -438,9 +659,9 @@ mod tests {
fn test_all_listening_ports() {
let options = RustProxyOptions {
routes: vec![
make_route("a.com", "backend", 8080, 80), // port 80
make_route("a.com", "backend", 8080, 80), // port 80
make_passthrough_route("b.com", "backend", 443), // port 443
make_route("c.com", "backend", 9090, 80), // port 80 (duplicate)
make_route("c.com", "backend", 9090, 80), // port 80 (duplicate)
],
..Default::default()
};
@@ -464,9 +685,11 @@ mod tests {
#[test]
fn test_deserialize_example_json() {
let content = std::fs::read_to_string(
concat!(env!("CARGO_MANIFEST_DIR"), "/../../config/example.json")
).unwrap();
let content = std::fs::read_to_string(concat!(
env!("CARGO_MANIFEST_DIR"),
"/../../config/example.json"
))
.unwrap();
let options: RustProxyOptions = serde_json::from_str(&content).unwrap();
assert_eq!(options.routes.len(), 4);
let ports = options.all_listening_ports();
@@ -1,8 +1,8 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::tls_types::RouteTls;
use crate::security_types::RouteSecurity;
use crate::tls_types::RouteTls;
// ─── Port Range ──────────────────────────────────────────────────────
@@ -32,12 +32,13 @@ impl PortRange {
pub fn to_ports(&self) -> Vec<u16> {
match self {
PortRange::Single(p) => vec![*p],
PortRange::List(items) => {
items.iter().flat_map(|item| match item {
PortRange::List(items) => items
.iter()
.flat_map(|item| match item {
PortRangeItem::Port(p) => vec![*p],
PortRangeItem::Range(r) => (r.from..=r.to).collect(),
}).collect()
}
})
.collect(),
}
}
}
@@ -105,7 +106,8 @@ impl From<Vec<&str>> for DomainSpec {
}
/// Header match value: either exact string or regex pattern.
/// In JSON, all values come as strings. Regex patterns are prefixed with `/` and suffixed with `/`.
/// In JSON, all values come as strings. Regex patterns use JS-style literal syntax,
/// e.g. `/^application\/json$/` or `/^application\/json$/i`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum HeaderMatchValue {
@@ -654,6 +656,11 @@ impl RouteConfig {
self.route_match.ports.to_ports()
}
/// Stable key used for frontend route-scoped metrics.
pub fn metrics_key(&self) -> Option<&str> {
self.name.as_deref().or(self.id.as_deref())
}
/// Get the TLS mode for this route (from action-level or first target).
pub fn tls_mode(&self) -> Option<&crate::tls_types::TlsMode> {
// Check action-level TLS first
@@ -671,3 +678,63 @@ impl RouteConfig {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_route(name: Option<&str>, id: Option<&str>) -> RouteConfig {
RouteConfig {
id: id.map(str::to_string),
route_match: RouteMatch {
ports: PortRange::Single(443),
transport: None,
domains: None,
path: None,
client_ip: None,
tls_version: None,
headers: None,
protocol: None,
},
action: RouteAction {
action_type: RouteActionType::Forward,
targets: None,
tls: None,
websocket: None,
load_balancing: None,
advanced: None,
options: None,
send_proxy_protocol: None,
udp: None,
},
headers: None,
security: None,
name: name.map(str::to_string),
description: None,
priority: None,
tags: None,
enabled: None,
}
}
#[test]
fn metrics_key_prefers_name() {
let route = test_route(Some("named-route"), Some("route-id"));
assert_eq!(route.metrics_key(), Some("named-route"));
}
#[test]
fn metrics_key_falls_back_to_id() {
let route = test_route(None, Some("route-id"));
assert_eq!(route.metrics_key(), Some("route-id"));
}
#[test]
fn metrics_key_is_absent_without_name_or_id() {
let route = test_route(None, None);
assert_eq!(route.metrics_key(), None);
}
}
@@ -103,14 +103,30 @@ pub struct JwtAuthConfig {
pub exclude_paths: Option<Vec<String>>,
}
/// An entry in the IP allow list: either a plain IP/CIDR string
/// or a domain-scoped entry that restricts the IP to specific domains.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IpAllowEntry {
/// Plain IP/CIDR — allowed for all domains on this route
Plain(String),
/// Domain-scoped — allowed only when the requested domain matches
DomainScoped {
ip: String,
domains: Vec<String>,
},
}
/// Security options for routes.
/// Matches TypeScript: `IRouteSecurity`
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RouteSecurity {
/// IP addresses that are allowed to connect
/// IP addresses that are allowed to connect.
/// Entries can be plain strings (full route access) or objects with
/// `{ ip, domains }` to scope access to specific domains.
#[serde(skip_serializing_if = "Option::is_none")]
pub ip_allow_list: Option<Vec<String>>,
pub ip_allow_list: Option<Vec<IpAllowEntry>>,
/// IP addresses that are blocked from connecting
#[serde(skip_serializing_if = "Option::is_none")]
pub ip_block_list: Option<Vec<String>>,
@@ -3,8 +3,8 @@
//! Reuses idle keep-alive connections to avoid per-request TCP+TLS handshakes.
//! HTTP/2 and HTTP/3 connections are multiplexed (clone the sender / share the connection).
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;
@@ -105,13 +105,19 @@ impl ConnectionPool {
/// Try to check out an idle HTTP/1.1 sender for the given key.
/// Returns `None` if no usable idle connection exists.
pub fn checkout_h1(&self, key: &PoolKey) -> Option<http1::SendRequest<BoxBody<Bytes, hyper::Error>>> {
pub fn checkout_h1(
&self,
key: &PoolKey,
) -> Option<http1::SendRequest<BoxBody<Bytes, hyper::Error>>> {
let mut entry = self.h1_pool.get_mut(key)?;
let idles = entry.value_mut();
while let Some(idle) = idles.pop() {
// Check if the connection is still alive and ready
if idle.idle_since.elapsed() < IDLE_TIMEOUT && idle.sender.is_ready() && !idle.sender.is_closed() {
if idle.idle_since.elapsed() < IDLE_TIMEOUT
&& idle.sender.is_ready()
&& !idle.sender.is_closed()
{
// H1 pool hit — no logging on hot path
return Some(idle.sender);
}
@@ -128,7 +134,11 @@ impl ConnectionPool {
/// Return an HTTP/1.1 sender to the pool after the response body has been prepared.
/// The caller should NOT call this if the sender is closed or not ready.
pub fn checkin_h1(&self, key: PoolKey, sender: http1::SendRequest<BoxBody<Bytes, hyper::Error>>) {
pub fn checkin_h1(
&self,
key: PoolKey,
sender: http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
) {
if sender.is_closed() || !sender.is_ready() {
return; // Don't pool broken connections
}
@@ -145,7 +155,10 @@ impl ConnectionPool {
/// Try to get a cloned HTTP/2 sender for the given key.
/// HTTP/2 senders are Clone-able (multiplexed), so we clone rather than remove.
pub fn checkout_h2(&self, key: &PoolKey) -> Option<(http2::SendRequest<BoxBody<Bytes, hyper::Error>>, Duration)> {
pub fn checkout_h2(
&self,
key: &PoolKey,
) -> Option<(http2::SendRequest<BoxBody<Bytes, hyper::Error>>, Duration)> {
let entry = self.h2_pool.get(key)?;
let pooled = entry.value();
let age = pooled.created_at.elapsed();
@@ -184,16 +197,23 @@ impl ConnectionPool {
/// Register an HTTP/2 sender in the pool. Returns the generation ID for this entry.
/// The caller should pass this generation to the connection driver so it can use
/// `remove_h2_if_generation` instead of `remove_h2` to avoid phantom eviction.
pub fn register_h2(&self, key: PoolKey, sender: http2::SendRequest<BoxBody<Bytes, hyper::Error>>) -> u64 {
pub fn register_h2(
&self,
key: PoolKey,
sender: http2::SendRequest<BoxBody<Bytes, hyper::Error>>,
) -> u64 {
let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed);
if sender.is_closed() {
return gen;
}
self.h2_pool.insert(key, PooledH2 {
sender,
created_at: Instant::now(),
generation: gen,
});
self.h2_pool.insert(
key,
PooledH2 {
sender,
created_at: Instant::now(),
generation: gen,
},
);
gen
}
@@ -204,7 +224,11 @@ impl ConnectionPool {
pub fn checkout_h3(
&self,
key: &PoolKey,
) -> Option<(h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>, quinn::Connection, Duration)> {
) -> Option<(
h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
quinn::Connection,
Duration,
)> {
let entry = self.h3_pool.get(key)?;
let pooled = entry.value();
let age = pooled.created_at.elapsed();
@@ -234,12 +258,15 @@ impl ConnectionPool {
send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
) -> u64 {
let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed);
self.h3_pool.insert(key, PooledH3 {
send_request,
connection,
created_at: Instant::now(),
generation: gen,
});
self.h3_pool.insert(
key,
PooledH3 {
send_request,
connection,
created_at: Instant::now(),
generation: gen,
},
);
gen
}
@@ -280,7 +307,9 @@ impl ConnectionPool {
// Evict dead or aged-out H2 connections
let mut dead_h2 = Vec::new();
for entry in h2_pool.iter() {
if entry.value().sender.is_closed() || entry.value().created_at.elapsed() >= MAX_H2_AGE {
if entry.value().sender.is_closed()
|| entry.value().created_at.elapsed() >= MAX_H2_AGE
{
dead_h2.push(entry.key().clone());
}
}
@@ -1,8 +1,8 @@
//! A body wrapper that counts bytes flowing through and reports them to MetricsCollector.
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
@@ -76,7 +76,11 @@ impl<B> CountingBody<B> {
/// Set the connection-level activity tracker. When set, each data frame
/// updates this timestamp to prevent the idle watchdog from killing the
/// connection during active body streaming.
pub fn with_connection_activity(mut self, activity: Arc<AtomicU64>, start: std::time::Instant) -> Self {
pub fn with_connection_activity(
mut self,
activity: Arc<AtomicU64>,
start: std::time::Instant,
) -> Self {
self.connection_activity = Some(activity);
self.activity_start = Some(start);
self
@@ -134,7 +138,9 @@ where
}
// Keep the connection-level idle watchdog alive on every frame
// (this is just one atomic store — cheap enough per-frame)
if let (Some(activity), Some(start)) = (&this.connection_activity, &this.activity_start) {
if let (Some(activity), Some(start)) =
(&this.connection_activity, &this.activity_start)
{
activity.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
}
}
+28 -10
View File
@@ -11,8 +11,8 @@ use std::task::{Context, Poll};
use bytes::{Buf, Bytes};
use http_body::Frame;
use http_body_util::BodyExt;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use tracing::{debug, warn};
use rustproxy_config::RouteConfig;
@@ -49,7 +49,8 @@ impl H3ProxyService {
debug!("HTTP/3 connection from {} on port {}", remote_addr, port);
// Track frontend H3 connection for the QUIC connection's lifetime.
let _frontend_h3_guard = ProtocolGuard::frontend(Arc::clone(self.http_proxy.metrics()), "h3");
let _frontend_h3_guard =
ProtocolGuard::frontend(Arc::clone(self.http_proxy.metrics()), "h3");
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
h3::server::builder()
@@ -92,8 +93,15 @@ impl H3ProxyService {
tokio::spawn(async move {
if let Err(e) = handle_h3_request(
request, stream, port, remote_addr, &http_proxy, request_cancel,
).await {
request,
stream,
port,
remote_addr,
&http_proxy,
request_cancel,
)
.await
{
debug!("HTTP/3 request error from {}: {}", remote_addr, e);
}
});
@@ -153,11 +161,14 @@ async fn handle_h3_request(
// Delegate to HttpProxyService — same backend path as TCP/HTTP:
// route matching, ALPN protocol detection, connection pool, H1/H2/H3 auto.
let conn_activity = ConnActivity::new_standalone();
let response = http_proxy.handle_request(req, peer_addr, port, cancel, conn_activity).await
let response = http_proxy
.handle_request(req, peer_addr, port, cancel, conn_activity)
.await
.map_err(|e| anyhow::anyhow!("Backend request failed: {}", e))?;
// Await the body reader to get the H3 stream back
let mut stream = body_reader.await
let mut stream = body_reader
.await
.map_err(|e| anyhow::anyhow!("Body reader task failed: {}", e))?;
// Send response headers over H3 (skip hop-by-hop headers)
@@ -170,10 +181,13 @@ async fn handle_h3_request(
}
h3_response = h3_response.header(name, value);
}
let h3_response = h3_response.body(())
let h3_response = h3_response
.body(())
.map_err(|e| anyhow::anyhow!("Failed to build H3 response: {}", e))?;
stream.send_response(h3_response).await
stream
.send_response(h3_response)
.await
.map_err(|e| anyhow::anyhow!("Failed to send H3 response: {}", e))?;
// Stream response body back over H3
@@ -182,7 +196,9 @@ async fn handle_h3_request(
match frame {
Ok(frame) => {
if let Ok(data) = frame.into_data() {
stream.send_data(data).await
stream
.send_data(data)
.await
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
}
}
@@ -194,7 +210,9 @@ async fn handle_h3_request(
}
// Finish the H3 stream (send QUIC FIN)
stream.finish().await
stream
.finish()
.await
.map_err(|e| anyhow::anyhow!("Failed to finish H3 stream: {}", e))?;
Ok(())
+2 -1
View File
@@ -5,14 +5,15 @@
pub mod connection_pool;
pub mod counting_body;
pub mod h3_service;
pub mod protocol_cache;
pub mod proxy_service;
pub mod request_filter;
mod request_host;
pub mod response_filter;
pub mod shutdown_on_drop;
pub mod template;
pub mod upstream_selector;
pub mod h3_service;
pub use connection_pool::*;
pub use counting_body::*;
@@ -144,10 +144,14 @@ impl FailureState {
}
fn all_expired(&self) -> bool {
let h2_expired = self.h2.as_ref()
let h2_expired = self
.h2
.as_ref()
.map(|r| r.failed_at.elapsed() >= r.cooldown)
.unwrap_or(true);
let h3_expired = self.h3.as_ref()
let h3_expired = self
.h3
.as_ref()
.map(|r| r.failed_at.elapsed() >= r.cooldown)
.unwrap_or(true);
h2_expired && h3_expired
@@ -355,9 +359,13 @@ impl ProtocolCache {
let record = entry.get_mut(protocol);
let (consecutive, new_cooldown) = match record {
Some(existing) if existing.failed_at.elapsed() < existing.cooldown.saturating_mul(2) => {
Some(existing)
if existing.failed_at.elapsed() < existing.cooldown.saturating_mul(2) =>
{
// Still within the "recent" window — escalate
let c = existing.consecutive_failures.saturating_add(1)
let c = existing
.consecutive_failures
.saturating_add(1)
.min(PROTOCOL_FAILURE_ESCALATION_CAP);
(c, escalate_cooldown(c))
}
@@ -394,8 +402,13 @@ impl ProtocolCache {
if protocol == DetectedProtocol::H1 {
return false;
}
self.failures.get(key)
.and_then(|entry| entry.get(protocol).map(|r| r.failed_at.elapsed() < r.cooldown))
self.failures
.get(key)
.and_then(|entry| {
entry
.get(protocol)
.map(|r| r.failed_at.elapsed() < r.cooldown)
})
.unwrap_or(false)
}
@@ -464,19 +477,18 @@ impl ProtocolCache {
/// Snapshot all non-expired cache entries for metrics/UI display.
pub fn snapshot(&self) -> Vec<ProtocolCacheEntry> {
self.cache.iter()
self.cache
.iter()
.filter(|entry| entry.value().last_accessed_at.elapsed() < PROTOCOL_CACHE_TTL)
.map(|entry| {
let key = entry.key();
let val = entry.value();
let failure_info = self.failures.get(key);
let (h2_sup, h2_cd, h2_cons) = Self::suppression_info(
failure_info.as_deref().and_then(|f| f.h2.as_ref()),
);
let (h3_sup, h3_cd, h3_cons) = Self::suppression_info(
failure_info.as_deref().and_then(|f| f.h3.as_ref()),
);
let (h2_sup, h2_cd, h2_cons) =
Self::suppression_info(failure_info.as_deref().and_then(|f| f.h2.as_ref()));
let (h3_sup, h3_cd, h3_cons) =
Self::suppression_info(failure_info.as_deref().and_then(|f| f.h3.as_ref()));
ProtocolCacheEntry {
host: key.host.clone(),
@@ -507,7 +519,13 @@ impl ProtocolCache {
/// Insert a protocol detection result with an optional H3 port.
/// Logs protocol transitions when overwriting an existing entry.
/// No suppression check — callers must check before calling.
fn insert_internal(&self, key: ProtocolCacheKey, protocol: DetectedProtocol, h3_port: Option<u16>, reason: &str) {
fn insert_internal(
&self,
key: ProtocolCacheKey,
protocol: DetectedProtocol,
h3_port: Option<u16>,
reason: &str,
) {
// Check for existing entry to log protocol transitions
if let Some(existing) = self.cache.get(&key) {
if existing.protocol != protocol {
@@ -522,7 +540,9 @@ impl ProtocolCache {
// Evict oldest entry if at capacity
if self.cache.len() >= PROTOCOL_CACHE_MAX_ENTRIES && !self.cache.contains_key(&key) {
let oldest = self.cache.iter()
let oldest = self
.cache
.iter()
.min_by_key(|entry| entry.value().last_accessed_at)
.map(|entry| entry.key().clone());
if let Some(oldest_key) = oldest {
@@ -531,13 +551,16 @@ impl ProtocolCache {
}
let now = Instant::now();
self.cache.insert(key, CachedEntry {
protocol,
detected_at: now,
last_accessed_at: now,
last_probed_at: now,
h3_port,
});
self.cache.insert(
key,
CachedEntry {
protocol,
detected_at: now,
last_accessed_at: now,
last_probed_at: now,
h3_port,
},
);
}
/// Reduce a failure record's remaining cooldown to `target`, if it currently
@@ -582,26 +605,34 @@ impl ProtocolCache {
interval.tick().await;
// Clean expired cache entries (sliding TTL based on last_accessed_at)
let expired: Vec<ProtocolCacheKey> = cache.iter()
let expired: Vec<ProtocolCacheKey> = cache
.iter()
.filter(|entry| entry.value().last_accessed_at.elapsed() >= PROTOCOL_CACHE_TTL)
.map(|entry| entry.key().clone())
.collect();
if !expired.is_empty() {
debug!("Protocol cache cleanup: removing {} expired entries", expired.len());
debug!(
"Protocol cache cleanup: removing {} expired entries",
expired.len()
);
for key in expired {
cache.remove(&key);
}
}
// Clean fully-expired failure entries
let expired_failures: Vec<ProtocolCacheKey> = failures.iter()
let expired_failures: Vec<ProtocolCacheKey> = failures
.iter()
.filter(|entry| entry.value().all_expired())
.map(|entry| entry.key().clone())
.collect();
if !expired_failures.is_empty() {
debug!("Protocol cache cleanup: removing {} expired failure entries", expired_failures.len());
debug!(
"Protocol cache cleanup: removing {} expired failure entries",
expired_failures.len()
);
for key in expired_failures {
failures.remove(&key);
}
@@ -609,7 +640,8 @@ impl ProtocolCache {
// Safety net: cap failures map at 2× max entries
if failures.len() > PROTOCOL_CACHE_MAX_ENTRIES * 2 {
let oldest: Vec<ProtocolCacheKey> = failures.iter()
let oldest: Vec<ProtocolCacheKey> = failures
.iter()
.filter(|e| e.value().all_expired())
.map(|e| e.key().clone())
.take(failures.len() - PROTOCOL_CACHE_MAX_ENTRIES)
File diff suppressed because it is too large Load Diff
+144 -39
View File
@@ -4,13 +4,15 @@ use std::net::SocketAddr;
use std::sync::Arc;
use bytes::Bytes;
use http_body_util::Full;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::{Request, Response, StatusCode};
use rustproxy_config::RouteSecurity;
use rustproxy_security::{IpFilter, BasicAuthValidator, JwtValidator, RateLimiter};
use rustproxy_security::{BasicAuthValidator, IpFilter, JwtValidator, RateLimiter};
use crate::request_host::extract_request_host;
pub struct RequestFilter;
@@ -35,13 +37,14 @@ impl RequestFilter {
let client_ip = peer_addr.ip();
let request_path = req.uri().path();
// IP filter
// IP filter (domain-aware: use the same host extraction as route matching)
if security.ip_allow_list.is_some() || security.ip_block_list.is_some() {
let allow = security.ip_allow_list.as_deref().unwrap_or(&[]);
let block = security.ip_block_list.as_deref().unwrap_or(&[]);
let filter = IpFilter::new(allow, block);
let normalized = IpFilter::normalize_ip(&client_ip);
if !filter.is_allowed(&normalized) {
let host = extract_request_host(req);
if !filter.is_allowed_for_domain(&normalized, host) {
return Some(error_response(StatusCode::FORBIDDEN, "Access denied"));
}
}
@@ -55,16 +58,15 @@ impl RequestFilter {
!limiter.check(&key)
} else {
// Create a per-check limiter (less ideal but works for non-shared case)
let limiter = RateLimiter::new(
rate_limit_config.max_requests,
rate_limit_config.window,
);
let limiter =
RateLimiter::new(rate_limit_config.max_requests, rate_limit_config.window);
let key = Self::rate_limit_key(rate_limit_config, req, peer_addr);
!limiter.check(&key)
};
if should_block {
let message = rate_limit_config.error_message
let message = rate_limit_config
.error_message
.as_deref()
.unwrap_or("Rate limit exceeded");
return Some(error_response(StatusCode::TOO_MANY_REQUESTS, message));
@@ -80,36 +82,48 @@ impl RequestFilter {
if let Some(ref basic_auth) = security.basic_auth {
if basic_auth.enabled {
// Check basic auth exclude paths
let skip_basic = basic_auth.exclude_paths.as_ref()
let skip_basic = basic_auth
.exclude_paths
.as_ref()
.map(|paths| Self::path_matches_any(request_path, paths))
.unwrap_or(false);
if !skip_basic {
let users: Vec<(String, String)> = basic_auth.users.iter()
let users: Vec<(String, String)> = basic_auth
.users
.iter()
.map(|c| (c.username.clone(), c.password.clone()))
.collect();
let validator = BasicAuthValidator::new(users, basic_auth.realm.clone());
let auth_header = req.headers()
let auth_header = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok());
match auth_header {
Some(header) => {
if validator.validate(header).is_none() {
return Some(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("WWW-Authenticate", validator.www_authenticate())
.body(boxed_body("Invalid credentials"))
.unwrap());
return Some(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header(
"WWW-Authenticate",
validator.www_authenticate(),
)
.body(boxed_body("Invalid credentials"))
.unwrap(),
);
}
}
None => {
return Some(Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("WWW-Authenticate", validator.www_authenticate())
.body(boxed_body("Authentication required"))
.unwrap());
return Some(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("WWW-Authenticate", validator.www_authenticate())
.body(boxed_body("Authentication required"))
.unwrap(),
);
}
}
}
@@ -120,7 +134,9 @@ impl RequestFilter {
if let Some(ref jwt_auth) = security.jwt_auth {
if jwt_auth.enabled {
// Check JWT auth exclude paths
let skip_jwt = jwt_auth.exclude_paths.as_ref()
let skip_jwt = jwt_auth
.exclude_paths
.as_ref()
.map(|paths| Self::path_matches_any(request_path, paths))
.unwrap_or(false);
@@ -132,18 +148,25 @@ impl RequestFilter {
jwt_auth.audience.as_deref(),
);
let auth_header = req.headers()
let auth_header = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok());
match auth_header.and_then(JwtValidator::extract_token) {
Some(token) => {
if validator.validate(token).is_err() {
return Some(error_response(StatusCode::UNAUTHORIZED, "Invalid token"));
return Some(error_response(
StatusCode::UNAUTHORIZED,
"Invalid token",
));
}
}
None => {
return Some(error_response(StatusCode::UNAUTHORIZED, "Bearer token required"));
return Some(error_response(
StatusCode::UNAUTHORIZED,
"Bearer token required",
));
}
}
}
@@ -203,14 +226,19 @@ impl RequestFilter {
}
/// Check IP-based security (for use in passthrough / TCP-level connections).
/// `domain` is the SNI from the TLS handshake (if available) for domain-scoped filtering.
/// Returns true if allowed, false if blocked.
pub fn check_ip_security(security: &RouteSecurity, client_ip: &std::net::IpAddr) -> bool {
pub fn check_ip_security(
security: &RouteSecurity,
client_ip: &std::net::IpAddr,
domain: Option<&str>,
) -> bool {
if security.ip_allow_list.is_some() || security.ip_block_list.is_some() {
let allow = security.ip_allow_list.as_deref().unwrap_or(&[]);
let block = security.ip_block_list.as_deref().unwrap_or(&[]);
let filter = IpFilter::new(allow, block);
let normalized = IpFilter::normalize_ip(client_ip);
filter.is_allowed(&normalized)
filter.is_allowed_for_domain(&normalized, domain)
} else {
true
}
@@ -233,19 +261,28 @@ impl RequestFilter {
return None;
}
let origin = req.headers()
let origin = req
.headers()
.get("origin")
.and_then(|v| v.to_str().ok())
.unwrap_or("*");
Some(Response::builder()
.status(StatusCode::NO_CONTENT)
.header("Access-Control-Allow-Origin", origin)
.header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS")
.header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
.header("Access-Control-Max-Age", "86400")
.body(boxed_body(""))
.unwrap())
Some(
Response::builder()
.status(StatusCode::NO_CONTENT)
.header("Access-Control-Allow-Origin", origin)
.header(
"Access-Control-Allow-Methods",
"GET, POST, PUT, DELETE, PATCH, OPTIONS",
)
.header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization, X-Requested-With",
)
.header("Access-Control-Max-Age", "86400")
.body(boxed_body(""))
.unwrap(),
)
}
}
@@ -260,3 +297,71 @@ fn error_response(status: StatusCode, message: &str) -> Response<BoxBody<Bytes,
fn boxed_body(data: &str) -> BoxBody<Bytes, hyper::Error> {
BoxBody::new(Full::new(Bytes::from(data.to_string())).map_err(|never| match never {}))
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{Request, StatusCode, Version};
use rustproxy_config::{IpAllowEntry, RouteSecurity};
use super::RequestFilter;
fn domain_scoped_security() -> RouteSecurity {
RouteSecurity {
ip_allow_list: Some(vec![IpAllowEntry::DomainScoped {
ip: "10.8.0.2".to_string(),
domains: vec!["*.abc.xyz".to_string()],
}]),
ip_block_list: None,
max_connections: None,
authentication: None,
rate_limit: None,
basic_auth: None,
jwt_auth: None,
}
}
fn peer_addr() -> std::net::SocketAddr {
std::net::SocketAddr::from(([10, 8, 0, 2], 4242))
}
fn request(uri: &str, version: Version, host: Option<&str>) -> Request<Empty<Bytes>> {
let mut builder = Request::builder().uri(uri).version(version);
if let Some(host) = host {
builder = builder.header("host", host);
}
builder.body(Empty::<Bytes>::new()).unwrap()
}
#[test]
fn domain_scoped_acl_allows_uri_authority_without_host_header() {
let security = domain_scoped_security();
let req = request("https://outline.abc.xyz/", Version::HTTP_2, None);
assert!(RequestFilter::apply(&security, &req, &peer_addr()).is_none());
}
#[test]
fn domain_scoped_acl_allows_host_header_with_port() {
let security = domain_scoped_security();
let req = request(
"https://unrelated.invalid/",
Version::HTTP_11,
Some("outline.abc.xyz:443"),
);
assert!(RequestFilter::apply(&security, &req, &peer_addr()).is_none());
}
#[test]
fn domain_scoped_acl_denies_non_matching_uri_authority() {
let security = domain_scoped_security();
let req = request("https://outline.other.xyz/", Version::HTTP_2, None);
let response = RequestFilter::apply(&security, &req, &peer_addr())
.expect("non-matching domain should be denied");
assert_eq!(response.status(), StatusCode::FORBIDDEN);
}
}
@@ -0,0 +1,43 @@
use hyper::Request;
/// Extract the effective request host for routing and scoped ACL checks.
///
/// Prefer the explicit `Host` header when present, otherwise fall back to the
/// URI authority used by HTTP/2 and HTTP/3 requests.
pub(crate) fn extract_request_host<B>(req: &Request<B>) -> Option<&str> {
req.headers()
.get("host")
.and_then(|value| value.to_str().ok())
.map(|host| host.split(':').next().unwrap_or(host))
.or_else(|| req.uri().host())
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use http_body_util::Empty;
use hyper::Request;
use super::extract_request_host;
#[test]
fn extracts_host_header_before_uri_authority() {
let req = Request::builder()
.uri("https://uri.abc.xyz/test")
.header("host", "header.abc.xyz:443")
.body(Empty::<Bytes>::new())
.unwrap();
assert_eq!(extract_request_host(&req), Some("header.abc.xyz"));
}
#[test]
fn falls_back_to_uri_authority_when_host_header_missing() {
let req = Request::builder()
.uri("https://outline.abc.xyz/test")
.body(Empty::<Bytes>::new())
.unwrap();
assert_eq!(extract_request_host(&req), Some("outline.abc.xyz"));
}
}
@@ -3,7 +3,7 @@
use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use rustproxy_config::RouteConfig;
use crate::template::{RequestContext, expand_template};
use crate::template::{expand_template, RequestContext};
pub struct ResponseFilter;
@@ -11,12 +11,17 @@ impl ResponseFilter {
/// Apply response headers from route config and CORS settings.
/// If a `RequestContext` is provided, template variables in header values will be expanded.
/// Also injects Alt-Svc header for routes with HTTP/3 enabled.
pub fn apply_headers(route: &RouteConfig, headers: &mut HeaderMap, req_ctx: Option<&RequestContext>) {
pub fn apply_headers(
route: &RouteConfig,
headers: &mut HeaderMap,
req_ctx: Option<&RequestContext>,
) {
// Inject Alt-Svc for HTTP/3 advertisement if QUIC/HTTP3 is enabled on this route
if let Some(ref udp) = route.action.udp {
if let Some(ref quic) = udp.quic {
if quic.enable_http3.unwrap_or(false) {
let port = quic.alt_svc_port
let port = quic
.alt_svc_port
.or_else(|| req_ctx.map(|c| c.port))
.unwrap_or(443);
let max_age = quic.alt_svc_max_age.unwrap_or(86400);
@@ -63,10 +68,7 @@ impl ResponseFilter {
headers.insert("access-control-allow-origin", val);
}
} else {
headers.insert(
"access-control-allow-origin",
HeaderValue::from_static("*"),
);
headers.insert("access-control-allow-origin", HeaderValue::from_static("*"));
}
// Allow-Methods
@@ -62,17 +62,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncWrite for Shutdown
self.inner.as_ref().unwrap().is_write_vectored()
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().inner.as_mut().unwrap()).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let this = self.get_mut();
let result = Pin::new(this.inner.as_mut().unwrap()).poll_shutdown(cx);
if result.is_ready() {
@@ -93,7 +87,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> Drop for ShutdownOnDrop
let _ = tokio::time::timeout(
std::time::Duration::from_secs(2),
tokio::io::AsyncWriteExt::shutdown(&mut stream),
).await;
)
.await;
// stream is dropped here — all resources freed
});
}
+6 -2
View File
@@ -39,7 +39,8 @@ pub fn expand_headers(
headers: &HashMap<String, String>,
ctx: &RequestContext,
) -> HashMap<String, String> {
headers.iter()
headers
.iter()
.map(|(k, v)| (k.clone(), expand_template(v, ctx)))
.collect()
}
@@ -150,7 +151,10 @@ mod tests {
let ctx = test_context();
let template = "{clientIp}|{domain}|{port}|{path}|{routeName}|{connectionId}";
let result = expand_template(template, &ctx);
assert_eq!(result, "192.168.1.100|example.com|443|/api/v1/users|api-route|42");
assert_eq!(
result,
"192.168.1.100|example.com|443|/api/v1/users|api-route|42"
);
}
#[test]
@@ -7,7 +7,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use dashmap::DashMap;
use rustproxy_config::{RouteTarget, LoadBalancingAlgorithm};
use rustproxy_config::{LoadBalancingAlgorithm, RouteTarget};
/// Upstream selection result.
pub struct UpstreamSelection {
@@ -51,21 +51,19 @@ impl UpstreamSelector {
}
// Determine load balancing algorithm
let algorithm = target.load_balancing.as_ref()
let algorithm = target
.load_balancing
.as_ref()
.map(|lb| &lb.algorithm)
.unwrap_or(&LoadBalancingAlgorithm::RoundRobin);
let idx = match algorithm {
LoadBalancingAlgorithm::RoundRobin => {
self.round_robin_select(&hosts, port)
}
LoadBalancingAlgorithm::RoundRobin => self.round_robin_select(&hosts, port),
LoadBalancingAlgorithm::IpHash => {
let hash = Self::ip_hash(client_addr);
hash % hosts.len()
}
LoadBalancingAlgorithm::LeastConnections => {
self.least_connections_select(&hosts, port)
}
LoadBalancingAlgorithm::LeastConnections => self.least_connections_select(&hosts, port),
};
UpstreamSelection {
@@ -78,9 +76,7 @@ impl UpstreamSelector {
fn round_robin_select(&self, hosts: &[&str], port: u16) -> usize {
let key = format!("{}:{}", hosts[0], port);
let mut counters = self.round_robin.lock().unwrap();
let counter = counters
.entry(key)
.or_insert_with(|| AtomicUsize::new(0));
let counter = counters.entry(key).or_insert_with(|| AtomicUsize::new(0));
let idx = counter.fetch_add(1, Ordering::Relaxed);
idx % hosts.len()
}
@@ -91,7 +87,8 @@ impl UpstreamSelector {
for (i, host) in hosts.iter().enumerate() {
let key = format!("{}:{}", host, port);
let conns = self.active_connections
let conns = self
.active_connections
.get(&key)
.map(|entry| entry.value().load(Ordering::Relaxed))
.unwrap_or(0);
@@ -228,13 +225,21 @@ mod tests {
selector.connection_started("backend:8080");
selector.connection_started("backend:8080");
assert_eq!(
selector.active_connections.get("backend:8080").unwrap().load(Ordering::Relaxed),
selector
.active_connections
.get("backend:8080")
.unwrap()
.load(Ordering::Relaxed),
2
);
selector.connection_ended("backend:8080");
assert_eq!(
selector.active_connections.get("backend:8080").unwrap().load(Ordering::Relaxed),
selector
.active_connections
.get("backend:8080")
.unwrap()
.load(Ordering::Relaxed),
1
);
File diff suppressed because it is too large Load Diff
+146 -1
View File
@@ -29,6 +29,113 @@ pub struct ThroughputTracker {
created_at: Instant,
}
fn unix_timestamp_seconds() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
/// Circular buffer for per-second event counts.
///
/// Unlike `ThroughputTracker`, events are recorded directly into the current
/// second so request counts remain stable even when the collector is sampled
/// more frequently than once per second.
pub(crate) struct RequestRateTracker {
samples: Vec<u64>,
write_index: usize,
count: usize,
capacity: usize,
current_second: Option<u64>,
current_count: u64,
}
impl RequestRateTracker {
pub(crate) fn new(retention_seconds: usize) -> Self {
Self {
samples: Vec::with_capacity(retention_seconds.max(1)),
write_index: 0,
count: 0,
capacity: retention_seconds.max(1),
current_second: None,
current_count: 0,
}
}
fn push_sample(&mut self, count: u64) {
if self.samples.len() < self.capacity {
self.samples.push(count);
} else {
self.samples[self.write_index] = count;
}
self.write_index = (self.write_index + 1) % self.capacity;
self.count = (self.count + 1).min(self.capacity);
}
pub(crate) fn record_event(&mut self) {
self.record_events_at(unix_timestamp_seconds(), 1);
}
pub(crate) fn record_events_at(&mut self, now_sec: u64, count: u64) {
self.advance_to(now_sec);
self.current_count = self.current_count.saturating_add(count);
}
pub(crate) fn advance_to_now(&mut self) {
self.advance_to(unix_timestamp_seconds());
}
pub(crate) fn advance_to(&mut self, now_sec: u64) {
match self.current_second {
Some(current_second) if now_sec > current_second => {
self.push_sample(self.current_count);
for _ in 1..(now_sec - current_second) {
self.push_sample(0);
}
self.current_second = Some(now_sec);
self.current_count = 0;
}
Some(_) => {}
None => {
self.current_second = Some(now_sec);
self.current_count = 0;
}
}
}
fn sum_recent(&self, window_seconds: usize) -> u64 {
let window = window_seconds.min(self.count);
if window == 0 {
return 0;
}
let mut total = 0u64;
for i in 0..window {
let idx = if self.write_index >= i + 1 {
self.write_index - i - 1
} else {
self.capacity - (i + 1 - self.write_index)
};
if idx < self.samples.len() {
total += self.samples[idx];
}
}
total
}
pub(crate) fn last_second(&self) -> u64 {
self.sum_recent(1)
}
pub(crate) fn last_minute(&self) -> u64 {
self.sum_recent(60)
}
pub(crate) fn is_idle(&self) -> bool {
self.current_count == 0 && self.sum_recent(self.capacity) == 0
}
}
impl ThroughputTracker {
/// Create a new tracker with the given capacity (seconds of retention).
pub fn new(retention_seconds: usize) -> Self {
@@ -46,7 +153,8 @@ impl ThroughputTracker {
/// Record bytes (called from data flow callbacks).
pub fn record_bytes(&self, bytes_in: u64, bytes_out: u64) {
self.pending_bytes_in.fetch_add(bytes_in, Ordering::Relaxed);
self.pending_bytes_out.fetch_add(bytes_out, Ordering::Relaxed);
self.pending_bytes_out
.fetch_add(bytes_out, Ordering::Relaxed);
}
/// Take a sample (called at 1Hz).
@@ -229,4 +337,41 @@ mod tests {
let history = tracker.history(10);
assert!(history.is_empty());
}
#[test]
fn test_request_rate_tracker_counts_last_second_and_last_minute() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 2);
tracker.record_events_at(100, 3);
tracker.advance_to(101);
assert_eq!(tracker.last_second(), 5);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_adds_zero_samples_for_gaps() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 4);
tracker.record_events_at(102, 1);
tracker.advance_to(103);
assert_eq!(tracker.last_second(), 1);
assert_eq!(tracker.last_minute(), 5);
}
#[test]
fn test_request_rate_tracker_decays_to_zero_over_window() {
let mut tracker = RequestRateTracker::new(60);
tracker.record_events_at(100, 7);
tracker.advance_to(101);
tracker.advance_to(161);
assert_eq!(tracker.last_second(), 0);
assert_eq!(tracker.last_minute(), 0);
assert!(tracker.is_idle());
}
}
@@ -100,7 +100,7 @@ impl ConnectionRegistry {
let mut recycled = 0u64;
self.connections.retain(|_, entry| {
if entry.route_id.as_deref() == Some(route_id) {
if !RequestFilter::check_ip_security(new_security, &entry.source_ip) {
if !RequestFilter::check_ip_security(new_security, &entry.source_ip, entry.domain.as_deref()) {
info!(
"Terminating connection from {} — IP now blocked on route '{}'",
entry.source_ip, route_id
@@ -409,10 +409,10 @@ pub async fn quic_accept_loop(
}
};
// Check route-level IP security (previously missing for QUIC)
// Check route-level IP security for QUIC (domain from SNI context)
if let Some(ref security) = route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
security, &ip,
security, &ip, ctx.domain,
) {
debug!("QUIC connection from {} blocked by route security", real_addr);
continue;
@@ -420,7 +420,7 @@ pub async fn quic_accept_loop(
}
conn_tracker.connection_opened(&ip);
let route_id = route.name.clone().or(route.id.clone());
let route_id = route.metrics_key().map(str::to_string);
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
// Resolve per-route cancel token (child of global cancel)
@@ -541,7 +541,7 @@ async fn handle_quic_stream_forwarding(
real_client_addr: Option<SocketAddr>,
) -> anyhow::Result<()> {
let effective_addr = real_client_addr.unwrap_or_else(|| connection.remote_address());
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
let metrics_arc = metrics;
// Resolve backend target
@@ -715,10 +715,11 @@ impl TcpListenerManager {
} else if let Some(target) = quick_match.target {
let target_host = target.host.first().to_string();
let target_port = target.port.resolve(port);
let route_id = quick_match.route.id.as_deref();
let route_config_id = quick_match.route.id.as_deref();
let route_id = quick_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel)
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -733,14 +734,14 @@ impl TcpListenerManager {
cancel: conn_cancel.clone(),
source_ip: peer_addr.ip(),
domain: None, // fast path has no domain
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
// Check route-level IP security
// Check route-level IP security (fast path: no SNI available)
if let Some(ref security) = quick_match.route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
security, &peer_addr.ip(),
security, &peer_addr.ip(), None,
) {
warn!("Connection from {} blocked by route security", peer_addr);
return Ok(());
@@ -905,12 +906,13 @@ impl TcpListenerManager {
}
};
let route_id = route_match.route.id.as_deref();
let route_config_id = route_match.route.id.as_deref();
let route_id = route_match.route.metrics_key();
// Resolve per-route cancel token (child of global cancel).
// When this route is removed via updateRoutes, the token is cancelled,
// terminating all connections on this route.
let route_cancel = match route_id {
let route_cancel = match route_config_id {
Some(id) => route_cancels.entry(id.to_string())
.or_insert_with(|| cancel.child_token())
.clone(),
@@ -925,15 +927,16 @@ impl TcpListenerManager {
cancel: cancel.clone(),
source_ip: peer_addr.ip(),
domain: domain.clone(),
route_id: route_id.map(|s| s.to_string()),
route_id: route_config_id.map(|s| s.to_string()),
},
);
// Check route-level IP security for passthrough connections
// Check route-level IP security for passthrough connections (SNI available)
if let Some(ref security) = route_match.route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
security,
&peer_addr.ip(),
domain.as_deref(),
) {
warn!("Connection from {} blocked by route security", peer_addr);
return Ok(());
@@ -942,6 +945,9 @@ impl TcpListenerManager {
// Track connection in metrics — guard ensures connection_closed on all exit paths
metrics.connection_opened(route_id, Some(&ip_str));
if let Some(d) = effective_domain {
metrics.record_ip_domain_request(&ip_str, d);
}
let _conn_guard = ConnectionGuard::new(Arc::clone(&metrics), route_id, Some(&ip_str));
// Check if this is a socket-handler route that should be relayed to TypeScript
@@ -1310,9 +1316,7 @@ impl TcpListenerManager {
};
// Build metadata JSON
let route_key = route_match.route.name.as_deref()
.or(route_match.route.id.as_deref())
.unwrap_or("unknown");
let route_key = route_match.route.metrics_key().unwrap_or("unknown");
let metadata = serde_json::json!({
"routeKey": route_key,
@@ -617,7 +617,7 @@ impl UdpListenerManager {
};
let route = route_match.route;
let route_id = route.name.as_deref().or(route.id.as_deref());
let route_id = route.metrics_key();
// Socket handler routes → relay datagram to TS via persistent Unix socket
if route.action.action_type == RouteActionType::SocketHandler {
@@ -1,5 +1,42 @@
use std::collections::HashMap;
use regex::Regex;
use std::collections::HashMap;
fn compile_regex_pattern(pattern: &str) -> Option<Regex> {
if !pattern.starts_with('/') {
return None;
}
let last_slash = pattern.rfind('/')?;
if last_slash == 0 {
return None;
}
let regex_body = &pattern[1..last_slash];
let flags = &pattern[last_slash + 1..];
let mut inline_flags = String::new();
for flag in flags.chars() {
match flag {
'i' | 'm' | 's' | 'u' => {
if !inline_flags.contains(flag) {
inline_flags.push(flag);
}
}
'g' => {
// Global has no effect for single header matching.
}
_ => return None,
}
}
let compiled = if inline_flags.is_empty() {
regex_body.to_string()
} else {
format!("(?{}){}", inline_flags, regex_body)
};
Regex::new(&compiled).ok()
}
/// Match HTTP headers against a set of patterns.
///
@@ -24,16 +61,15 @@ pub fn headers_match(
None => return false, // Required header not present
};
// Check if pattern is a regex (surrounded by /)
if pattern.starts_with('/') && pattern.ends_with('/') && pattern.len() > 2 {
let regex_str = &pattern[1..pattern.len() - 1];
match Regex::new(regex_str) {
Ok(re) => {
// Check if pattern is a regex literal (/pattern/ or /pattern/flags)
if pattern.starts_with('/') && pattern.len() > 2 {
match compile_regex_pattern(pattern) {
Some(re) => {
if !re.is_match(header_value) {
return false;
}
}
Err(_) => {
None => {
// Invalid regex, fall back to exact match
if header_value != pattern {
return false;
@@ -85,6 +121,24 @@ mod tests {
assert!(headers_match(&patterns, &headers));
}
#[test]
fn test_regex_header_match_with_flags() {
let patterns: HashMap<String, String> = {
let mut m = HashMap::new();
m.insert(
"Content-Type".to_string(),
"/^application\\/json$/i".to_string(),
);
m
};
let headers: HashMap<String, String> = {
let mut m = HashMap::new();
m.insert("content-type".to_string(), "Application/JSON".to_string());
m
};
assert!(headers_match(&patterns, &headers));
}
#[test]
fn test_missing_header() {
let patterns: HashMap<String, String> = {
+194 -37
View File
@@ -2,12 +2,24 @@ use ipnet::IpNet;
use std::net::IpAddr;
use std::str::FromStr;
use rustproxy_config::IpAllowEntry;
/// IP filter supporting CIDR ranges, wildcards, and exact matches.
/// Supports domain-scoped allow entries that restrict an IP to specific domains.
pub struct IpFilter {
/// Plain allow entries — IP allowed for any domain on the route
allow_list: Vec<IpPattern>,
/// Domain-scoped allow entries — IP allowed only for matching domains
domain_scoped: Vec<DomainScopedEntry>,
block_list: Vec<IpPattern>,
}
/// A domain-scoped allow entry: IP + list of allowed domain patterns.
struct DomainScopedEntry {
pattern: IpPattern,
domains: Vec<String>,
}
/// Represents an IP pattern for matching.
#[derive(Debug)]
enum IpPattern {
@@ -31,10 +43,6 @@ impl IpPattern {
if let Ok(addr) = IpAddr::from_str(s) {
return IpPattern::Exact(addr);
}
// Try as CIDR by appending default prefix
if let Ok(addr) = IpAddr::from_str(s) {
return IpPattern::Exact(addr);
}
// Fallback: treat as exact, will never match an invalid string
IpPattern::Exact(IpAddr::from_str("0.0.0.0").unwrap())
}
@@ -48,19 +56,56 @@ impl IpPattern {
}
}
/// Simple domain pattern matching (exact, `*`, or `*.suffix`).
fn domain_matches_pattern(pattern: &str, domain: &str) -> bool {
let p = pattern.trim();
let d = domain.trim();
if p == "*" {
return true;
}
if p.eq_ignore_ascii_case(d) {
return true;
}
if p.starts_with("*.") {
let suffix = &p[1..]; // e.g., ".abc.xyz"
d.len() > suffix.len()
&& d[d.len() - suffix.len()..].eq_ignore_ascii_case(suffix)
} else {
false
}
}
impl IpFilter {
/// Create a new IP filter from allow and block lists.
pub fn new(allow_list: &[String], block_list: &[String]) -> Self {
/// Create a new IP filter from allow entries and a block list.
pub fn new(allow_entries: &[IpAllowEntry], block_list: &[String]) -> Self {
let mut allow_list = Vec::new();
let mut domain_scoped = Vec::new();
for entry in allow_entries {
match entry {
IpAllowEntry::Plain(ip) => {
allow_list.push(IpPattern::parse(ip));
}
IpAllowEntry::DomainScoped { ip, domains } => {
domain_scoped.push(DomainScopedEntry {
pattern: IpPattern::parse(ip),
domains: domains.clone(),
});
}
}
}
Self {
allow_list: allow_list.iter().map(|s| IpPattern::parse(s)).collect(),
allow_list,
domain_scoped,
block_list: block_list.iter().map(|s| IpPattern::parse(s)).collect(),
}
}
/// Check if an IP is allowed.
/// If allow_list is non-empty, IP must match at least one entry.
/// If block_list is non-empty, IP must NOT match any entry.
pub fn is_allowed(&self, ip: &IpAddr) -> bool {
/// Check if an IP is allowed, considering domain-scoped entries.
/// If `domain` is Some, domain-scoped entries are evaluated against it.
/// If `domain` is None, only plain allow entries are considered.
pub fn is_allowed_for_domain(&self, ip: &IpAddr, domain: Option<&str>) -> bool {
// Check block list first
if !self.block_list.is_empty() {
for pattern in &self.block_list {
@@ -70,14 +115,36 @@ impl IpFilter {
}
}
// If allow list is non-empty, must match at least one
if !self.allow_list.is_empty() {
return self.allow_list.iter().any(|p| p.matches(ip));
// If there are any allow entries (plain or domain-scoped), IP must match
let has_any_allow = !self.allow_list.is_empty() || !self.domain_scoped.is_empty();
if has_any_allow {
// Check plain allow list — grants access to entire route
if self.allow_list.iter().any(|p| p.matches(ip)) {
return true;
}
// Check domain-scoped entries — grants access only if domain matches
if let Some(req_domain) = domain {
for entry in &self.domain_scoped {
if entry.pattern.matches(ip) {
if entry.domains.iter().any(|d| domain_matches_pattern(d, req_domain)) {
return true;
}
}
}
}
return false;
}
true
}
/// Check if an IP is allowed (backwards-compat wrapper, no domain context).
pub fn is_allowed(&self, ip: &IpAddr) -> bool {
self.is_allowed_for_domain(ip, None)
}
/// Normalize IPv4-mapped IPv6 addresses (::ffff:x.x.x.x -> x.x.x.x)
pub fn normalize_ip(ip: &IpAddr) -> IpAddr {
match ip {
@@ -97,19 +164,28 @@ impl IpFilter {
mod tests {
use super::*;
fn plain(s: &str) -> IpAllowEntry {
IpAllowEntry::Plain(s.to_string())
}
fn scoped(ip: &str, domains: &[&str]) -> IpAllowEntry {
IpAllowEntry::DomainScoped {
ip: ip.to_string(),
domains: domains.iter().map(|s| s.to_string()).collect(),
}
}
#[test]
fn test_empty_lists_allow_all() {
let filter = IpFilter::new(&[], &[]);
let ip: IpAddr = "192.168.1.1".parse().unwrap();
assert!(filter.is_allowed(&ip));
assert!(filter.is_allowed_for_domain(&ip, Some("example.com")));
}
#[test]
fn test_allow_list_exact() {
let filter = IpFilter::new(
&["10.0.0.1".to_string()],
&[],
);
fn test_plain_allow_list_exact() {
let filter = IpFilter::new(&[plain("10.0.0.1")], &[]);
let allowed: IpAddr = "10.0.0.1".parse().unwrap();
let denied: IpAddr = "10.0.0.2".parse().unwrap();
assert!(filter.is_allowed(&allowed));
@@ -117,11 +193,8 @@ mod tests {
}
#[test]
fn test_allow_list_cidr() {
let filter = IpFilter::new(
&["10.0.0.0/8".to_string()],
&[],
);
fn test_plain_allow_list_cidr() {
let filter = IpFilter::new(&[plain("10.0.0.0/8")], &[]);
let allowed: IpAddr = "10.255.255.255".parse().unwrap();
let denied: IpAddr = "192.168.1.1".parse().unwrap();
assert!(filter.is_allowed(&allowed));
@@ -130,10 +203,7 @@ mod tests {
#[test]
fn test_block_list() {
let filter = IpFilter::new(
&[],
&["192.168.1.100".to_string()],
);
let filter = IpFilter::new(&[], &["192.168.1.100".to_string()]);
let blocked: IpAddr = "192.168.1.100".parse().unwrap();
let allowed: IpAddr = "192.168.1.101".parse().unwrap();
assert!(!filter.is_allowed(&blocked));
@@ -143,7 +213,7 @@ mod tests {
#[test]
fn test_block_trumps_allow() {
let filter = IpFilter::new(
&["10.0.0.0/8".to_string()],
&[plain("10.0.0.0/8")],
&["10.0.0.5".to_string()],
);
let blocked: IpAddr = "10.0.0.5".parse().unwrap();
@@ -154,20 +224,14 @@ mod tests {
#[test]
fn test_wildcard_allow() {
let filter = IpFilter::new(
&["*".to_string()],
&[],
);
let filter = IpFilter::new(&[plain("*")], &[]);
let ip: IpAddr = "1.2.3.4".parse().unwrap();
assert!(filter.is_allowed(&ip));
}
#[test]
fn test_wildcard_block() {
let filter = IpFilter::new(
&[],
&["*".to_string()],
);
let filter = IpFilter::new(&[], &["*".to_string()]);
let ip: IpAddr = "1.2.3.4".parse().unwrap();
assert!(!filter.is_allowed(&ip));
}
@@ -186,4 +250,97 @@ mod tests {
let normalized = IpFilter::normalize_ip(&ip);
assert_eq!(normalized, ip);
}
// Domain-scoped tests
#[test]
fn test_domain_scoped_allows_matching_domain() {
let filter = IpFilter::new(
&[scoped("10.8.0.2", &["outline.abc.xyz"])],
&[],
);
let ip: IpAddr = "10.8.0.2".parse().unwrap();
assert!(filter.is_allowed_for_domain(&ip, Some("outline.abc.xyz")));
}
#[test]
fn test_domain_scoped_denies_non_matching_domain() {
let filter = IpFilter::new(
&[scoped("10.8.0.2", &["outline.abc.xyz"])],
&[],
);
let ip: IpAddr = "10.8.0.2".parse().unwrap();
assert!(!filter.is_allowed_for_domain(&ip, Some("app.abc.xyz")));
}
#[test]
fn test_domain_scoped_denies_without_domain() {
let filter = IpFilter::new(
&[scoped("10.8.0.2", &["outline.abc.xyz"])],
&[],
);
let ip: IpAddr = "10.8.0.2".parse().unwrap();
// Without domain context, domain-scoped entries cannot match
assert!(!filter.is_allowed_for_domain(&ip, None));
}
#[test]
fn test_domain_scoped_wildcard_domain() {
let filter = IpFilter::new(
&[scoped("10.8.0.2", &["*.abc.xyz"])],
&[],
);
let ip: IpAddr = "10.8.0.2".parse().unwrap();
assert!(filter.is_allowed_for_domain(&ip, Some("outline.abc.xyz")));
assert!(filter.is_allowed_for_domain(&ip, Some("app.abc.xyz")));
assert!(!filter.is_allowed_for_domain(&ip, Some("other.com")));
}
#[test]
fn test_plain_and_domain_scoped_coexist() {
let filter = IpFilter::new(
&[
plain("1.2.3.4"), // full route access
scoped("10.8.0.2", &["outline.abc.xyz"]), // scoped access
],
&[],
);
let admin: IpAddr = "1.2.3.4".parse().unwrap();
let vpn: IpAddr = "10.8.0.2".parse().unwrap();
let other: IpAddr = "9.9.9.9".parse().unwrap();
// Admin IP has full access
assert!(filter.is_allowed_for_domain(&admin, Some("anything.abc.xyz")));
assert!(filter.is_allowed_for_domain(&admin, Some("outline.abc.xyz")));
// VPN IP only has scoped access
assert!(filter.is_allowed_for_domain(&vpn, Some("outline.abc.xyz")));
assert!(!filter.is_allowed_for_domain(&vpn, Some("app.abc.xyz")));
// Unknown IP denied
assert!(!filter.is_allowed_for_domain(&other, Some("outline.abc.xyz")));
}
#[test]
fn test_block_trumps_domain_scoped() {
let filter = IpFilter::new(
&[scoped("10.8.0.2", &["outline.abc.xyz"])],
&["10.8.0.2".to_string()],
);
let ip: IpAddr = "10.8.0.2".parse().unwrap();
assert!(!filter.is_allowed_for_domain(&ip, Some("outline.abc.xyz")));
}
#[test]
fn test_domain_matches_pattern_fn() {
assert!(domain_matches_pattern("example.com", "example.com"));
assert!(domain_matches_pattern("*.abc.xyz", "outline.abc.xyz"));
assert!(domain_matches_pattern("*.abc.xyz", "app.abc.xyz"));
assert!(!domain_matches_pattern("*.abc.xyz", "abc.xyz")); // suffix only, not exact parent
assert!(domain_matches_pattern("*", "anything.com"));
assert!(!domain_matches_pattern("outline.abc.xyz", "app.abc.xyz"));
// Case insensitive
assert!(domain_matches_pattern("*.ABC.XYZ", "outline.abc.xyz"));
}
}
+7 -1
View File
@@ -198,7 +198,9 @@ impl RustProxy {
};
if let Some(ref allow_list) = default_security.ip_allow_list {
security.ip_allow_list = Some(allow_list.clone());
security.ip_allow_list = Some(
allow_list.iter().map(|s| rustproxy_config::IpAllowEntry::Plain(s.clone())).collect()
);
}
if let Some(ref block_list) = default_security.ip_block_list {
security.ip_block_list = Some(block_list.clone());
@@ -845,6 +847,10 @@ impl RustProxy {
connection_registry,
);
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
// Wire up H3ProxyService so QUIC connections can serve HTTP/3
let http_proxy = listener.http_proxy().clone();
let h3_svc = rustproxy_http::h3_service::H3ProxyService::new(http_proxy);
udp_mgr.set_h3_service(std::sync::Arc::new(h3_svc));
self.udp_listener_manager = Some(udp_mgr);
}
}
+191
View File
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { SmartProxy } from '../ts/index.js';
import * as http from 'http';
import * as net from 'net';
import * as tls from 'tls';
import * as fs from 'fs';
import * as path from 'path';
import { fileURLToPath } from 'url';
import { assertPortsFree, findFreePorts } from './helpers/port-allocator.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const CERT_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'cert.pem'), 'utf8');
const KEY_PEM = fs.readFileSync(path.join(__dirname, '..', 'assets', 'certs', 'key.pem'), 'utf8');
let httpBackendPort: number;
let tlsBackendPort: number;
let httpProxyPort: number;
let tlsProxyPort: number;
let httpBackend: http.Server;
let tlsBackend: tls.Server;
let proxy: SmartProxy;
async function pollMetrics(proxyToPoll: SmartProxy): Promise<void> {
await (proxyToPoll as any).metricsAdapter.poll();
}
async function waitForCondition(
callback: () => Promise<boolean>,
timeoutMs: number = 5000,
stepMs: number = 100,
): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (await callback()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, stepMs));
}
throw new Error(`Condition not met within ${timeoutMs}ms`);
}
function hasIpDomainRequest(domain: string): boolean {
const byIp = proxy.getMetrics().connections.domainRequestsByIP();
for (const domainMap of byIp.values()) {
if (domainMap.has(domain)) {
return true;
}
}
return false;
}
tap.test('setup - backend servers for HTTP domain rate metrics', async () => {
[httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort] = await findFreePorts(4);
httpBackend = http.createServer((req, res) => {
let body = '';
req.on('data', (chunk) => {
body += chunk;
});
req.on('end', () => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`ok:${body}`);
});
});
await new Promise<void>((resolve) => {
httpBackend.listen(httpBackendPort, () => resolve());
});
tlsBackend = tls.createServer({ cert: CERT_PEM, key: KEY_PEM }, (socket) => {
socket.on('data', (data) => {
socket.write(data);
});
socket.on('error', () => {});
});
await new Promise<void>((resolve) => {
tlsBackend.listen(tlsBackendPort, () => resolve());
});
});
tap.test('setup - start proxy with HTTP and TLS passthrough routes', async () => {
proxy = new SmartProxy({
routes: [
{
id: 'http-domain-rates',
name: 'http-domain-rates',
match: { ports: httpProxyPort, domains: 'example.com' },
action: {
type: 'forward',
targets: [{ host: 'localhost', port: httpBackendPort }],
},
},
{
id: 'tls-passthrough-domain-rates',
name: 'tls-passthrough-domain-rates',
match: { ports: tlsProxyPort, domains: 'passthrough.example.com' },
action: {
type: 'forward',
tls: { mode: 'passthrough' },
targets: [{ host: 'localhost', port: tlsBackendPort }],
},
},
],
metrics: { enabled: true, sampleIntervalMs: 100, retentionSeconds: 60 },
});
await proxy.start();
await new Promise((resolve) => setTimeout(resolve, 300));
});
tap.test('HTTP requests populate per-domain HTTP request rates', async () => {
for (let i = 0; i < 3; i++) {
await new Promise<void>((resolve, reject) => {
const body = `payload-${i}`;
const req = http.request(
{
hostname: 'localhost',
port: httpProxyPort,
path: '/echo',
method: 'POST',
headers: {
Host: 'Example.COM',
'Content-Type': 'text/plain',
'Content-Length': String(body.length),
},
},
(res) => {
res.resume();
res.on('end', () => resolve());
},
);
req.on('error', reject);
req.end(body);
});
}
await waitForCondition(async () => {
await pollMetrics(proxy);
const domainMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
return (domainMetrics?.lastMinute ?? 0) >= 3 && (domainMetrics?.perSecond ?? 0) > 0;
});
const exampleMetrics = proxy.getMetrics().requests.byDomain().get('example.com');
expect(exampleMetrics).toBeTruthy();
expect(exampleMetrics?.lastMinute).toEqual(3);
expect(exampleMetrics?.perSecond).toBeGreaterThan(0);
});
tap.test('TLS passthrough SNI does not inflate HTTP domain request rates', async () => {
const tlsClient = tls.connect({
host: 'localhost',
port: tlsProxyPort,
servername: 'passthrough.example.com',
rejectUnauthorized: false,
});
await new Promise<void>((resolve, reject) => {
tlsClient.once('secureConnect', () => resolve());
tlsClient.once('error', reject);
});
const echoPromise = new Promise<void>((resolve, reject) => {
tlsClient.once('data', () => resolve());
tlsClient.once('error', reject);
});
tlsClient.write(Buffer.from('hello over tls passthrough'));
await echoPromise;
await waitForCondition(async () => {
await pollMetrics(proxy);
return hasIpDomainRequest('passthrough.example.com');
});
const requestRates = proxy.getMetrics().requests.byDomain();
expect(requestRates.has('passthrough.example.com')).toBeFalse();
expect(requestRates.get('example.com')?.lastMinute).toEqual(3);
expect(hasIpDomainRequest('passthrough.example.com')).toBeTrue();
tlsClient.destroy();
});
tap.test('cleanup - stop proxy and close backend servers', async () => {
await proxy.stop();
await new Promise<void>((resolve) => httpBackend.close(() => resolve()));
await new Promise<void>((resolve) => tlsBackend.close(() => resolve()));
await assertPortsFree([httpBackendPort, tlsBackendPort, httpProxyPort, tlsProxyPort]);
});
export default tap.start()
+4 -1
View File
@@ -83,6 +83,9 @@ tap.test('should verify new metrics API structure', async () => {
expect(metrics.throughput).toHaveProperty('history');
expect(metrics.throughput).toHaveProperty('byRoute');
expect(metrics.throughput).toHaveProperty('byIP');
// Check request methods
expect(metrics.requests).toHaveProperty('byDomain');
});
tap.test('should track active connections', async (tools) => {
@@ -273,4 +276,4 @@ tap.test('should clean up resources', async () => {
await assertPortsFree([echoServerPort, proxyPort]);
});
export default tap.start();
export default tap.start();
+25
View File
@@ -537,6 +537,31 @@ tap.test('Route Matching - routeMatchesHeaders', async () => {
'X-Custom-Header': 'value'
})).toBeFalse();
const regexHeaderRoute: IRouteConfig = {
match: {
domains: 'example.com',
ports: 80,
headers: {
'Content-Type': /^application\/(json|problem\+json)$/i,
}
},
action: {
type: 'forward',
targets: [{
host: 'localhost',
port: 3000
}]
}
};
expect(routeMatchesHeaders(regexHeaderRoute, {
'Content-Type': 'Application/Problem+Json',
})).toBeTrue();
expect(routeMatchesHeaders(regexHeaderRoute, {
'Content-Type': 'text/html',
})).toBeFalse();
// Route without header matching should match any headers
const noHeaderRoute: IRouteConfig = {
match: { ports: 80, domains: 'example.com' },
+192
View File
@@ -0,0 +1,192 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import type { ISmartProxyOptions } from '../ts/proxies/smart-proxy/models/interfaces.js';
import type { IRouteConfig } from '../ts/proxies/smart-proxy/models/route-types.js';
import { RoutePreprocessor } from '../ts/proxies/smart-proxy/route-preprocessor.js';
import { buildRustProxyOptions } from '../ts/proxies/smart-proxy/utils/rust-config.js';
tap.test('Rust contract - preprocessor serializes regex headers for Rust', async () => {
const route: IRouteConfig = {
name: 'contract-route',
match: {
ports: [443, { from: 8443, to: 8444 }],
domains: ['api.example.com', '*.example.com'],
transport: 'udp',
protocol: 'http3',
headers: {
'Content-Type': /^application\/json$/i,
},
},
action: {
type: 'forward',
targets: [{
match: {
ports: [443],
path: '/api/*',
method: ['GET'],
headers: {
'X-Env': /^(prod|stage)$/,
},
},
host: ['backend-a', 'backend-b'],
port: 'preserve',
sendProxyProtocol: true,
backendTransport: 'tcp',
}],
tls: {
mode: 'terminate',
certificate: 'auto',
},
sendProxyProtocol: true,
udp: {
maxSessionsPerIP: 321,
quic: {
enableHttp3: true,
},
},
},
security: {
ipAllowList: [{
ip: '10.0.0.0/8',
domains: ['api.example.com'],
}],
},
};
const preprocessor = new RoutePreprocessor();
const [rustRoute] = preprocessor.preprocessForRust([route]);
expect(rustRoute.match.headers?.['Content-Type']).toEqual('/^application\\/json$/i');
expect(rustRoute.match.transport).toEqual('udp');
expect(rustRoute.match.protocol).toEqual('http3');
expect(rustRoute.action.targets?.[0].match?.headers?.['X-Env']).toEqual('/^(prod|stage)$/');
expect(rustRoute.action.targets?.[0].port).toEqual('preserve');
expect(rustRoute.action.targets?.[0].backendTransport).toEqual('tcp');
expect(rustRoute.action.sendProxyProtocol).toBeTrue();
expect(rustRoute.action.udp?.maxSessionsPerIp).toEqual(321);
});
tap.test('Rust contract - preprocessor converts dynamic targets to relay-safe payloads', async () => {
const route: IRouteConfig = {
name: 'dynamic-contract-route',
match: {
ports: 8080,
},
action: {
type: 'forward',
targets: [{
host: () => 'dynamic-backend.internal',
port: () => 9443,
}],
},
};
const preprocessor = new RoutePreprocessor();
const [rustRoute] = preprocessor.preprocessForRust([route]);
expect(rustRoute.action.type).toEqual('socket-handler');
expect(rustRoute.action.targets?.[0].host).toEqual('localhost');
expect(rustRoute.action.targets?.[0].port).toEqual(0);
expect(preprocessor.getOriginalRoute('dynamic-contract-route')).toEqual(route);
});
tap.test('Rust contract - top-level config keeps shared SmartProxy settings', async () => {
const settings: ISmartProxyOptions = {
routes: [{
name: 'top-level-contract-route',
match: {
ports: 443,
domains: 'api.example.com',
},
action: {
type: 'forward',
targets: [{
host: 'backend.internal',
port: 8443,
}],
tls: {
mode: 'terminate',
certificate: 'auto',
},
},
}],
preserveSourceIP: true,
proxyIPs: ['10.0.0.1'],
acceptProxyProtocol: true,
sendProxyProtocol: true,
noDelay: true,
keepAlive: true,
keepAliveInitialDelay: 1500,
maxPendingDataSize: 4096,
disableInactivityCheck: true,
enableKeepAliveProbes: true,
enableDetailedLogging: true,
enableTlsDebugLogging: true,
enableRandomizedTimeouts: true,
connectionTimeout: 5000,
initialDataTimeout: 7000,
socketTimeout: 9000,
inactivityCheckInterval: 1100,
maxConnectionLifetime: 13000,
inactivityTimeout: 15000,
gracefulShutdownTimeout: 17000,
maxConnectionsPerIP: 20,
connectionRateLimitPerMinute: 30,
keepAliveTreatment: 'extended',
keepAliveInactivityMultiplier: 2,
extendedKeepAliveLifetime: 19000,
metrics: {
enabled: true,
sampleIntervalMs: 250,
retentionSeconds: 60,
},
acme: {
enabled: true,
email: 'ops@example.com',
environment: 'staging',
useProduction: false,
skipConfiguredCerts: true,
renewThresholdDays: 14,
renewCheckIntervalHours: 12,
autoRenew: true,
port: 80,
},
};
const preprocessor = new RoutePreprocessor();
const routes = preprocessor.preprocessForRust(settings.routes);
const config = buildRustProxyOptions(settings, routes);
expect(config.preserveSourceIp).toBeTrue();
expect(config.proxyIps).toEqual(['10.0.0.1']);
expect(config.acceptProxyProtocol).toBeTrue();
expect(config.sendProxyProtocol).toBeTrue();
expect(config.noDelay).toBeTrue();
expect(config.keepAlive).toBeTrue();
expect(config.keepAliveInitialDelay).toEqual(1500);
expect(config.maxPendingDataSize).toEqual(4096);
expect(config.disableInactivityCheck).toBeTrue();
expect(config.enableKeepAliveProbes).toBeTrue();
expect(config.enableDetailedLogging).toBeTrue();
expect(config.enableTlsDebugLogging).toBeTrue();
expect(config.enableRandomizedTimeouts).toBeTrue();
expect(config.connectionTimeout).toEqual(5000);
expect(config.initialDataTimeout).toEqual(7000);
expect(config.socketTimeout).toEqual(9000);
expect(config.inactivityCheckInterval).toEqual(1100);
expect(config.maxConnectionLifetime).toEqual(13000);
expect(config.inactivityTimeout).toEqual(15000);
expect(config.gracefulShutdownTimeout).toEqual(17000);
expect(config.maxConnectionsPerIp).toEqual(20);
expect(config.connectionRateLimitPerMinute).toEqual(30);
expect(config.keepAliveTreatment).toEqual('extended');
expect(config.keepAliveInactivityMultiplier).toEqual(2);
expect(config.extendedKeepAliveLifetime).toEqual(19000);
expect(config.metrics?.sampleIntervalMs).toEqual(250);
expect(config.acme?.email).toEqual('ops@example.com');
expect(config.acme?.environment).toEqual('staging');
expect(config.acme?.skipConfiguredCerts).toBeTrue();
expect(config.acme?.renewThresholdDays).toEqual(14);
});
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '27.3.1',
version: '27.8.0',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}
+11 -1
View File
@@ -29,6 +29,11 @@ export interface IThroughputHistoryPoint {
out: number;
}
export interface IRequestRateMetrics {
perSecond: number;
lastMinute: number;
}
/**
* Main metrics interface with clean, grouped API
*/
@@ -57,6 +62,10 @@ export interface IMetrics {
byRoute(): Map<string, number>;
byIP(): Map<string, number>;
topIPs(limit?: number): Array<{ ip: string; count: number }>;
/** Per-IP domain request counts: IP -> { domain -> count }. */
domainRequestsByIP(): Map<string, Map<string, number>>;
/** Top IP-domain pairs sorted by request count descending. */
topDomainRequests(limit?: number): Array<{ ip: string; domain: string; count: number }>;
frontendProtocols(): IProtocolDistribution;
backendProtocols(): IProtocolDistribution;
};
@@ -77,6 +86,7 @@ export interface IMetrics {
perSecond(): number;
perMinute(): number;
total(): number;
byDomain(): Map<string, IRequestRateMetrics>;
};
// Cumulative totals
@@ -181,4 +191,4 @@ export interface IByteTracker {
bytesOut: number;
startTime: number;
lastUpdate: number;
}
}
+4 -2
View File
@@ -141,8 +141,10 @@ export interface IRouteAuthentication {
* Security options for routes
*/
export interface IRouteSecurity {
// Access control lists
ipAllowList?: string[]; // IP addresses that are allowed to connect
// Access control lists.
// Entries can be plain IP/CIDR strings (full route access) or
// objects { ip, domains } to scope access to specific domains on this route.
ipAllowList?: Array<string | { ip: string; domains: string[] }>;
ipBlockList?: string[]; // IP addresses that are blocked from connecting
// Connection limits
+166
View File
@@ -0,0 +1,166 @@
import type { IProtocolCacheEntry, IProtocolDistribution } from './metrics-types.js';
import type { IAcmeOptions, ISmartProxyOptions } from './interfaces.js';
import type {
IRouteAction,
IRouteConfig,
IRouteMatch,
IRouteTarget,
ITargetMatch,
IRouteUdp,
} from './route-types.js';
export type TRustHeaderMatchers = Record<string, string>;
export interface IRustRouteMatch extends Omit<IRouteMatch, 'headers'> {
headers?: TRustHeaderMatchers;
}
export interface IRustTargetMatch extends Omit<ITargetMatch, 'headers'> {
headers?: TRustHeaderMatchers;
}
export interface IRustRouteTarget extends Omit<IRouteTarget, 'host' | 'port' | 'match'> {
host: string | string[];
port: number | 'preserve';
match?: IRustTargetMatch;
}
export interface IRustRouteUdp extends Omit<IRouteUdp, 'maxSessionsPerIP'> {
maxSessionsPerIp?: number;
}
export interface IRustDefaultConfig extends Omit<NonNullable<ISmartProxyOptions['defaults']>, 'preserveSourceIP'> {
preserveSourceIp?: boolean;
}
export interface IRustRouteAction
extends Omit<IRouteAction, 'targets' | 'socketHandler' | 'datagramHandler' | 'forwardingEngine' | 'nftables' | 'udp'> {
targets?: IRustRouteTarget[];
udp?: IRustRouteUdp;
}
export interface IRustRouteConfig extends Omit<IRouteConfig, 'match' | 'action'> {
match: IRustRouteMatch;
action: IRustRouteAction;
}
export interface IRustAcmeOptions extends Omit<IAcmeOptions, 'routeForwards'> {}
export interface IRustProxyOptions {
routes: IRustRouteConfig[];
preserveSourceIp?: boolean;
proxyIps?: string[];
acceptProxyProtocol?: boolean;
sendProxyProtocol?: boolean;
defaults?: IRustDefaultConfig;
connectionTimeout?: number;
initialDataTimeout?: number;
socketTimeout?: number;
inactivityCheckInterval?: number;
maxConnectionLifetime?: number;
inactivityTimeout?: number;
gracefulShutdownTimeout?: number;
noDelay?: boolean;
keepAlive?: boolean;
keepAliveInitialDelay?: number;
maxPendingDataSize?: number;
disableInactivityCheck?: boolean;
enableKeepAliveProbes?: boolean;
enableDetailedLogging?: boolean;
enableTlsDebugLogging?: boolean;
enableRandomizedTimeouts?: boolean;
maxConnectionsPerIp?: number;
connectionRateLimitPerMinute?: number;
keepAliveTreatment?: ISmartProxyOptions['keepAliveTreatment'];
keepAliveInactivityMultiplier?: number;
extendedKeepAliveLifetime?: number;
metrics?: ISmartProxyOptions['metrics'];
acme?: IRustAcmeOptions;
}
export interface IRustStatistics {
activeConnections: number;
totalConnections: number;
routesCount: number;
listeningPorts: number[];
uptimeSeconds: number;
}
export interface IRustCertificateStatus {
domain: string;
source: string;
expiresAt: number;
isValid: boolean;
}
export interface IRustThroughputSample {
timestampMs: number;
bytesIn: number;
bytesOut: number;
}
export interface IRustRouteMetrics {
activeConnections: number;
totalConnections: number;
bytesIn: number;
bytesOut: number;
throughputInBytesPerSec: number;
throughputOutBytesPerSec: number;
throughputRecentInBytesPerSec: number;
throughputRecentOutBytesPerSec: number;
}
export interface IRustIpMetrics {
activeConnections: number;
totalConnections: number;
bytesIn: number;
bytesOut: number;
throughputInBytesPerSec: number;
throughputOutBytesPerSec: number;
domainRequests: Record<string, number>;
}
export interface IRustBackendMetrics {
activeConnections: number;
totalConnections: number;
protocol: string;
connectErrors: number;
handshakeErrors: number;
requestErrors: number;
totalConnectTimeUs: number;
connectCount: number;
poolHits: number;
poolMisses: number;
h2Failures: number;
}
export interface IRustHttpDomainRequestMetrics {
requestsPerSecond: number;
requestsLastMinute: number;
}
export interface IRustMetricsSnapshot {
activeConnections: number;
totalConnections: number;
bytesIn: number;
bytesOut: number;
throughputInBytesPerSec: number;
throughputOutBytesPerSec: number;
throughputRecentInBytesPerSec: number;
throughputRecentOutBytesPerSec: number;
routes: Record<string, IRustRouteMetrics>;
ips: Record<string, IRustIpMetrics>;
backends: Record<string, IRustBackendMetrics>;
throughputHistory: IRustThroughputSample[];
totalHttpRequests: number;
httpRequestsPerSec: number;
httpRequestsPerSecRecent: number;
httpDomainRequests: Record<string, IRustHttpDomainRequestMetrics>;
activeUdpSessions: number;
totalUdpSessions: number;
totalDatagramsIn: number;
totalDatagramsOut: number;
detectedProtocols: IProtocolCacheEntry[];
frontendProtocols: IProtocolDistribution;
backendProtocols: IProtocolDistribution;
}
+13 -11
View File
@@ -1,5 +1,6 @@
import type { IRouteConfig, IRouteAction, IRouteTarget } from './models/route-types.js';
import { logger } from '../../core/utils/logger.js';
import type { IRustRouteConfig } from './models/rust-types.js';
import { serializeRouteForRust } from './utils/rust-config.js';
/**
* Preprocesses routes before sending them to Rust.
@@ -24,7 +25,7 @@ export class RoutePreprocessor {
* - Non-serializable fields are stripped
* - Original routes are preserved in the local map for handler lookup
*/
public preprocessForRust(routes: IRouteConfig[]): IRouteConfig[] {
public preprocessForRust(routes: IRouteConfig[]): IRustRouteConfig[] {
this.originalRoutes.clear();
return routes.map((route, index) => this.preprocessRoute(route, index));
}
@@ -43,7 +44,7 @@ export class RoutePreprocessor {
return new Map(this.originalRoutes);
}
private preprocessRoute(route: IRouteConfig, index: number): IRouteConfig {
private preprocessRoute(route: IRouteConfig, index: number): IRustRouteConfig {
const routeKey = route.name || route.id || `route_${index}`;
// Check if this route needs TS-side handling
@@ -57,7 +58,7 @@ export class RoutePreprocessor {
// Create a clean copy for Rust
const cleanRoute: IRouteConfig = {
...route,
action: this.cleanAction(route.action, routeKey, needsTsHandling),
action: this.cleanAction(route.action, needsTsHandling),
};
// Ensure we have a name for handler lookup
@@ -65,7 +66,7 @@ export class RoutePreprocessor {
cleanRoute.name = routeKey;
}
return cleanRoute;
return serializeRouteForRust(cleanRoute);
}
private routeNeedsTsHandling(route: IRouteConfig): boolean {
@@ -91,15 +92,16 @@ export class RoutePreprocessor {
return false;
}
private cleanAction(action: IRouteAction, routeKey: string, needsTsHandling: boolean): IRouteAction {
const cleanAction: IRouteAction = { ...action };
private cleanAction(action: IRouteAction, needsTsHandling: boolean): IRouteAction {
let cleanAction: IRouteAction = { ...action };
if (needsTsHandling) {
// Convert to socket-handler type for Rust (Rust will relay back to TS)
cleanAction.type = 'socket-handler';
// Remove the JS handlers (not serializable)
delete (cleanAction as any).socketHandler;
delete (cleanAction as any).datagramHandler;
const { socketHandler: _socketHandler, datagramHandler: _datagramHandler, ...serializableAction } = cleanAction;
cleanAction = {
...serializableAction,
type: 'socket-handler',
};
}
// Clean targets - replace functions with static values
+77 -33
View File
@@ -1,5 +1,6 @@
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IRequestRateMetrics, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
import type { RustProxyBridge } from './rust-proxy-bridge.js';
import type { IRustBackendMetrics, IRustHttpDomainRequestMetrics, IRustIpMetrics, IRustMetricsSnapshot, IRustRouteMetrics } from './models/rust-types.js';
/**
* Adapts Rust JSON metrics to the IMetrics interface.
@@ -14,7 +15,7 @@ import type { RustProxyBridge } from './rust-proxy-bridge.js';
*/
export class RustMetricsAdapter implements IMetrics {
private bridge: RustProxyBridge;
private cache: any = null;
private cache: IRustMetricsSnapshot | null = null;
private pollTimer: ReturnType<typeof setInterval> | null = null;
private pollIntervalMs: number;
@@ -65,8 +66,8 @@ export class RustMetricsAdapter implements IMetrics {
byRoute: (): Map<string, number> => {
const result = new Map<string, number>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
result.set(name, (rm as any).activeConnections ?? 0);
for (const [name, rm] of Object.entries(this.cache.routes) as Array<[string, IRustRouteMetrics]>) {
result.set(name, rm.activeConnections ?? 0);
}
}
return result;
@@ -74,8 +75,8 @@ export class RustMetricsAdapter implements IMetrics {
byIP: (): Map<string, number> => {
const result = new Map<string, number>();
if (this.cache?.ips) {
for (const [ip, im] of Object.entries(this.cache.ips)) {
result.set(ip, (im as any).activeConnections ?? 0);
for (const [ip, im] of Object.entries(this.cache.ips) as Array<[string, IRustIpMetrics]>) {
result.set(ip, im.activeConnections ?? 0);
}
}
return result;
@@ -83,8 +84,41 @@ export class RustMetricsAdapter implements IMetrics {
topIPs: (limit: number = 10): Array<{ ip: string; count: number }> => {
const result: Array<{ ip: string; count: number }> = [];
if (this.cache?.ips) {
for (const [ip, im] of Object.entries(this.cache.ips)) {
result.push({ ip, count: (im as any).activeConnections ?? 0 });
for (const [ip, im] of Object.entries(this.cache.ips) as Array<[string, IRustIpMetrics]>) {
result.push({ ip, count: im.activeConnections ?? 0 });
}
}
result.sort((a, b) => b.count - a.count);
return result.slice(0, limit);
},
domainRequestsByIP: (): Map<string, Map<string, number>> => {
const result = new Map<string, Map<string, number>>();
if (this.cache?.ips) {
for (const [ip, im] of Object.entries(this.cache.ips) as Array<[string, IRustIpMetrics]>) {
const dr = im.domainRequests;
if (dr && typeof dr === 'object') {
const domainMap = new Map<string, number>();
for (const [domain, count] of Object.entries(dr)) {
domainMap.set(domain, count as number);
}
if (domainMap.size > 0) {
result.set(ip, domainMap);
}
}
}
}
return result;
},
topDomainRequests: (limit: number = 20): Array<{ ip: string; domain: string; count: number }> => {
const result: Array<{ ip: string; domain: string; count: number }> = [];
if (this.cache?.ips) {
for (const [ip, im] of Object.entries(this.cache.ips) as Array<[string, IRustIpMetrics]>) {
const dr = im.domainRequests;
if (dr && typeof dr === 'object') {
for (const [domain, count] of Object.entries(dr)) {
result.push({ ip, domain, count: count as number });
}
}
}
}
result.sort((a, b) => b.count - a.count);
@@ -143,7 +177,7 @@ export class RustMetricsAdapter implements IMetrics {
},
history: (seconds: number): Array<IThroughputHistoryPoint> => {
if (!this.cache?.throughputHistory) return [];
return this.cache.throughputHistory.slice(-seconds).map((p: any) => ({
return this.cache.throughputHistory.slice(-seconds).map((p) => ({
timestamp: p.timestampMs,
in: p.bytesIn,
out: p.bytesOut,
@@ -152,10 +186,10 @@ export class RustMetricsAdapter implements IMetrics {
byRoute: (_windowSeconds?: number): Map<string, IThroughputData> => {
const result = new Map<string, IThroughputData>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
for (const [name, rm] of Object.entries(this.cache.routes) as Array<[string, IRustRouteMetrics]>) {
result.set(name, {
in: (rm as any).throughputInBytesPerSec ?? 0,
out: (rm as any).throughputOutBytesPerSec ?? 0,
in: rm.throughputInBytesPerSec ?? 0,
out: rm.throughputOutBytesPerSec ?? 0,
});
}
}
@@ -164,10 +198,10 @@ export class RustMetricsAdapter implements IMetrics {
byIP: (_windowSeconds?: number): Map<string, IThroughputData> => {
const result = new Map<string, IThroughputData>();
if (this.cache?.ips) {
for (const [ip, im] of Object.entries(this.cache.ips)) {
for (const [ip, im] of Object.entries(this.cache.ips) as Array<[string, IRustIpMetrics]>) {
result.set(ip, {
in: (im as any).throughputInBytesPerSec ?? 0,
out: (im as any).throughputOutBytesPerSec ?? 0,
in: im.throughputInBytesPerSec ?? 0,
out: im.throughputOutBytesPerSec ?? 0,
});
}
}
@@ -185,6 +219,18 @@ export class RustMetricsAdapter implements IMetrics {
total: (): number => {
return this.cache?.totalHttpRequests ?? this.cache?.totalConnections ?? 0;
},
byDomain: (): Map<string, IRequestRateMetrics> => {
const result = new Map<string, IRequestRateMetrics>();
if (this.cache?.httpDomainRequests) {
for (const [domain, metrics] of Object.entries(this.cache.httpDomainRequests) as Array<[string, IRustHttpDomainRequestMetrics]>) {
result.set(domain, {
perSecond: metrics.requestsPerSecond ?? 0,
lastMinute: metrics.requestsLastMinute ?? 0,
});
}
}
return result;
},
};
public totals = {
@@ -203,23 +249,22 @@ export class RustMetricsAdapter implements IMetrics {
byBackend: (): Map<string, IBackendMetrics> => {
const result = new Map<string, IBackendMetrics>();
if (this.cache?.backends) {
for (const [key, bm] of Object.entries(this.cache.backends)) {
const m = bm as any;
const totalTimeUs = m.totalConnectTimeUs ?? 0;
const count = m.connectCount ?? 0;
const poolHits = m.poolHits ?? 0;
const poolMisses = m.poolMisses ?? 0;
for (const [key, bm] of Object.entries(this.cache.backends) as Array<[string, IRustBackendMetrics]>) {
const totalTimeUs = bm.totalConnectTimeUs ?? 0;
const count = bm.connectCount ?? 0;
const poolHits = bm.poolHits ?? 0;
const poolMisses = bm.poolMisses ?? 0;
const poolTotal = poolHits + poolMisses;
result.set(key, {
protocol: m.protocol ?? 'unknown',
activeConnections: m.activeConnections ?? 0,
totalConnections: m.totalConnections ?? 0,
connectErrors: m.connectErrors ?? 0,
handshakeErrors: m.handshakeErrors ?? 0,
requestErrors: m.requestErrors ?? 0,
protocol: bm.protocol ?? 'unknown',
activeConnections: bm.activeConnections ?? 0,
totalConnections: bm.totalConnections ?? 0,
connectErrors: bm.connectErrors ?? 0,
handshakeErrors: bm.handshakeErrors ?? 0,
requestErrors: bm.requestErrors ?? 0,
avgConnectTimeMs: count > 0 ? (totalTimeUs / count) / 1000 : 0,
poolHitRate: poolTotal > 0 ? poolHits / poolTotal : 0,
h2Failures: m.h2Failures ?? 0,
h2Failures: bm.h2Failures ?? 0,
});
}
}
@@ -228,8 +273,8 @@ export class RustMetricsAdapter implements IMetrics {
protocols: (): Map<string, string> => {
const result = new Map<string, string>();
if (this.cache?.backends) {
for (const [key, bm] of Object.entries(this.cache.backends)) {
result.set(key, (bm as any).protocol ?? 'unknown');
for (const [key, bm] of Object.entries(this.cache.backends) as Array<[string, IRustBackendMetrics]>) {
result.set(key, bm.protocol ?? 'unknown');
}
}
return result;
@@ -237,9 +282,8 @@ export class RustMetricsAdapter implements IMetrics {
topByErrors: (limit: number = 10): Array<{ backend: string; errors: number }> => {
const result: Array<{ backend: string; errors: number }> = [];
if (this.cache?.backends) {
for (const [key, bm] of Object.entries(this.cache.backends)) {
const m = bm as any;
const errors = (m.connectErrors ?? 0) + (m.handshakeErrors ?? 0) + (m.requestErrors ?? 0);
for (const [key, bm] of Object.entries(this.cache.backends) as Array<[string, IRustBackendMetrics]>) {
const errors = (bm.connectErrors ?? 0) + (bm.handshakeErrors ?? 0) + (bm.requestErrors ?? 0);
if (errors > 0) result.push({ backend: key, errors });
}
}
+24 -18
View File
@@ -1,23 +1,29 @@
import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
import type { IRouteConfig } from './models/route-types.js';
import type {
IRustCertificateStatus,
IRustMetricsSnapshot,
IRustProxyOptions,
IRustRouteConfig,
IRustStatistics,
} from './models/rust-types.js';
/**
* Type-safe command definitions for the Rust proxy IPC protocol.
*/
type TSmartProxyCommands = {
start: { params: { config: any }; result: void };
stop: { params: Record<string, never>; result: void };
updateRoutes: { params: { routes: IRouteConfig[] }; result: void };
getMetrics: { params: Record<string, never>; result: any };
getStatistics: { params: Record<string, never>; result: any };
provisionCertificate: { params: { routeName: string }; result: void };
renewCertificate: { params: { routeName: string }; result: void };
getCertificateStatus: { params: { routeName: string }; result: any };
getListeningPorts: { params: Record<string, never>; result: { ports: number[] } };
setSocketHandlerRelay: { params: { socketPath: string }; result: void };
addListeningPort: { params: { port: number }; result: void };
removeListeningPort: { params: { port: number }; result: void };
start: { params: { config: IRustProxyOptions }; result: void };
stop: { params: Record<string, never>; result: void };
updateRoutes: { params: { routes: IRustRouteConfig[] }; result: void };
getMetrics: { params: Record<string, never>; result: IRustMetricsSnapshot };
getStatistics: { params: Record<string, never>; result: IRustStatistics };
provisionCertificate: { params: { routeName: string }; result: void };
renewCertificate: { params: { routeName: string }; result: void };
getCertificateStatus: { params: { routeName: string }; result: IRustCertificateStatus | null };
getListeningPorts: { params: Record<string, never>; result: { ports: number[] } };
setSocketHandlerRelay: { params: { socketPath: string }; result: void };
addListeningPort: { params: { port: number }; result: void };
removeListeningPort: { params: { port: number }; result: void };
loadCertificate: { params: { domain: string; cert: string; key: string; ca?: string }; result: void };
setDatagramHandlerRelay: { params: { socketPath: string }; result: void };
};
@@ -121,7 +127,7 @@ export class RustProxyBridge extends plugins.EventEmitter {
// --- Convenience methods for each management command ---
public async startProxy(config: any): Promise<void> {
public async startProxy(config: IRustProxyOptions): Promise<void> {
await this.bridge.sendCommand('start', { config });
}
@@ -129,15 +135,15 @@ export class RustProxyBridge extends plugins.EventEmitter {
await this.bridge.sendCommand('stop', {} as Record<string, never>);
}
public async updateRoutes(routes: IRouteConfig[]): Promise<void> {
public async updateRoutes(routes: IRustRouteConfig[]): Promise<void> {
await this.bridge.sendCommand('updateRoutes', { routes });
}
public async getMetrics(): Promise<any> {
public async getMetrics(): Promise<IRustMetricsSnapshot> {
return this.bridge.sendCommand('getMetrics', {} as Record<string, never>);
}
public async getStatistics(): Promise<any> {
public async getStatistics(): Promise<IRustStatistics> {
return this.bridge.sendCommand('getStatistics', {} as Record<string, never>);
}
@@ -149,7 +155,7 @@ export class RustProxyBridge extends plugins.EventEmitter {
await this.bridge.sendCommand('renewCertificate', { routeName });
}
public async getCertificateStatus(routeName: string): Promise<any> {
public async getCertificateStatus(routeName: string): Promise<IRustCertificateStatus | null> {
return this.bridge.sendCommand('getCertificateStatus', { routeName });
}
+6 -33
View File
@@ -11,6 +11,7 @@ import { RustMetricsAdapter } from './rust-metrics-adapter.js';
// Route management
import { SharedRouteManager as RouteManager } from '../../core/routing/route-manager.js';
import { RouteValidator } from './utils/route-validator.js';
import { buildRustProxyOptions } from './utils/rust-config.js';
import { generateDefaultCertificate } from './utils/default-cert-generator.js';
import { Mutex } from './utils/mutex.js';
import { ConcurrencySemaphore } from './utils/concurrency-semaphore.js';
@@ -19,6 +20,7 @@ import { ConcurrencySemaphore } from './utils/concurrency-semaphore.js';
import type { ISmartProxyOptions, TSmartProxyCertProvisionObject, IAcmeOptions, ICertProvisionEventComms, ICertificateIssuedEvent, ICertificateFailedEvent } from './models/interfaces.js';
import type { IRouteConfig } from './models/route-types.js';
import type { IMetrics } from './models/metrics-types.js';
import type { IRustCertificateStatus, IRustProxyOptions, IRustStatistics } from './models/rust-types.js';
/**
* SmartProxy - Rust-backed proxy engine with TypeScript configuration API.
@@ -365,7 +367,7 @@ export class SmartProxy extends plugins.EventEmitter {
/**
* Get certificate status for a route (async - calls Rust).
*/
public async getCertificateStatus(routeName: string): Promise<any> {
public async getCertificateStatus(routeName: string): Promise<IRustCertificateStatus | null> {
return this.bridge.getCertificateStatus(routeName);
}
@@ -379,7 +381,7 @@ export class SmartProxy extends plugins.EventEmitter {
/**
* Get statistics (async - calls Rust).
*/
public async getStatistics(): Promise<any> {
public async getStatistics(): Promise<IRustStatistics> {
return this.bridge.getStatistics();
}
@@ -484,37 +486,8 @@ export class SmartProxy extends plugins.EventEmitter {
/**
* Build the Rust configuration object from TS settings.
*/
private buildRustConfig(routes: IRouteConfig[], acmeOverride?: IAcmeOptions): any {
const acme = acmeOverride !== undefined ? acmeOverride : this.settings.acme;
return {
routes,
defaults: this.settings.defaults,
acme: acme
? {
enabled: acme.enabled,
email: acme.email,
useProduction: acme.useProduction,
port: acme.port,
renewThresholdDays: acme.renewThresholdDays,
autoRenew: acme.autoRenew,
renewCheckIntervalHours: acme.renewCheckIntervalHours,
}
: undefined,
connectionTimeout: this.settings.connectionTimeout,
initialDataTimeout: this.settings.initialDataTimeout,
socketTimeout: this.settings.socketTimeout,
maxConnectionLifetime: this.settings.maxConnectionLifetime,
gracefulShutdownTimeout: this.settings.gracefulShutdownTimeout,
maxConnectionsPerIp: this.settings.maxConnectionsPerIP,
connectionRateLimitPerMinute: this.settings.connectionRateLimitPerMinute,
keepAliveTreatment: this.settings.keepAliveTreatment,
keepAliveInactivityMultiplier: this.settings.keepAliveInactivityMultiplier,
extendedKeepAliveLifetime: this.settings.extendedKeepAliveLifetime,
proxyIps: this.settings.proxyIPs,
acceptProxyProtocol: this.settings.acceptProxyProtocol,
sendProxyProtocol: this.settings.sendProxyProtocol,
metrics: this.settings.metrics,
};
private buildRustConfig(routes: IRustProxyOptions['routes'], acmeOverride?: IAcmeOptions): IRustProxyOptions {
return buildRustProxyOptions(this.settings, routes, acmeOverride);
}
/**
+22 -8
View File
@@ -168,14 +168,28 @@ export function routeMatchesHeaders(
if (!route.match?.headers || Object.keys(route.match.headers).length === 0) {
return true; // No headers specified means it matches any headers
}
// Convert RegExp patterns to strings for HeaderMatcher
const stringHeaders: Record<string, string> = {};
for (const [key, value] of Object.entries(route.match.headers)) {
stringHeaders[key] = value instanceof RegExp ? value.source : value;
for (const [headerName, expectedValue] of Object.entries(route.match.headers)) {
const actualKey = Object.keys(headers).find((key) => key.toLowerCase() === headerName.toLowerCase());
const actualValue = actualKey ? headers[actualKey] : undefined;
if (actualValue === undefined) {
return false;
}
if (expectedValue instanceof RegExp) {
if (!expectedValue.test(actualValue)) {
return false;
}
continue;
}
if (!HeaderMatcher.match(expectedValue, actualValue)) {
return false;
}
}
return HeaderMatcher.matchAll(stringHeaders, headers);
return true;
}
/**
@@ -283,4 +297,4 @@ export function generateRouteId(route: IRouteConfig): string {
*/
export function cloneRoute(route: IRouteConfig): IRouteConfig {
return JSON.parse(JSON.stringify(route));
}
}
@@ -196,10 +196,19 @@ export class RouteValidator {
// Validate IP allow/block lists
if (route.security.ipAllowList) {
const allowList = Array.isArray(route.security.ipAllowList) ? route.security.ipAllowList : [route.security.ipAllowList];
for (const ip of allowList) {
if (!this.isValidIPPattern(ip)) {
errors.push(`Invalid IP pattern in allow list: ${ip}`);
for (const entry of allowList) {
if (typeof entry === 'string') {
if (!this.isValidIPPattern(entry)) {
errors.push(`Invalid IP pattern in allow list: ${entry}`);
}
} else if (entry && typeof entry === 'object') {
if (!this.isValidIPPattern(entry.ip)) {
errors.push(`Invalid IP pattern in domain-scoped allow entry: ${entry.ip}`);
}
if (!Array.isArray(entry.domains) || entry.domains.length === 0) {
errors.push(`Domain-scoped allow entry for ${entry.ip} must have non-empty domains array`);
}
}
}
}
+187
View File
@@ -0,0 +1,187 @@
import type { IAcmeOptions, ISmartProxyOptions } from '../models/interfaces.js';
import type { IRouteAction, IRouteConfig, IRouteMatch, IRouteTarget, ITargetMatch } from '../models/route-types.js';
import type {
IRustAcmeOptions,
IRustDefaultConfig,
IRustProxyOptions,
IRustRouteAction,
IRustRouteConfig,
IRustRouteMatch,
IRustRouteTarget,
IRustTargetMatch,
IRustRouteUdp,
TRustHeaderMatchers,
} from '../models/rust-types.js';
const SUPPORTED_REGEX_FLAGS = new Set(['i', 'm', 's', 'u', 'g']);
export function serializeHeaderMatchValue(value: string | RegExp): string {
if (typeof value === 'string') {
return value;
}
const unsupportedFlags = Array.from(new Set(value.flags)).filter((flag) => !SUPPORTED_REGEX_FLAGS.has(flag));
if (unsupportedFlags.length > 0) {
throw new Error(
`Header RegExp uses unsupported flags for Rust serialization: ${unsupportedFlags.join(', ')}`
);
}
return `/${value.source}/${value.flags}`;
}
export function serializeHeaderMatchers(headers?: Record<string, string | RegExp>): TRustHeaderMatchers | undefined {
if (!headers) {
return undefined;
}
return Object.fromEntries(
Object.entries(headers).map(([key, value]) => [key, serializeHeaderMatchValue(value)])
);
}
export function serializeTargetMatchForRust(match?: ITargetMatch): IRustTargetMatch | undefined {
if (!match) {
return undefined;
}
return {
...match,
headers: serializeHeaderMatchers(match.headers),
};
}
export function serializeRouteMatchForRust(match: IRouteMatch): IRustRouteMatch {
return {
...match,
headers: serializeHeaderMatchers(match.headers),
};
}
export function serializeRouteTargetForRust(target: IRouteTarget): IRustRouteTarget {
if (typeof target.host !== 'string' && !Array.isArray(target.host)) {
throw new Error('Route target host must be serialized before sending to Rust');
}
if (typeof target.port !== 'number' && target.port !== 'preserve') {
throw new Error('Route target port must be serialized before sending to Rust');
}
return {
...target,
host: target.host,
port: target.port,
match: serializeTargetMatchForRust(target.match),
};
}
function serializeUdpForRust(udp?: IRouteAction['udp']): IRustRouteUdp | undefined {
if (!udp) {
return undefined;
}
const { maxSessionsPerIP, ...rest } = udp;
return {
...rest,
maxSessionsPerIp: maxSessionsPerIP,
};
}
export function serializeRouteActionForRust(action: IRouteAction): IRustRouteAction {
const {
socketHandler: _socketHandler,
datagramHandler: _datagramHandler,
forwardingEngine: _forwardingEngine,
nftables: _nftables,
targets,
udp,
...rest
} = action;
return {
...rest,
targets: targets?.map((target) => serializeRouteTargetForRust(target)),
udp: serializeUdpForRust(udp),
};
}
export function serializeRouteForRust(route: IRouteConfig): IRustRouteConfig {
return {
...route,
match: serializeRouteMatchForRust(route.match),
action: serializeRouteActionForRust(route.action),
};
}
function serializeAcmeForRust(acme?: IAcmeOptions): IRustAcmeOptions | undefined {
if (!acme) {
return undefined;
}
return {
enabled: acme.enabled,
email: acme.email,
environment: acme.environment,
accountEmail: acme.accountEmail,
port: acme.port,
useProduction: acme.useProduction,
renewThresholdDays: acme.renewThresholdDays,
autoRenew: acme.autoRenew,
skipConfiguredCerts: acme.skipConfiguredCerts,
renewCheckIntervalHours: acme.renewCheckIntervalHours,
};
}
function serializeDefaultsForRust(defaults?: ISmartProxyOptions['defaults']): IRustDefaultConfig | undefined {
if (!defaults) {
return undefined;
}
const { preserveSourceIP, ...rest } = defaults;
return {
...rest,
preserveSourceIp: preserveSourceIP,
};
}
export function buildRustProxyOptions(
settings: ISmartProxyOptions,
routes: IRustRouteConfig[],
acmeOverride?: IAcmeOptions,
): IRustProxyOptions {
const acme = acmeOverride !== undefined ? acmeOverride : settings.acme;
return {
routes,
preserveSourceIp: settings.preserveSourceIP,
proxyIps: settings.proxyIPs,
acceptProxyProtocol: settings.acceptProxyProtocol,
sendProxyProtocol: settings.sendProxyProtocol,
defaults: serializeDefaultsForRust(settings.defaults),
connectionTimeout: settings.connectionTimeout,
initialDataTimeout: settings.initialDataTimeout,
socketTimeout: settings.socketTimeout,
inactivityCheckInterval: settings.inactivityCheckInterval,
maxConnectionLifetime: settings.maxConnectionLifetime,
inactivityTimeout: settings.inactivityTimeout,
gracefulShutdownTimeout: settings.gracefulShutdownTimeout,
noDelay: settings.noDelay,
keepAlive: settings.keepAlive,
keepAliveInitialDelay: settings.keepAliveInitialDelay,
maxPendingDataSize: settings.maxPendingDataSize,
disableInactivityCheck: settings.disableInactivityCheck,
enableKeepAliveProbes: settings.enableKeepAliveProbes,
enableDetailedLogging: settings.enableDetailedLogging,
enableTlsDebugLogging: settings.enableTlsDebugLogging,
enableRandomizedTimeouts: settings.enableRandomizedTimeouts,
maxConnectionsPerIp: settings.maxConnectionsPerIP,
connectionRateLimitPerMinute: settings.connectionRateLimitPerMinute,
keepAliveTreatment: settings.keepAliveTreatment,
keepAliveInactivityMultiplier: settings.keepAliveInactivityMultiplier,
extendedKeepAliveLifetime: settings.extendedKeepAliveLifetime,
metrics: settings.metrics,
acme: serializeAcmeForRust(acme),
};
}