Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 47140e5403 | |||
| a6ffa24e36 | |||
| c0e432fd9b | |||
| a3d8a3a388 | |||
| 437d1a3329 | |||
| 746d93663d | |||
| a3f3fee253 | |||
| 53dee1fffc | |||
| 34dc0cb9b6 | |||
| c83c43194b |
37
changelog.md
37
changelog.md
@@ -1,5 +1,42 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-26 - 26.3.0 - feat(nftables)
|
||||
move NFTables forwarding management from the Rust engine to @push.rocks/smartnftables
|
||||
|
||||
- add @push.rocks/smartnftables as a runtime dependency and export it via the plugin layer
|
||||
- remove the internal rustproxy-nftables crate along with Rust-side NFTables rule application and status management
|
||||
- apply and clean up NFTables port-forwarding rules in the TypeScript SmartProxy lifecycle and route update flow
|
||||
- change getNfTablesStatus to return local smartnftables status instead of querying the Rust bridge
|
||||
- update README documentation to describe NFTables support as provided through @push.rocks/smartnftables
|
||||
|
||||
## 2026-03-26 - 26.2.4 - fix(rustproxy-http)
|
||||
improve HTTP/3 connection reuse and clean up stale proxy state
|
||||
|
||||
- Reuse pooled HTTP/3 SendRequest handles to skip repeated SETTINGS handshakes and reduce request overhead on QUIC pool hits
|
||||
- Add periodic cleanup for per-route rate limiters and orphaned backend metrics to prevent unbounded memory growth after traffic or backend errors stop
|
||||
- Enforce HTTP max connection lifetime alongside idle timeouts and apply configured lifetime values from the TCP listener
|
||||
- Reduce HTTP/3 body copying by using owned Bytes paths for request and response streaming, and replace the custom response body adapter with a stream-based implementation
|
||||
- Harden auxiliary proxy components by capping datagram handler buffer growth and removing duplicate RustProxy exit listeners
|
||||
|
||||
## 2026-03-25 - 26.2.3 - fix(repo)
|
||||
no changes to commit
|
||||
|
||||
|
||||
## 2026-03-25 - 26.2.2 - fix(proxy)
|
||||
improve connection cleanup and route validation handling
|
||||
|
||||
- add timeouts for HTTP/1 upstream connection drivers to prevent lingering tasks
|
||||
- ensure QUIC relay sessions cancel and abort background tasks on drop
|
||||
- avoid registering unnamed routes as duplicates and label unnamed catch-all conflicts clearly
|
||||
- fix offset mapping route helper to forward only remaining route options without overriding derived values
|
||||
- update project config filename and toolchain versions for the current build setup
|
||||
|
||||
## 2026-03-23 - 26.2.1 - fix(rustproxy-http)
|
||||
include the upstream request URL when caching H3 Alt-Svc discoveries
|
||||
|
||||
- Tracks the request path that triggered Alt-Svc discovery in connection activity state
|
||||
- Adds request URL context to Alt-Svc debug logging and protocol cache insertion reasons for better traceability
|
||||
|
||||
## 2026-03-23 - 26.2.0 - feat(protocol-cache)
|
||||
add sliding TTL re-probing and eviction for backend protocol detection
|
||||
|
||||
|
||||
2
license
2
license
@@ -1,4 +1,4 @@
|
||||
Copyright (c) 2019 Lossless GmbH (hello@lossless.com)
|
||||
Copyright (c) 2019 Task Venture Capital GmbH (hello@task.vc)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
17
package.json
17
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "26.2.0",
|
||||
"version": "26.3.0",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -16,18 +16,19 @@
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^4.3.0",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tsrust": "^1.3.0",
|
||||
"@git.zone/tstest": "^3.5.0",
|
||||
"@push.rocks/smartserve": "^2.0.1",
|
||||
"@git.zone/tsbuild": "^4.4.0",
|
||||
"@git.zone/tsrun": "^2.0.2",
|
||||
"@git.zone/tsrust": "^1.3.2",
|
||||
"@git.zone/tstest": "^3.6.0",
|
||||
"@push.rocks/smartserve": "^2.0.3",
|
||||
"@types/node": "^25.5.0",
|
||||
"typescript": "^5.9.3",
|
||||
"typescript": "^6.0.2",
|
||||
"why-is-node-running": "^3.2.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/smartcrypto": "^2.0.4",
|
||||
"@push.rocks/smartlog": "^3.2.1",
|
||||
"@push.rocks/smartnftables": "^1.0.1",
|
||||
"@push.rocks/smartrust": "^1.3.2",
|
||||
"@tsclass/tsclass": "^9.5.0",
|
||||
"minimatch": "^10.2.4"
|
||||
@@ -41,7 +42,7 @@
|
||||
"dist_ts_web/**/*",
|
||||
"assets/**/*",
|
||||
"cli.js",
|
||||
"npmextra.json",
|
||||
".smartconfig.json",
|
||||
"readme.md",
|
||||
"changelog.md"
|
||||
],
|
||||
|
||||
506
pnpm-lock.yaml
generated
506
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
38
readme.md
38
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/smartproxy 🚀
|
||||
|
||||
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, UDP/QUIC/HTTP3, load balancing, custom protocol handlers, and kernel-level NFTables forwarding.
|
||||
**A high-performance, Rust-powered proxy toolkit for Node.js** — unified route-based configuration for SSL/TLS termination, HTTP/HTTPS reverse proxying, WebSocket support, UDP/QUIC/HTTP3, load balancing, custom protocol handlers, and kernel-level NFTables forwarding via [`@push.rocks/smartnftables`](https://code.foss.global/push.rocks/smartnftables).
|
||||
|
||||
## 📦 Installation
|
||||
|
||||
@@ -384,7 +384,7 @@ const dualStackRoute: IRouteConfig = {
|
||||
|
||||
### ⚡ High-Performance NFTables Forwarding
|
||||
|
||||
For ultra-low latency on Linux, use kernel-level forwarding (requires root):
|
||||
For ultra-low latency on Linux, use kernel-level forwarding via [`@push.rocks/smartnftables`](https://code.foss.global/push.rocks/smartnftables) (requires root):
|
||||
|
||||
```typescript
|
||||
import { SmartProxy, createNfTablesTerminateRoute } from '@push.rocks/smartproxy';
|
||||
@@ -694,22 +694,26 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
|
||||
│ │ Listener│ │ Reverse │ │ Matcher │ │ Cert Mgr │ │
|
||||
│ │ │ │ Proxy │ │ │ │ │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
|
||||
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
|
||||
│ │ UDP │ │ Security│ │ Metrics │ │ NFTables │ │
|
||||
│ │ QUIC │ │ Enforce │ │ Collect │ │ Mgr │ │
|
||||
│ │ HTTP/3 │ │ │ │ │ │ │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ └──────────┘ │
|
||||
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
|
||||
│ │ UDP │ │ Security│ │ Metrics │ │
|
||||
│ │ QUIC │ │ Enforce │ │ Collect │ │
|
||||
│ │ HTTP/3 │ │ │ │ │ │
|
||||
│ └─────────┘ └─────────┘ └─────────┘ │
|
||||
└──────────────────┬──────────────────────────────────┘
|
||||
│ Unix Socket Relay
|
||||
┌──────────────────▼──────────────────────────────────┐
|
||||
│ TypeScript Socket & Datagram Handler Servers │
|
||||
│ (for JS socket handlers, datagram handlers, │
|
||||
│ and dynamic routes) │
|
||||
├─────────────────────────────────────────────────────┤
|
||||
│ @push.rocks/smartnftables (kernel-level NFTables) │
|
||||
│ (DNAT/SNAT, firewall, rate limiting via nft CLI) │
|
||||
└─────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
- **Rust Engine** handles all networking: TCP, UDP, TLS, QUIC, HTTP proxying, connection management, security, and metrics
|
||||
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and handler callbacks
|
||||
- **NFTables** managed by [`@push.rocks/smartnftables`](https://code.foss.global/push.rocks/smartnftables) — kernel-level DNAT/SNAT forwarding, firewall rules, and rate limiting via the `nft` CLI
|
||||
- **IPC** — The TypeScript wrapper uses JSON commands/events over stdin/stdout to communicate with the Rust binary
|
||||
- **Socket/Datagram Relay** — Unix domain socket servers for routes requiring TypeScript-side handling (socket handlers, datagram handlers, dynamic host/port functions)
|
||||
|
||||
@@ -938,8 +942,8 @@ class SmartProxy extends EventEmitter {
|
||||
getCertificateStatus(routeName: string): Promise<any>;
|
||||
getEligibleDomainsForCertificates(): string[];
|
||||
|
||||
// NFTables
|
||||
getNfTablesStatus(): Promise<Record<string, any>>;
|
||||
// NFTables (managed by @push.rocks/smartnftables)
|
||||
getNfTablesStatus(): INftStatus | null;
|
||||
|
||||
// Events
|
||||
on(event: 'error', handler: (err: Error) => void): this;
|
||||
@@ -991,11 +995,11 @@ interface ISmartProxyOptions {
|
||||
sendProxyProtocol?: boolean; // Send PROXY protocol to targets
|
||||
|
||||
// Timeouts
|
||||
connectionTimeout?: number; // Backend connection timeout (default: 30s)
|
||||
initialDataTimeout?: number; // Initial data/SNI timeout (default: 120s)
|
||||
socketTimeout?: number; // Socket inactivity timeout (default: 1h)
|
||||
maxConnectionLifetime?: number; // Max connection lifetime (default: 24h)
|
||||
inactivityTimeout?: number; // Inactivity timeout (default: 4h)
|
||||
connectionTimeout?: number; // Backend connection timeout (default: 60s)
|
||||
initialDataTimeout?: number; // Initial data/SNI timeout (default: 60s)
|
||||
socketTimeout?: number; // Socket inactivity timeout (default: 60s)
|
||||
maxConnectionLifetime?: number; // Max connection lifetime (default: 1h)
|
||||
inactivityTimeout?: number; // Inactivity timeout (default: 75s)
|
||||
gracefulShutdownTimeout?: number; // Shutdown grace period (default: 30s)
|
||||
|
||||
// Connection limits
|
||||
@@ -1004,8 +1008,8 @@ interface ISmartProxyOptions {
|
||||
|
||||
// Keep-alive
|
||||
keepAliveTreatment?: 'standard' | 'extended' | 'immortal';
|
||||
keepAliveInactivityMultiplier?: number; // (default: 6)
|
||||
extendedKeepAliveLifetime?: number; // (default: 7 days)
|
||||
keepAliveInactivityMultiplier?: number; // (default: 4)
|
||||
extendedKeepAliveLifetime?: number; // (default: 1h)
|
||||
|
||||
// Metrics
|
||||
metrics?: {
|
||||
@@ -1137,7 +1141,7 @@ SmartProxy searches for the Rust binary in this order:
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
|
||||
|
||||
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||
|
||||
|
||||
16
rust/Cargo.lock
generated
16
rust/Cargo.lock
generated
@@ -1238,7 +1238,6 @@ dependencies = [
|
||||
"rustproxy-config",
|
||||
"rustproxy-http",
|
||||
"rustproxy-metrics",
|
||||
"rustproxy-nftables",
|
||||
"rustproxy-passthrough",
|
||||
"rustproxy-routing",
|
||||
"rustproxy-security",
|
||||
@@ -1270,6 +1269,7 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"bytes",
|
||||
"dashmap",
|
||||
"futures",
|
||||
"h3",
|
||||
"h3-quinn",
|
||||
"http-body",
|
||||
@@ -1303,20 +1303,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustproxy-nftables"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
"rustproxy-config",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustproxy-passthrough"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -7,7 +7,6 @@ members = [
|
||||
"crates/rustproxy-tls",
|
||||
"crates/rustproxy-passthrough",
|
||||
"crates/rustproxy-http",
|
||||
"crates/rustproxy-nftables",
|
||||
"crates/rustproxy-metrics",
|
||||
"crates/rustproxy-security",
|
||||
]
|
||||
@@ -107,6 +106,5 @@ rustproxy-routing = { path = "crates/rustproxy-routing" }
|
||||
rustproxy-tls = { path = "crates/rustproxy-tls" }
|
||||
rustproxy-passthrough = { path = "crates/rustproxy-passthrough" }
|
||||
rustproxy-http = { path = "crates/rustproxy-http" }
|
||||
rustproxy-nftables = { path = "crates/rustproxy-nftables" }
|
||||
rustproxy-metrics = { path = "crates/rustproxy-metrics" }
|
||||
rustproxy-security = { path = "crates/rustproxy-security" }
|
||||
|
||||
@@ -40,8 +40,6 @@ pub fn create_http_route(
|
||||
load_balancing: None,
|
||||
advanced: None,
|
||||
options: None,
|
||||
forwarding_engine: None,
|
||||
nftables: None,
|
||||
send_proxy_protocol: None,
|
||||
udp: None,
|
||||
},
|
||||
@@ -138,8 +136,6 @@ pub fn create_http_to_https_redirect(
|
||||
url_rewrite: None,
|
||||
}),
|
||||
options: None,
|
||||
forwarding_engine: None,
|
||||
nftables: None,
|
||||
send_proxy_protocol: None,
|
||||
udp: None,
|
||||
},
|
||||
@@ -222,8 +218,6 @@ pub fn create_load_balancer_route(
|
||||
}),
|
||||
advanced: None,
|
||||
options: None,
|
||||
forwarding_engine: None,
|
||||
nftables: None,
|
||||
send_proxy_protocol: None,
|
||||
udp: None,
|
||||
},
|
||||
|
||||
@@ -60,16 +60,6 @@ pub enum RouteActionType {
|
||||
SocketHandler,
|
||||
}
|
||||
|
||||
// ─── Forwarding Engine ───────────────────────────────────────────────
|
||||
|
||||
/// Forwarding engine specification.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ForwardingEngine {
|
||||
Node,
|
||||
Nftables,
|
||||
}
|
||||
|
||||
// ─── Route Match ─────────────────────────────────────────────────────
|
||||
|
||||
/// Domain specification: single string or array.
|
||||
@@ -341,38 +331,6 @@ pub struct RouteAdvanced {
|
||||
pub url_rewrite: Option<RouteUrlRewrite>,
|
||||
}
|
||||
|
||||
// ─── NFTables Options ────────────────────────────────────────────────
|
||||
|
||||
/// NFTables protocol type.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum NfTablesProtocol {
|
||||
Tcp,
|
||||
Udp,
|
||||
All,
|
||||
}
|
||||
|
||||
/// NFTables-specific configuration options.
|
||||
/// Matches TypeScript: `INfTablesOptions`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NfTablesOptions {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub preserve_source_ip: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub protocol: Option<NfTablesProtocol>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_rate: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub priority: Option<i32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub table_name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub use_ip_sets: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub use_advanced_nat: Option<bool>,
|
||||
}
|
||||
|
||||
// ─── Backend Protocol ────────────────────────────────────────────────
|
||||
|
||||
/// Backend protocol.
|
||||
@@ -541,14 +499,6 @@ pub struct RouteAction {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub options: Option<ActionOptions>,
|
||||
|
||||
/// Forwarding engine specification
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub forwarding_engine: Option<ForwardingEngine>,
|
||||
|
||||
/// NFTables-specific options
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub nftables: Option<NfTablesOptions>,
|
||||
|
||||
/// PROXY protocol support (default for all targets)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub send_proxy_protocol: Option<bool>,
|
||||
|
||||
@@ -30,3 +30,4 @@ socket2 = { workspace = true }
|
||||
quinn = { workspace = true }
|
||||
h3 = { workspace = true }
|
||||
h3-quinn = { workspace = true }
|
||||
futures = { version = "0.3", default-features = false, features = ["std"] }
|
||||
|
||||
@@ -56,7 +56,11 @@ struct PooledH2 {
|
||||
}
|
||||
|
||||
/// A pooled QUIC/HTTP/3 connection (multiplexed like H2).
|
||||
/// Stores the h3 `SendRequest` handle so pool hits skip the h3 SETTINGS handshake.
|
||||
pub struct PooledH3 {
|
||||
/// Multiplexed h3 request handle — clone to open a new stream.
|
||||
pub send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
|
||||
/// Raw QUIC connection — kept for liveness probing (close_reason) only.
|
||||
pub connection: quinn::Connection,
|
||||
pub created_at: Instant,
|
||||
pub generation: u64,
|
||||
@@ -197,7 +201,10 @@ impl ConnectionPool {
|
||||
|
||||
/// Try to get a pooled QUIC connection for the given key.
|
||||
/// QUIC connections are multiplexed — the connection is shared, not removed.
|
||||
pub fn checkout_h3(&self, key: &PoolKey) -> Option<(quinn::Connection, Duration)> {
|
||||
pub fn checkout_h3(
|
||||
&self,
|
||||
key: &PoolKey,
|
||||
) -> Option<(h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>, quinn::Connection, Duration)> {
|
||||
let entry = self.h3_pool.get(key)?;
|
||||
let pooled = entry.value();
|
||||
let age = pooled.created_at.elapsed();
|
||||
@@ -215,13 +222,20 @@ impl ConnectionPool {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some((pooled.connection.clone(), age))
|
||||
Some((pooled.send_request.clone(), pooled.connection.clone(), age))
|
||||
}
|
||||
|
||||
/// Register a QUIC connection in the pool. Returns the generation ID.
|
||||
pub fn register_h3(&self, key: PoolKey, connection: quinn::Connection) -> u64 {
|
||||
/// Register a QUIC connection and its h3 SendRequest handle in the pool.
|
||||
/// Returns the generation ID.
|
||||
pub fn register_h3(
|
||||
&self,
|
||||
key: PoolKey,
|
||||
connection: quinn::Connection,
|
||||
send_request: h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>,
|
||||
) -> u64 {
|
||||
let gen = self.h2_generation.fetch_add(1, Ordering::Relaxed);
|
||||
self.h3_pool.insert(key, PooledH3 {
|
||||
send_request,
|
||||
connection,
|
||||
created_at: Instant::now(),
|
||||
generation: gen,
|
||||
|
||||
@@ -116,7 +116,7 @@ async fn handle_h3_request(
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// Stream request body from H3 client via an mpsc channel.
|
||||
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(4);
|
||||
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Bytes>(32);
|
||||
|
||||
// Spawn the H3 body reader task with cancellation
|
||||
let body_cancel = cancel.clone();
|
||||
@@ -132,8 +132,7 @@ async fn handle_h3_request(
|
||||
}
|
||||
};
|
||||
let mut chunk = chunk;
|
||||
let data = Bytes::copy_from_slice(chunk.chunk());
|
||||
chunk.advance(chunk.remaining());
|
||||
let data = chunk.copy_to_bytes(chunk.remaining());
|
||||
if body_tx.send(data).await.is_err() {
|
||||
break;
|
||||
}
|
||||
@@ -179,8 +178,8 @@ async fn handle_h3_request(
|
||||
while let Some(frame) = resp_body.frame().await {
|
||||
match frame {
|
||||
Ok(frame) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
stream.send_data(Bytes::copy_from_slice(data)).await
|
||||
if let Ok(data) = frame.into_data() {
|
||||
stream.send_data(data).await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to send H3 data: {}", e))?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +47,8 @@ pub struct ConnActivity {
|
||||
/// checks the backend's original response headers for Alt-Svc before our
|
||||
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
|
||||
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||
/// The upstream request path that triggered Alt-Svc discovery. Logged for traceability.
|
||||
alt_svc_request_url: Option<String>,
|
||||
}
|
||||
|
||||
impl ConnActivity {
|
||||
@@ -58,6 +60,7 @@ impl ConnActivity {
|
||||
start: std::time::Instant::now(),
|
||||
active_requests: None,
|
||||
alt_svc_cache_key: None,
|
||||
alt_svc_request_url: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -69,15 +72,16 @@ const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_s
|
||||
/// If no new request arrives within this duration, the connection is closed.
|
||||
const DEFAULT_HTTP_IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
|
||||
|
||||
/// Default HTTP max connection lifetime (1 hour).
|
||||
/// HTTP connections are forcefully closed after this duration regardless of activity.
|
||||
const DEFAULT_HTTP_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(3600);
|
||||
|
||||
/// Default WebSocket inactivity timeout (1 hour).
|
||||
const DEFAULT_WS_INACTIVITY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3600);
|
||||
|
||||
/// Default WebSocket max lifetime (24 hours).
|
||||
const DEFAULT_WS_MAX_LIFETIME: std::time::Duration = std::time::Duration::from_secs(86400);
|
||||
|
||||
/// Timeout for QUIC (H3) backend connections. Short because UDP is often firewalled.
|
||||
const QUIC_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||
|
||||
/// Protocol decision for backend connection.
|
||||
#[derive(Debug)]
|
||||
enum ProtocolDecision {
|
||||
@@ -219,6 +223,8 @@ pub struct HttpProxyService {
|
||||
protocol_cache: Arc<crate::protocol_cache::ProtocolCache>,
|
||||
/// HTTP keep-alive idle timeout: close connection if no new request arrives within this duration.
|
||||
http_idle_timeout: std::time::Duration,
|
||||
/// HTTP max connection lifetime: forcefully close connection after this duration regardless of activity.
|
||||
http_max_lifetime: std::time::Duration,
|
||||
/// WebSocket inactivity timeout (no data in either direction).
|
||||
ws_inactivity_timeout: std::time::Duration,
|
||||
/// WebSocket maximum connection lifetime.
|
||||
@@ -245,6 +251,7 @@ impl HttpProxyService {
|
||||
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
|
||||
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
@@ -272,21 +279,24 @@ impl HttpProxyService {
|
||||
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
|
||||
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the HTTP keep-alive idle timeout, WebSocket inactivity timeout, and
|
||||
/// WebSocket max lifetime from connection config values.
|
||||
/// Set the HTTP keep-alive idle timeout, HTTP max lifetime, WebSocket inactivity
|
||||
/// timeout, and WebSocket max lifetime from connection config values.
|
||||
pub fn set_connection_timeouts(
|
||||
&mut self,
|
||||
http_idle_timeout: std::time::Duration,
|
||||
http_max_lifetime: std::time::Duration,
|
||||
ws_inactivity_timeout: std::time::Duration,
|
||||
ws_max_lifetime: std::time::Duration,
|
||||
) {
|
||||
self.http_idle_timeout = http_idle_timeout;
|
||||
self.http_max_lifetime = http_max_lifetime;
|
||||
self.ws_inactivity_timeout = ws_inactivity_timeout;
|
||||
self.ws_max_lifetime = ws_max_lifetime;
|
||||
}
|
||||
@@ -311,6 +321,15 @@ impl HttpProxyService {
|
||||
self.protocol_cache.clear();
|
||||
}
|
||||
|
||||
/// Clean up expired entries in all per-route rate limiters.
|
||||
/// Called from the background sampling task to prevent unbounded growth
|
||||
/// when traffic stops after a burst of unique IPs.
|
||||
pub fn cleanup_all_rate_limiters(&self) {
|
||||
for entry in self.route_rate_limiters.iter() {
|
||||
entry.value().cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
/// Snapshot the protocol cache for metrics/UI display.
|
||||
pub fn protocol_cache_snapshot(&self) -> Vec<crate::protocol_cache::ProtocolCacheEntry> {
|
||||
self.protocol_cache.snapshot()
|
||||
@@ -351,6 +370,7 @@ impl HttpProxyService {
|
||||
|
||||
// Capture timeouts before `self` is moved into the service closure.
|
||||
let idle_timeout = self.http_idle_timeout;
|
||||
let max_lifetime = self.http_max_lifetime;
|
||||
|
||||
// Activity tracker: updated at the START and END of each request.
|
||||
// The idle watchdog checks this to determine if the connection is idle
|
||||
@@ -371,7 +391,7 @@ impl HttpProxyService {
|
||||
let cn = cancel_inner.clone();
|
||||
let la = Arc::clone(&la_inner);
|
||||
let st = start;
|
||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None, alt_svc_request_url: None };
|
||||
async move {
|
||||
let req = req.map(|body| BoxBody::new(body));
|
||||
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
||||
@@ -409,15 +429,23 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
_ = async {
|
||||
// Idle watchdog: check every 5s whether the connection has been idle
|
||||
// (no active requests AND no activity for idle_timeout).
|
||||
// This avoids killing long-running requests or upgraded connections.
|
||||
// Idle + lifetime watchdog: check every 5s whether the connection has been
|
||||
// idle (no active requests AND no activity for idle_timeout) or exceeded
|
||||
// the max connection lifetime.
|
||||
let check_interval = std::time::Duration::from_secs(5);
|
||||
let mut last_seen = 0u64;
|
||||
loop {
|
||||
tokio::time::sleep(check_interval).await;
|
||||
|
||||
// Never close while a request is in progress
|
||||
// Check max connection lifetime (unconditional — even active connections
|
||||
// must eventually be recycled to prevent resource accumulation).
|
||||
if start.elapsed() >= max_lifetime {
|
||||
debug!("HTTP connection exceeded max lifetime ({}s) from {}",
|
||||
max_lifetime.as_secs(), peer_addr);
|
||||
return;
|
||||
}
|
||||
|
||||
// Never close for idleness while a request is in progress
|
||||
if active_requests.load(Ordering::Relaxed) > 0 {
|
||||
last_seen = last_activity.load(Ordering::Relaxed);
|
||||
continue;
|
||||
@@ -434,7 +462,7 @@ impl HttpProxyService {
|
||||
last_seen = current;
|
||||
}
|
||||
} => {
|
||||
debug!("HTTP connection idle timeout ({}s) from {}", idle_timeout.as_secs(), peer_addr);
|
||||
debug!("HTTP connection timeout from {}", peer_addr);
|
||||
conn.as_mut().graceful_shutdown();
|
||||
// Give any in-flight work 5s to drain after graceful shutdown
|
||||
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), conn).await;
|
||||
@@ -775,6 +803,7 @@ impl HttpProxyService {
|
||||
// the backend's original Alt-Svc header before ResponseFilter injects our own.
|
||||
if is_auto_detect_mode {
|
||||
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
|
||||
conn_activity.alt_svc_request_url = Some(upstream_path.to_string());
|
||||
}
|
||||
|
||||
// --- H3 path: try QUIC connection before TCP ---
|
||||
@@ -787,10 +816,10 @@ impl HttpProxyService {
|
||||
};
|
||||
|
||||
// Try H3 pool checkout first
|
||||
if let Some((quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
|
||||
if let Some((pooled_sr, quic_conn, _age)) = self.connection_pool.checkout_h3(&h3_pool_key) {
|
||||
self.metrics.backend_pool_hit(&upstream_key);
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, Some(pooled_sr), parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -803,7 +832,7 @@ impl HttpProxyService {
|
||||
self.metrics.backend_pool_miss(&upstream_key);
|
||||
self.metrics.backend_connection_opened(&upstream_key, std::time::Instant::now().elapsed());
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, None, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -962,7 +991,7 @@ impl HttpProxyService {
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, None, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -1005,7 +1034,7 @@ impl HttpProxyService {
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, None, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -1064,7 +1093,7 @@ impl HttpProxyService {
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, None, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -1107,7 +1136,7 @@ impl HttpProxyService {
|
||||
protocol: crate::connection_pool::PoolProtocol::H3,
|
||||
};
|
||||
let result = self.forward_h3(
|
||||
quic_conn, parts, body, upstream_headers, &upstream_path,
|
||||
quic_conn, None, parts, body, upstream_headers, &upstream_path,
|
||||
route_match.route, route_id, &ip_str, &h3_pool_key, domain_str, &conn_activity, &upstream_key,
|
||||
).await;
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
@@ -1205,8 +1234,10 @@ impl HttpProxyService {
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
debug!("Upstream connection error: {}", e);
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
||||
Ok(Err(e)) => debug!("Upstream connection error: {}", e),
|
||||
Err(_) => debug!("H1 connection driver timed out after 300s"),
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1799,8 +1830,10 @@ impl HttpProxyService {
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
debug!("H1 fallback: upstream connection error: {}", e);
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(300), conn).await {
|
||||
Ok(Err(e)) => debug!("H1 fallback: upstream connection error: {}", e),
|
||||
Err(_) => debug!("H1 fallback: connection driver timed out after 300s"),
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1979,8 +2012,10 @@ impl HttpProxyService {
|
||||
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
|
||||
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port, "Alt-Svc response header");
|
||||
let url = conn_activity.alt_svc_request_url.as_deref().unwrap_or("-");
|
||||
debug!(h3_port, url, "Backend advertises H3 via Alt-Svc");
|
||||
let reason = format!("Alt-Svc response header ({})", url);
|
||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port, &reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2734,7 +2769,12 @@ impl HttpProxyService {
|
||||
|
||||
let quic_crypto = quinn::crypto::rustls::QuicClientConfig::try_from(tls_config)
|
||||
.expect("Failed to create QUIC client crypto config");
|
||||
let client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
|
||||
|
||||
// Tune QUIC transport to match H2 flow-control: 2 MB per-stream receive window.
|
||||
let mut transport = quinn::TransportConfig::default();
|
||||
transport.stream_receive_window(quinn::VarInt::from_u32(2 * 1024 * 1024));
|
||||
let mut client_config = quinn::ClientConfig::new(Arc::new(quic_crypto));
|
||||
client_config.transport_config(Arc::new(transport));
|
||||
|
||||
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())
|
||||
.expect("Failed to create QUIC client endpoint");
|
||||
@@ -2756,8 +2796,8 @@ impl HttpProxyService {
|
||||
let server_name = host.to_string();
|
||||
let connecting = self.quinn_client_endpoint.connect(addr, &server_name)?;
|
||||
|
||||
let connection = tokio::time::timeout(QUIC_CONNECT_TIMEOUT, connecting).await
|
||||
.map_err(|_| format!("QUIC connect timeout (3s) for {}", host))??;
|
||||
let connection = tokio::time::timeout(self.connect_timeout, connecting).await
|
||||
.map_err(|_| format!("QUIC connect timeout ({:?}) for {}", self.connect_timeout, host))??;
|
||||
|
||||
debug!("QUIC backend connection established to {}:{}", host, port);
|
||||
Ok(connection)
|
||||
@@ -2767,6 +2807,7 @@ impl HttpProxyService {
|
||||
async fn forward_h3(
|
||||
&self,
|
||||
quic_conn: quinn::Connection,
|
||||
pooled_sender: Option<h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>>,
|
||||
parts: hyper::http::request::Parts,
|
||||
body: BoxBody<Bytes, hyper::Error>,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
@@ -2779,8 +2820,14 @@ impl HttpProxyService {
|
||||
conn_activity: &ConnActivity,
|
||||
backend_key: &str,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
// Obtain the h3 SendRequest handle: skip handshake + driver on pool hit.
|
||||
let (mut send_request, gen_holder) = if let Some(sr) = pooled_sender {
|
||||
// Pool hit — reuse existing h3 session, no SETTINGS round-trip
|
||||
(sr, None)
|
||||
} else {
|
||||
// Fresh QUIC connection — full h3 handshake + driver spawn
|
||||
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn.clone());
|
||||
let (mut driver, mut send_request) = match h3::client::builder()
|
||||
let (mut driver, sr) = match h3::client::builder()
|
||||
.send_grease(false)
|
||||
.build(h3_quinn_conn)
|
||||
.await
|
||||
@@ -2793,10 +2840,10 @@ impl HttpProxyService {
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the h3 connection driver
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
{
|
||||
let driver_pool = Arc::clone(&self.connection_pool);
|
||||
let driver_pool_key = pool_key.clone();
|
||||
let gen_holder = Arc::new(std::sync::atomic::AtomicU64::new(u64::MAX));
|
||||
let driver_gen = Arc::clone(&gen_holder);
|
||||
tokio::spawn(async move {
|
||||
let close_err = std::future::poll_fn(|cx| driver.poll_close(cx)).await;
|
||||
@@ -2806,6 +2853,9 @@ impl HttpProxyService {
|
||||
driver_pool.remove_h3_if_generation(&driver_pool_key, g);
|
||||
}
|
||||
});
|
||||
}
|
||||
(sr, Some(gen_holder))
|
||||
};
|
||||
|
||||
// Build the H3 request
|
||||
let uri = hyper::Uri::builder()
|
||||
@@ -2835,7 +2885,7 @@ impl HttpProxyService {
|
||||
}
|
||||
};
|
||||
|
||||
// Stream request body
|
||||
// Stream request body (zero-copy: into_data yields owned Bytes)
|
||||
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
||||
let sip: Arc<str> = Arc::from(source_ip);
|
||||
|
||||
@@ -2845,9 +2895,9 @@ impl HttpProxyService {
|
||||
while let Some(frame) = body.frame().await {
|
||||
match frame {
|
||||
Ok(frame) => {
|
||||
if let Some(data) = frame.data_ref() {
|
||||
if let Ok(data) = frame.into_data() {
|
||||
self.metrics.record_bytes(data.len() as u64, 0, rid.as_deref(), Some(&sip));
|
||||
if let Err(e) = stream.send_data(Bytes::copy_from_slice(data)).await {
|
||||
if let Err(e) = stream.send_data(data).await {
|
||||
error!(backend = %backend_key, error = %e, "H3 send_data failed");
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "H3 body send failed"));
|
||||
}
|
||||
@@ -2889,8 +2939,23 @@ impl HttpProxyService {
|
||||
ResponseFilter::apply_headers(route, headers, None);
|
||||
}
|
||||
|
||||
// Stream response body back via an adapter
|
||||
let h3_body = H3ClientResponseBody { stream };
|
||||
// Stream response body back via unfold — correctly preserves waker across polls
|
||||
let body_stream = futures::stream::unfold(stream, |mut s| async move {
|
||||
match s.recv_data().await {
|
||||
Ok(Some(mut buf)) => {
|
||||
use bytes::Buf;
|
||||
let data = buf.copy_to_bytes(buf.remaining());
|
||||
Some((Ok::<_, hyper::Error>(http_body::Frame::data(data)), s))
|
||||
}
|
||||
Ok(None) => None,
|
||||
Err(e) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
let h3_body = http_body_util::StreamBody::new(body_stream);
|
||||
|
||||
let counting_body = CountingBody::new(
|
||||
h3_body,
|
||||
Arc::clone(&self.metrics),
|
||||
@@ -2907,10 +2972,16 @@ impl HttpProxyService {
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
// Register connection in pool on success
|
||||
// Register connection in pool on success (fresh connections only)
|
||||
if status != StatusCode::BAD_GATEWAY {
|
||||
let g = self.connection_pool.register_h3(pool_key.clone(), quic_conn);
|
||||
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
if let Some(gh) = gen_holder {
|
||||
let g = self.connection_pool.register_h3(
|
||||
pool_key.clone(),
|
||||
quic_conn,
|
||||
send_request,
|
||||
);
|
||||
gh.store(g, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
self.metrics.set_backend_protocol(backend_key, "h3");
|
||||
@@ -2939,41 +3010,6 @@ fn parse_alt_svc_h3_port(header_value: &str) -> Option<u16> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Response body adapter for H3 client responses.
|
||||
/// Reads data from the h3 `RequestStream` recv side and presents it as an `http_body::Body`.
|
||||
struct H3ClientResponseBody {
|
||||
stream: h3::client::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||
}
|
||||
|
||||
impl http_body::Body for H3ClientResponseBody {
|
||||
type Data = Bytes;
|
||||
type Error = hyper::Error;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||
// h3's recv_data is async, so we need to poll it manually.
|
||||
// Use a small future to poll the recv_data call.
|
||||
use std::future::Future;
|
||||
let mut fut = Box::pin(self.stream.recv_data());
|
||||
match fut.as_mut().poll(_cx) {
|
||||
Poll::Ready(Ok(Some(mut buf))) => {
|
||||
use bytes::Buf;
|
||||
let data = Bytes::copy_from_slice(buf.chunk());
|
||||
buf.advance(buf.remaining());
|
||||
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||
}
|
||||
Poll::Ready(Ok(None)) => Poll::Ready(None),
|
||||
Poll::Ready(Err(e)) => {
|
||||
warn!("H3 response body recv error: {}", e);
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insecure certificate verifier for backend TLS connections (fallback only).
|
||||
/// The production path uses the shared config from tls_handler which has the same
|
||||
/// behavior but with session resumption across all outbound connections.
|
||||
@@ -3042,6 +3078,7 @@ impl Default for HttpProxyService {
|
||||
connection_pool: Arc::new(crate::connection_pool::ConnectionPool::new()),
|
||||
protocol_cache: Arc::new(crate::protocol_cache::ProtocolCache::new()),
|
||||
http_idle_timeout: DEFAULT_HTTP_IDLE_TIMEOUT,
|
||||
http_max_lifetime: DEFAULT_HTTP_MAX_LIFETIME,
|
||||
ws_inactivity_timeout: DEFAULT_WS_INACTIVITY_TIMEOUT,
|
||||
ws_max_lifetime: DEFAULT_WS_MAX_LIFETIME,
|
||||
quinn_client_endpoint: Arc::new(Self::create_quinn_client_endpoint()),
|
||||
|
||||
@@ -624,6 +624,24 @@ impl MetricsCollector {
|
||||
self.ip_pending_tp.retain(|k, _| self.ip_connections.contains_key(k));
|
||||
self.ip_throughput.retain(|k, _| self.ip_connections.contains_key(k));
|
||||
self.ip_total_connections.retain(|k, _| self.ip_connections.contains_key(k));
|
||||
|
||||
// Safety-net: prune orphaned backend error/stats entries for backends
|
||||
// that have no active or total connections (error-only backends).
|
||||
// These accumulate when backend_connect_error/backend_handshake_error
|
||||
// create entries but backend_connection_opened is never called.
|
||||
let known_backends: HashSet<String> = self.backend_active.iter()
|
||||
.map(|e| e.key().clone())
|
||||
.chain(self.backend_total.iter().map(|e| e.key().clone()))
|
||||
.collect();
|
||||
self.backend_connect_errors.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_handshake_errors.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_request_errors.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_connect_time_us.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_connect_count.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_pool_hits.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_pool_misses.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_h2_failures.retain(|k, _| known_backends.contains(k));
|
||||
self.backend_protocol.retain(|k, _| known_backends.contains(k));
|
||||
}
|
||||
|
||||
/// Remove per-route metrics for route IDs that are no longer active.
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
[package]
|
||||
name = "rustproxy-nftables"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
authors.workspace = true
|
||||
description = "NFTables kernel-level forwarding for RustProxy"
|
||||
|
||||
[dependencies]
|
||||
rustproxy-config = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
@@ -1,10 +0,0 @@
|
||||
//! # rustproxy-nftables
|
||||
//!
|
||||
//! NFTables kernel-level forwarding for RustProxy.
|
||||
//! Generates and manages nft CLI rules for DNAT/SNAT.
|
||||
|
||||
pub mod nft_manager;
|
||||
pub mod rule_builder;
|
||||
|
||||
pub use nft_manager::*;
|
||||
pub use rule_builder::*;
|
||||
@@ -1,238 +0,0 @@
|
||||
use thiserror::Error;
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NftError {
|
||||
#[error("nft command failed: {0}")]
|
||||
CommandFailed(String),
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("Not running as root")]
|
||||
NotRoot,
|
||||
}
|
||||
|
||||
/// Manager for nftables rules.
|
||||
///
|
||||
/// Executes `nft` CLI commands to manage kernel-level packet forwarding.
|
||||
/// Requires root privileges; operations are skipped gracefully if not root.
|
||||
pub struct NftManager {
|
||||
table_name: String,
|
||||
/// Active rules indexed by route ID
|
||||
active_rules: HashMap<String, Vec<String>>,
|
||||
/// Whether the table has been initialized
|
||||
table_initialized: bool,
|
||||
}
|
||||
|
||||
impl NftManager {
|
||||
pub fn new(table_name: Option<String>) -> Self {
|
||||
Self {
|
||||
table_name: table_name.unwrap_or_else(|| "rustproxy".to_string()),
|
||||
active_rules: HashMap::new(),
|
||||
table_initialized: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if we are running as root.
|
||||
fn is_root() -> bool {
|
||||
unsafe { libc::geteuid() == 0 }
|
||||
}
|
||||
|
||||
/// Execute a single nft command via the CLI.
|
||||
async fn exec_nft(command: &str) -> Result<String, NftError> {
|
||||
// The command starts with "nft ", strip it to get the args
|
||||
let args = if command.starts_with("nft ") {
|
||||
&command[4..]
|
||||
} else {
|
||||
command
|
||||
};
|
||||
|
||||
let output = tokio::process::Command::new("nft")
|
||||
.args(args.split_whitespace())
|
||||
.output()
|
||||
.await
|
||||
.map_err(NftError::Io)?;
|
||||
|
||||
if output.status.success() {
|
||||
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||
} else {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(NftError::CommandFailed(format!(
|
||||
"Command '{}' failed: {}",
|
||||
command, stderr
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure the nftables table and chains are set up.
|
||||
async fn ensure_table(&mut self) -> Result<(), NftError> {
|
||||
if self.table_initialized {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let setup_commands = crate::rule_builder::build_table_setup(&self.table_name);
|
||||
for cmd in &setup_commands {
|
||||
Self::exec_nft(cmd).await?;
|
||||
}
|
||||
|
||||
self.table_initialized = true;
|
||||
info!("NFTables table '{}' initialized", self.table_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply rules for a route.
|
||||
///
|
||||
/// Executes the nft commands via the CLI. If not running as root,
|
||||
/// the rules are stored locally but not applied to the kernel.
|
||||
pub async fn apply_rules(&mut self, route_id: &str, rules: Vec<String>) -> Result<(), NftError> {
|
||||
if !Self::is_root() {
|
||||
warn!("Not running as root, nftables rules will not be applied to kernel");
|
||||
self.active_rules.insert(route_id.to_string(), rules);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.ensure_table().await?;
|
||||
|
||||
for cmd in &rules {
|
||||
Self::exec_nft(cmd).await?;
|
||||
debug!("Applied nft rule: {}", cmd);
|
||||
}
|
||||
|
||||
info!("Applied {} nftables rules for route '{}'", rules.len(), route_id);
|
||||
self.active_rules.insert(route_id.to_string(), rules);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove rules for a route.
|
||||
///
|
||||
/// Currently removes the route from tracking. To fully remove specific
|
||||
/// rules would require handle-based tracking; for now, cleanup() removes
|
||||
/// the entire table.
|
||||
pub async fn remove_rules(&mut self, route_id: &str) -> Result<(), NftError> {
|
||||
if let Some(rules) = self.active_rules.remove(route_id) {
|
||||
info!("Removed {} tracked nft rules for route '{}'", rules.len(), route_id);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clean up all managed rules by deleting the entire nftables table.
|
||||
pub async fn cleanup(&mut self) -> Result<(), NftError> {
|
||||
if !Self::is_root() {
|
||||
warn!("Not running as root, skipping nftables cleanup");
|
||||
self.active_rules.clear();
|
||||
self.table_initialized = false;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.table_initialized {
|
||||
let cleanup_commands = crate::rule_builder::build_table_cleanup(&self.table_name);
|
||||
for cmd in &cleanup_commands {
|
||||
match Self::exec_nft(cmd).await {
|
||||
Ok(_) => debug!("Cleanup: {}", cmd),
|
||||
Err(e) => warn!("Cleanup command failed (may be ok): {}", e),
|
||||
}
|
||||
}
|
||||
info!("NFTables table '{}' cleaned up", self.table_name);
|
||||
}
|
||||
|
||||
self.active_rules.clear();
|
||||
self.table_initialized = false;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the table name.
|
||||
pub fn table_name(&self) -> &str {
|
||||
&self.table_name
|
||||
}
|
||||
|
||||
/// Whether the table has been initialized in the kernel.
|
||||
pub fn is_initialized(&self) -> bool {
|
||||
self.table_initialized
|
||||
}
|
||||
|
||||
/// Get the number of active route rule sets.
|
||||
pub fn active_route_count(&self) -> usize {
|
||||
self.active_rules.len()
|
||||
}
|
||||
|
||||
/// Get the status of all active rules.
|
||||
pub fn status(&self) -> HashMap<String, serde_json::Value> {
|
||||
let mut status = HashMap::new();
|
||||
for (route_id, rules) in &self.active_rules {
|
||||
status.insert(
|
||||
route_id.clone(),
|
||||
serde_json::json!({
|
||||
"ruleCount": rules.len(),
|
||||
"rules": rules,
|
||||
}),
|
||||
);
|
||||
}
|
||||
status
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_new_default_table_name() {
|
||||
let mgr = NftManager::new(None);
|
||||
assert_eq!(mgr.table_name(), "rustproxy");
|
||||
assert!(!mgr.is_initialized());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_custom_table_name() {
|
||||
let mgr = NftManager::new(Some("custom".to_string()));
|
||||
assert_eq!(mgr.table_name(), "custom");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply_rules_non_root() {
|
||||
let mut mgr = NftManager::new(None);
|
||||
// When not root, rules are stored but not applied to kernel
|
||||
let rules = vec!["nft add rule ip rustproxy prerouting tcp dport 443 dnat to 10.0.0.1:8443".to_string()];
|
||||
mgr.apply_rules("route-1", rules).await.unwrap();
|
||||
assert_eq!(mgr.active_route_count(), 1);
|
||||
|
||||
let status = mgr.status();
|
||||
assert!(status.contains_key("route-1"));
|
||||
assert_eq!(status["route-1"]["ruleCount"], 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_rules() {
|
||||
let mut mgr = NftManager::new(None);
|
||||
let rules = vec!["nft add rule test".to_string()];
|
||||
mgr.apply_rules("route-1", rules).await.unwrap();
|
||||
assert_eq!(mgr.active_route_count(), 1);
|
||||
|
||||
mgr.remove_rules("route-1").await.unwrap();
|
||||
assert_eq!(mgr.active_route_count(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cleanup_non_root() {
|
||||
let mut mgr = NftManager::new(None);
|
||||
let rules = vec!["nft add rule test".to_string()];
|
||||
mgr.apply_rules("route-1", rules).await.unwrap();
|
||||
mgr.apply_rules("route-2", vec!["nft add rule test2".to_string()]).await.unwrap();
|
||||
|
||||
mgr.cleanup().await.unwrap();
|
||||
assert_eq!(mgr.active_route_count(), 0);
|
||||
assert!(!mgr.is_initialized());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_status_multiple_routes() {
|
||||
let mut mgr = NftManager::new(None);
|
||||
mgr.apply_rules("web", vec!["rule1".to_string(), "rule2".to_string()]).await.unwrap();
|
||||
mgr.apply_rules("api", vec!["rule3".to_string()]).await.unwrap();
|
||||
|
||||
let status = mgr.status();
|
||||
assert_eq!(status.len(), 2);
|
||||
assert_eq!(status["web"]["ruleCount"], 2);
|
||||
assert_eq!(status["api"]["ruleCount"], 1);
|
||||
}
|
||||
}
|
||||
@@ -1,146 +0,0 @@
|
||||
use rustproxy_config::{NfTablesOptions, NfTablesProtocol};
|
||||
|
||||
/// Build nftables DNAT rule for port forwarding.
|
||||
pub fn build_dnat_rule(
|
||||
table_name: &str,
|
||||
chain_name: &str,
|
||||
source_port: u16,
|
||||
target_host: &str,
|
||||
target_port: u16,
|
||||
options: &NfTablesOptions,
|
||||
) -> Vec<String> {
|
||||
let protocols: Vec<&str> = match options.protocol.as_ref().unwrap_or(&NfTablesProtocol::Tcp) {
|
||||
NfTablesProtocol::Tcp => vec!["tcp"],
|
||||
NfTablesProtocol::Udp => vec!["udp"],
|
||||
NfTablesProtocol::All => vec!["tcp", "udp"],
|
||||
};
|
||||
|
||||
let mut rules = Vec::new();
|
||||
|
||||
for protocol in &protocols {
|
||||
// DNAT rule
|
||||
rules.push(format!(
|
||||
"nft add rule ip {} {} {} dport {} dnat to {}:{}",
|
||||
table_name, chain_name, protocol, source_port, target_host, target_port,
|
||||
));
|
||||
|
||||
// SNAT rule if preserving source IP is not enabled
|
||||
if !options.preserve_source_ip.unwrap_or(false) {
|
||||
rules.push(format!(
|
||||
"nft add rule ip {} postrouting {} dport {} masquerade",
|
||||
table_name, protocol, target_port,
|
||||
));
|
||||
}
|
||||
|
||||
// Rate limiting
|
||||
if let Some(max_rate) = &options.max_rate {
|
||||
rules.push(format!(
|
||||
"nft add rule ip {} {} {} dport {} limit rate {} accept",
|
||||
table_name, chain_name, protocol, source_port, max_rate,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
rules
|
||||
}
|
||||
|
||||
/// Build the initial table and chain setup commands.
|
||||
pub fn build_table_setup(table_name: &str) -> Vec<String> {
|
||||
vec![
|
||||
format!("nft add table ip {}", table_name),
|
||||
format!("nft add chain ip {} prerouting {{ type nat hook prerouting priority 0 \\; }}", table_name),
|
||||
format!("nft add chain ip {} postrouting {{ type nat hook postrouting priority 100 \\; }}", table_name),
|
||||
]
|
||||
}
|
||||
|
||||
/// Build cleanup commands to remove the table.
|
||||
pub fn build_table_cleanup(table_name: &str) -> Vec<String> {
|
||||
vec![format!("nft delete table ip {}", table_name)]
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn make_options() -> NfTablesOptions {
|
||||
NfTablesOptions {
|
||||
preserve_source_ip: None,
|
||||
protocol: None,
|
||||
max_rate: None,
|
||||
priority: None,
|
||||
table_name: None,
|
||||
use_ip_sets: None,
|
||||
use_advanced_nat: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_dnat_rule() {
|
||||
let options = make_options();
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 443, "10.0.0.1", 8443, &options);
|
||||
assert!(rules.len() >= 1);
|
||||
assert!(rules[0].contains("dnat to 10.0.0.1:8443"));
|
||||
assert!(rules[0].contains("dport 443"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_preserve_source_ip() {
|
||||
let mut options = make_options();
|
||||
options.preserve_source_ip = Some(true);
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 443, "10.0.0.1", 8443, &options);
|
||||
// When preserving source IP, no masquerade rule
|
||||
assert!(rules.iter().all(|r| !r.contains("masquerade")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_without_preserve_source_ip() {
|
||||
let options = make_options();
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 443, "10.0.0.1", 8443, &options);
|
||||
assert!(rules.iter().any(|r| r.contains("masquerade")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rate_limited_rule() {
|
||||
let mut options = make_options();
|
||||
options.max_rate = Some("100/second".to_string());
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 80, "10.0.0.1", 8080, &options);
|
||||
assert!(rules.iter().any(|r| r.contains("limit rate 100/second")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_table_setup_commands() {
|
||||
let commands = build_table_setup("rustproxy");
|
||||
assert_eq!(commands.len(), 3);
|
||||
assert!(commands[0].contains("add table ip rustproxy"));
|
||||
assert!(commands[1].contains("prerouting"));
|
||||
assert!(commands[2].contains("postrouting"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_table_cleanup() {
|
||||
let commands = build_table_cleanup("rustproxy");
|
||||
assert_eq!(commands.len(), 1);
|
||||
assert!(commands[0].contains("delete table ip rustproxy"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_protocol_all_generates_tcp_and_udp_rules() {
|
||||
let mut options = make_options();
|
||||
options.protocol = Some(NfTablesProtocol::All);
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 53, "10.0.0.53", 53, &options);
|
||||
// Should have TCP DNAT + masquerade + UDP DNAT + masquerade = 4 rules
|
||||
assert_eq!(rules.len(), 4);
|
||||
assert!(rules.iter().any(|r| r.contains("tcp dport 53 dnat")));
|
||||
assert!(rules.iter().any(|r| r.contains("udp dport 53 dnat")));
|
||||
assert!(rules.iter().filter(|r| r.contains("masquerade")).count() == 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_protocol_udp() {
|
||||
let mut options = make_options();
|
||||
options.protocol = Some(NfTablesProtocol::Udp);
|
||||
let rules = build_dnat_rule("rustproxy", "prerouting", 53, "10.0.0.53", 53, &options);
|
||||
assert!(rules.iter().all(|r| !r.contains("tcp")));
|
||||
assert!(rules.iter().any(|r| r.contains("udp dport 53 dnat")));
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,6 @@ pub mod forwarder;
|
||||
pub mod proxy_protocol;
|
||||
pub mod tls_handler;
|
||||
pub mod connection_tracker;
|
||||
pub mod socket_relay;
|
||||
pub mod socket_opts;
|
||||
pub mod udp_session;
|
||||
pub mod udp_listener;
|
||||
@@ -22,7 +21,6 @@ pub use forwarder::*;
|
||||
pub use proxy_protocol::*;
|
||||
pub use tls_handler::*;
|
||||
pub use connection_tracker::*;
|
||||
pub use socket_relay::*;
|
||||
pub use socket_opts::*;
|
||||
pub use udp_session::*;
|
||||
pub use udp_listener::*;
|
||||
|
||||
@@ -77,6 +77,13 @@ struct RelaySession {
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Drop for RelaySession {
|
||||
fn drop(&mut self) {
|
||||
self.cancel.cancel();
|
||||
self.return_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a QUIC endpoint with a PROXY protocol v2 relay layer.
|
||||
///
|
||||
/// Instead of giving the external socket to quinn, we:
|
||||
@@ -634,7 +641,7 @@ async fn forward_quic_stream_to_tcp(
|
||||
let la_watch = Arc::clone(&last_activity);
|
||||
let c2b_abort = c2b.abort_handle();
|
||||
let b2c_abort = b2c.abort_handle();
|
||||
tokio::spawn(async move {
|
||||
let watchdog = tokio::spawn(async move {
|
||||
let check_interval = std::time::Duration::from_secs(5);
|
||||
let mut last_seen = 0u64;
|
||||
loop {
|
||||
@@ -665,6 +672,7 @@ async fn forward_quic_stream_to_tcp(
|
||||
|
||||
let bytes_in = c2b.await.unwrap_or(0);
|
||||
let bytes_out = b2c.await.unwrap_or(0);
|
||||
watchdog.abort();
|
||||
|
||||
Ok((bytes_in, bytes_out))
|
||||
}
|
||||
|
||||
@@ -1,126 +1,4 @@
|
||||
//! Socket handler relay for connecting client connections to a TypeScript handler
|
||||
//! via a Unix domain socket.
|
||||
//! Socket handler relay module.
|
||||
//!
|
||||
//! Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay.
|
||||
|
||||
use tokio::net::UnixStream;
|
||||
use tokio::io::{AsyncWriteExt, AsyncReadExt};
|
||||
use tokio::net::TcpStream;
|
||||
use serde::Serialize;
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct RelayMetadata {
|
||||
connection_id: u64,
|
||||
remote_ip: String,
|
||||
remote_port: u16,
|
||||
local_port: u16,
|
||||
sni: Option<String>,
|
||||
route_name: String,
|
||||
initial_data_base64: Option<String>,
|
||||
}
|
||||
|
||||
/// Relay a client connection to a TypeScript handler via Unix domain socket.
|
||||
///
|
||||
/// Protocol: Send a JSON metadata line terminated by `\n`, then bidirectional relay.
|
||||
pub async fn relay_to_handler(
|
||||
client: TcpStream,
|
||||
relay_socket_path: &str,
|
||||
connection_id: u64,
|
||||
remote_ip: String,
|
||||
remote_port: u16,
|
||||
local_port: u16,
|
||||
sni: Option<String>,
|
||||
route_name: String,
|
||||
initial_data: Option<&[u8]>,
|
||||
) -> std::io::Result<()> {
|
||||
debug!(
|
||||
"Relaying connection {} to handler socket {}",
|
||||
connection_id, relay_socket_path
|
||||
);
|
||||
|
||||
// Connect to TypeScript handler Unix socket
|
||||
let mut handler = UnixStream::connect(relay_socket_path).await?;
|
||||
|
||||
// Build and send metadata header
|
||||
let initial_data_base64 = initial_data.map(base64_encode);
|
||||
|
||||
let metadata = RelayMetadata {
|
||||
connection_id,
|
||||
remote_ip,
|
||||
remote_port,
|
||||
local_port,
|
||||
sni,
|
||||
route_name,
|
||||
initial_data_base64,
|
||||
};
|
||||
|
||||
let metadata_json = serde_json::to_string(&metadata)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
|
||||
handler.write_all(metadata_json.as_bytes()).await?;
|
||||
handler.write_all(b"\n").await?;
|
||||
|
||||
// Bidirectional relay between client and handler
|
||||
let (mut client_read, mut client_write) = client.into_split();
|
||||
let (mut handler_read, mut handler_write) = handler.into_split();
|
||||
|
||||
let c2h = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
let n = match client_read.read(&mut buf).await {
|
||||
Ok(0) | Err(_) => break,
|
||||
Ok(n) => n,
|
||||
};
|
||||
if handler_write.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let _ = handler_write.shutdown().await;
|
||||
});
|
||||
|
||||
let h2c = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
let n = match handler_read.read(&mut buf).await {
|
||||
Ok(0) | Err(_) => break,
|
||||
Ok(n) => n,
|
||||
};
|
||||
if client_write.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let _ = client_write.shutdown().await;
|
||||
});
|
||||
|
||||
let _ = tokio::join!(c2h, h2c);
|
||||
|
||||
debug!("Relay connection {} completed", connection_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Simple base64 encoding without external dependency.
|
||||
fn base64_encode(data: &[u8]) -> String {
|
||||
const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
|
||||
let mut result = String::new();
|
||||
for chunk in data.chunks(3) {
|
||||
let b0 = chunk[0] as u32;
|
||||
let b1 = if chunk.len() > 1 { chunk[1] as u32 } else { 0 };
|
||||
let b2 = if chunk.len() > 2 { chunk[2] as u32 } else { 0 };
|
||||
let n = (b0 << 16) | (b1 << 8) | b2;
|
||||
result.push(CHARS[((n >> 18) & 0x3F) as usize] as char);
|
||||
result.push(CHARS[((n >> 12) & 0x3F) as usize] as char);
|
||||
if chunk.len() > 1 {
|
||||
result.push(CHARS[((n >> 6) & 0x3F) as usize] as char);
|
||||
} else {
|
||||
result.push('=');
|
||||
}
|
||||
if chunk.len() > 2 {
|
||||
result.push(CHARS[(n & 0x3F) as usize] as char);
|
||||
} else {
|
||||
result.push('=');
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
//! Note: The actual relay logic lives in `tcp_listener::relay_to_socket_handler()`
|
||||
//! which has proper timeouts, cancellation, and metrics integration.
|
||||
|
||||
@@ -182,6 +182,7 @@ impl TcpListenerManager {
|
||||
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
|
||||
http_proxy_svc.set_connection_timeouts(
|
||||
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
|
||||
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
|
||||
);
|
||||
@@ -220,6 +221,7 @@ impl TcpListenerManager {
|
||||
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
|
||||
http_proxy_svc.set_connection_timeouts(
|
||||
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
|
||||
std::time::Duration::from_millis(conn_config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms),
|
||||
);
|
||||
@@ -263,6 +265,7 @@ impl TcpListenerManager {
|
||||
http_proxy_svc.set_backend_tls_config_alpn(tls_handler::shared_backend_tls_config_alpn());
|
||||
http_proxy_svc.set_connection_timeouts(
|
||||
std::time::Duration::from_millis(config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
|
||||
std::time::Duration::from_millis(config.socket_timeout_ms),
|
||||
std::time::Duration::from_millis(config.max_connection_lifetime_ms),
|
||||
);
|
||||
|
||||
@@ -355,8 +355,6 @@ mod tests {
|
||||
load_balancing: None,
|
||||
advanced: None,
|
||||
options: None,
|
||||
forwarding_engine: None,
|
||||
nftables: None,
|
||||
send_proxy_protocol: None,
|
||||
udp: None,
|
||||
},
|
||||
|
||||
@@ -20,7 +20,6 @@ rustproxy-routing = { workspace = true }
|
||||
rustproxy-tls = { workspace = true }
|
||||
rustproxy-passthrough = { workspace = true }
|
||||
rustproxy-http = { workspace = true }
|
||||
rustproxy-nftables = { workspace = true }
|
||||
rustproxy-metrics = { workspace = true }
|
||||
rustproxy-security = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
@@ -41,16 +41,14 @@ pub use rustproxy_routing;
|
||||
pub use rustproxy_passthrough;
|
||||
pub use rustproxy_tls;
|
||||
pub use rustproxy_http;
|
||||
pub use rustproxy_nftables;
|
||||
pub use rustproxy_metrics;
|
||||
pub use rustproxy_security;
|
||||
|
||||
use rustproxy_config::{RouteConfig, RustProxyOptions, TlsMode, CertificateSpec, ForwardingEngine};
|
||||
use rustproxy_config::{RouteConfig, RustProxyOptions, TlsMode, CertificateSpec};
|
||||
use rustproxy_routing::RouteManager;
|
||||
use rustproxy_passthrough::{TcpListenerManager, UdpListenerManager, TlsCertConfig, ConnectionConfig};
|
||||
use rustproxy_metrics::{MetricsCollector, Metrics, Statistics};
|
||||
use rustproxy_tls::{CertManager, CertStore, CertBundle, CertMetadata, CertSource};
|
||||
use rustproxy_nftables::{NftManager, rule_builder};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Certificate status.
|
||||
@@ -74,7 +72,6 @@ pub struct RustProxy {
|
||||
challenge_server: Option<challenge_server::ChallengeServer>,
|
||||
renewal_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
sampling_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
nft_manager: Option<NftManager>,
|
||||
started: bool,
|
||||
started_at: Option<Instant>,
|
||||
/// Shared path to a Unix domain socket for relaying socket-handler connections back to TypeScript.
|
||||
@@ -121,7 +118,6 @@ impl RustProxy {
|
||||
challenge_server: None,
|
||||
renewal_handle: None,
|
||||
sampling_handle: None,
|
||||
nft_manager: None,
|
||||
started: false,
|
||||
started_at: None,
|
||||
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
||||
@@ -363,6 +359,7 @@ impl RustProxy {
|
||||
// Start the throughput sampling task with cooperative cancellation
|
||||
let metrics = Arc::clone(&self.metrics);
|
||||
let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone();
|
||||
let http_proxy = self.listener_manager.as_ref().unwrap().http_proxy().clone();
|
||||
let interval_ms = self.options.metrics.as_ref()
|
||||
.and_then(|m| m.sample_interval_ms)
|
||||
.unwrap_or(1000);
|
||||
@@ -378,14 +375,14 @@ impl RustProxy {
|
||||
metrics.sample_all();
|
||||
// Periodically clean up stale rate-limit timestamp entries
|
||||
conn_tracker.cleanup_stale_timestamps();
|
||||
// Clean up expired rate limiter entries to prevent unbounded
|
||||
// growth from unique IPs after traffic stops
|
||||
http_proxy.cleanup_all_rate_limiters();
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
// Apply NFTables rules for routes using nftables forwarding engine
|
||||
self.apply_nftables_rules(&self.options.routes.clone()).await;
|
||||
|
||||
// Start renewal timer if ACME is enabled
|
||||
self.start_renewal_timer();
|
||||
|
||||
@@ -612,14 +609,6 @@ impl RustProxy {
|
||||
}
|
||||
self.challenge_server = None;
|
||||
|
||||
// Clean up NFTables rules
|
||||
if let Some(ref mut nft) = self.nft_manager {
|
||||
if let Err(e) = nft.cleanup().await {
|
||||
warn!("NFTables cleanup failed: {}", e);
|
||||
}
|
||||
}
|
||||
self.nft_manager = None;
|
||||
|
||||
if let Some(ref mut listener) = self.listener_manager {
|
||||
listener.graceful_stop().await;
|
||||
}
|
||||
@@ -821,9 +810,6 @@ impl RustProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Update NFTables rules: remove old, apply new
|
||||
self.update_nftables_rules(&routes).await;
|
||||
|
||||
self.options.routes = routes;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1145,99 +1131,6 @@ impl RustProxy {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get NFTables status.
|
||||
pub async fn get_nftables_status(&self) -> Result<HashMap<String, serde_json::Value>> {
|
||||
match &self.nft_manager {
|
||||
Some(nft) => Ok(nft.status()),
|
||||
None => Ok(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply NFTables rules for routes using the nftables forwarding engine.
|
||||
async fn apply_nftables_rules(&mut self, routes: &[RouteConfig]) {
|
||||
let nft_routes: Vec<&RouteConfig> = routes.iter()
|
||||
.filter(|r| r.action.forwarding_engine.as_ref() == Some(&ForwardingEngine::Nftables))
|
||||
.collect();
|
||||
|
||||
if nft_routes.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
info!("Applying NFTables rules for {} routes", nft_routes.len());
|
||||
|
||||
let table_name = nft_routes.iter()
|
||||
.find_map(|r| r.action.nftables.as_ref()?.table_name.clone())
|
||||
.unwrap_or_else(|| "rustproxy".to_string());
|
||||
|
||||
let mut nft = NftManager::new(Some(table_name));
|
||||
|
||||
for route in &nft_routes {
|
||||
let route_id = route.id.as_deref()
|
||||
.or(route.name.as_deref())
|
||||
.unwrap_or("unnamed");
|
||||
|
||||
let nft_options = match &route.action.nftables {
|
||||
Some(opts) => opts.clone(),
|
||||
None => rustproxy_config::NfTablesOptions {
|
||||
preserve_source_ip: None,
|
||||
protocol: None,
|
||||
max_rate: None,
|
||||
priority: None,
|
||||
table_name: None,
|
||||
use_ip_sets: None,
|
||||
use_advanced_nat: None,
|
||||
},
|
||||
};
|
||||
|
||||
let targets = match &route.action.targets {
|
||||
Some(targets) => targets,
|
||||
None => {
|
||||
warn!("NFTables route '{}' has no targets, skipping", route_id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let source_ports = route.route_match.ports.to_ports();
|
||||
for target in targets {
|
||||
let target_host = target.host.first().to_string();
|
||||
let target_port_spec = &target.port;
|
||||
|
||||
for &source_port in &source_ports {
|
||||
let resolved_port = target_port_spec.resolve(source_port);
|
||||
let rules = rule_builder::build_dnat_rule(
|
||||
nft.table_name(),
|
||||
"prerouting",
|
||||
source_port,
|
||||
&target_host,
|
||||
resolved_port,
|
||||
&nft_options,
|
||||
);
|
||||
|
||||
let rule_id = format!("{}-{}-{}", route_id, source_port, resolved_port);
|
||||
if let Err(e) = nft.apply_rules(&rule_id, rules).await {
|
||||
error!("Failed to apply NFTables rules for route '{}': {}", route_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.nft_manager = Some(nft);
|
||||
}
|
||||
|
||||
/// Update NFTables rules when routes change.
|
||||
async fn update_nftables_rules(&mut self, new_routes: &[RouteConfig]) {
|
||||
// Clean up old rules
|
||||
if let Some(ref mut nft) = self.nft_manager {
|
||||
if let Err(e) = nft.cleanup().await {
|
||||
warn!("NFTables cleanup during update failed: {}", e);
|
||||
}
|
||||
}
|
||||
self.nft_manager = None;
|
||||
|
||||
// Apply new rules
|
||||
self.apply_nftables_rules(new_routes).await;
|
||||
}
|
||||
|
||||
/// Extract TLS configurations from route configs.
|
||||
fn extract_tls_configs(routes: &[RouteConfig]) -> HashMap<String, TlsCertConfig> {
|
||||
let mut configs = HashMap::new();
|
||||
|
||||
@@ -147,7 +147,6 @@ async fn handle_request(
|
||||
"renewCertificate" => handle_renew_certificate(&id, &request.params, proxy).await,
|
||||
"getCertificateStatus" => handle_get_certificate_status(&id, &request.params, proxy).await,
|
||||
"getListeningPorts" => handle_get_listening_ports(&id, proxy),
|
||||
"getNftablesStatus" => handle_get_nftables_status(&id, proxy).await,
|
||||
"setSocketHandlerRelay" => handle_set_socket_handler_relay(&id, &request.params, proxy).await,
|
||||
"setDatagramHandlerRelay" => handle_set_datagram_handler_relay(&id, &request.params, proxy).await,
|
||||
"addListeningPort" => handle_add_listening_port(&id, &request.params, proxy).await,
|
||||
@@ -352,26 +351,6 @@ fn handle_get_listening_ports(
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_get_nftables_status(
|
||||
id: &str,
|
||||
proxy: &Option<RustProxy>,
|
||||
) -> ManagementResponse {
|
||||
match proxy.as_ref() {
|
||||
Some(p) => {
|
||||
match p.get_nftables_status().await {
|
||||
Ok(status) => {
|
||||
match serde_json::to_value(&status) {
|
||||
Ok(v) => ManagementResponse::ok(id.to_string(), v),
|
||||
Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to serialize: {}", e)),
|
||||
}
|
||||
}
|
||||
Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to get status: {}", e)),
|
||||
}
|
||||
}
|
||||
None => ManagementResponse::ok(id.to_string(), serde_json::json!({})),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_set_socket_handler_relay(
|
||||
id: &str,
|
||||
params: &serde_json::Value,
|
||||
|
||||
@@ -297,8 +297,6 @@ pub fn make_test_route(
|
||||
load_balancing: None,
|
||||
advanced: None,
|
||||
options: None,
|
||||
forwarding_engine: None,
|
||||
nftables: None,
|
||||
send_proxy_protocol: None,
|
||||
udp: None,
|
||||
},
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '26.2.0',
|
||||
version: '26.3.0',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
@@ -19,12 +19,14 @@ export { tsclass };
|
||||
import * as smartcrypto from '@push.rocks/smartcrypto';
|
||||
import * as smartlog from '@push.rocks/smartlog';
|
||||
import * as smartlogDestinationLocal from '@push.rocks/smartlog/destination-local';
|
||||
import * as smartnftables from '@push.rocks/smartnftables';
|
||||
import * as smartrust from '@push.rocks/smartrust';
|
||||
|
||||
export {
|
||||
smartcrypto,
|
||||
smartlog,
|
||||
smartlogDestinationLocal,
|
||||
smartnftables,
|
||||
smartrust,
|
||||
};
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ interface IDatagramRelayMessage {
|
||||
* - TS→Rust: { type: "reply", sourceIp, sourcePort, destPort, payloadBase64 }
|
||||
*/
|
||||
export class DatagramHandlerServer {
|
||||
private static readonly MAX_BUFFER_SIZE = 50 * 1024 * 1024; // 50 MB
|
||||
|
||||
private server: plugins.net.Server | null = null;
|
||||
private connection: plugins.net.Socket | null = null;
|
||||
private socketPath: string;
|
||||
@@ -100,6 +102,11 @@ export class DatagramHandlerServer {
|
||||
|
||||
socket.on('data', (chunk: Buffer) => {
|
||||
this.readBuffer = Buffer.concat([this.readBuffer, chunk]);
|
||||
if (this.readBuffer.length > DatagramHandlerServer.MAX_BUFFER_SIZE) {
|
||||
logger.log('error', `DatagramHandlerServer: buffer exceeded ${DatagramHandlerServer.MAX_BUFFER_SIZE} bytes, resetting`);
|
||||
this.readBuffer = Buffer.alloc(0);
|
||||
return;
|
||||
}
|
||||
this.processFrames();
|
||||
});
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ type TSmartProxyCommands = {
|
||||
renewCertificate: { params: { routeName: string }; result: void };
|
||||
getCertificateStatus: { params: { routeName: string }; result: any };
|
||||
getListeningPorts: { params: Record<string, never>; result: { ports: number[] } };
|
||||
getNftablesStatus: { params: Record<string, never>; result: any };
|
||||
setSocketHandlerRelay: { params: { socketPath: string }; result: void };
|
||||
addListeningPort: { params: { port: number }; result: void };
|
||||
removeListeningPort: { params: { port: number }; result: void };
|
||||
@@ -159,10 +158,6 @@ export class RustProxyBridge extends plugins.EventEmitter {
|
||||
return result?.ports ?? [];
|
||||
}
|
||||
|
||||
public async getNftablesStatus(): Promise<any> {
|
||||
return this.bridge.sendCommand('getNftablesStatus', {} as Record<string, never>);
|
||||
}
|
||||
|
||||
public async setSocketHandlerRelay(socketPath: string): Promise<void> {
|
||||
await this.bridge.sendCommand('setSocketHandlerRelay', { socketPath });
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@ import type { IMetrics } from './models/metrics-types.js';
|
||||
/**
|
||||
* SmartProxy - Rust-backed proxy engine with TypeScript configuration API.
|
||||
*
|
||||
* All networking (TCP, TLS, HTTP reverse proxy, connection management, security,
|
||||
* NFTables) is handled by the Rust binary. TypeScript is only:
|
||||
* All networking (TCP, TLS, HTTP reverse proxy, connection management, security)
|
||||
* is handled by the Rust binary. TypeScript is only:
|
||||
* - The npm module interface (types, route helpers)
|
||||
* - The thin IPC wrapper (this class)
|
||||
* - Socket-handler callback relay (for JS-defined handlers)
|
||||
@@ -39,6 +39,7 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
private socketHandlerServer: SocketHandlerServer | null = null;
|
||||
private datagramHandlerServer: DatagramHandlerServer | null = null;
|
||||
private metricsAdapter: RustMetricsAdapter;
|
||||
private nftablesManager: InstanceType<typeof plugins.smartnftables.SmartNftables> | null = null;
|
||||
private routeUpdateLock: Mutex;
|
||||
private stopping = false;
|
||||
private certProvisionPromise: Promise<void> | null = null;
|
||||
@@ -128,6 +129,7 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
}
|
||||
|
||||
// Handle unexpected exit (only emits error if not intentionally stopping)
|
||||
this.bridge.removeAllListeners('exit');
|
||||
this.bridge.on('exit', (code: number | null, signal: string | null) => {
|
||||
if (this.stopping) return;
|
||||
logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });
|
||||
@@ -210,6 +212,9 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
// Apply NFTables rules for routes using nftables forwarding engine
|
||||
await this.applyNftablesRules(this.settings.routes);
|
||||
|
||||
// Start metrics polling BEFORE cert provisioning — the Rust engine is already
|
||||
// running and accepting connections, so metrics should be available immediately.
|
||||
// Cert provisioning can hang indefinitely (e.g. DNS-01 ACME timeouts) and must
|
||||
@@ -237,6 +242,12 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
this.certProvisionPromise = null;
|
||||
}
|
||||
|
||||
// Clean up NFTables rules
|
||||
if (this.nftablesManager) {
|
||||
await this.nftablesManager.cleanup();
|
||||
this.nftablesManager = null;
|
||||
}
|
||||
|
||||
// Stop metrics polling
|
||||
this.metricsAdapter.stopPolling();
|
||||
|
||||
@@ -318,6 +329,13 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
this.datagramHandlerServer = null;
|
||||
}
|
||||
|
||||
// Update NFTables rules
|
||||
if (this.nftablesManager) {
|
||||
await this.nftablesManager.cleanup();
|
||||
this.nftablesManager = null;
|
||||
}
|
||||
await this.applyNftablesRules(newRoutes);
|
||||
|
||||
// Update stored routes
|
||||
this.settings.routes = newRoutes;
|
||||
|
||||
@@ -410,14 +428,59 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get NFTables status (async - calls Rust).
|
||||
* Get NFTables status.
|
||||
*/
|
||||
public async getNfTablesStatus(): Promise<Record<string, any>> {
|
||||
return this.bridge.getNftablesStatus();
|
||||
public getNfTablesStatus(): plugins.smartnftables.INftStatus | null {
|
||||
return this.nftablesManager?.status() ?? null;
|
||||
}
|
||||
|
||||
// --- Private helpers ---
|
||||
|
||||
/**
|
||||
* Apply NFTables rules for routes using the nftables forwarding engine.
|
||||
*/
|
||||
private async applyNftablesRules(routes: IRouteConfig[]): Promise<void> {
|
||||
const nftRoutes = routes.filter(r => r.action.forwardingEngine === 'nftables');
|
||||
if (nftRoutes.length === 0) return;
|
||||
|
||||
const tableName = nftRoutes.find(r => r.action.nftables?.tableName)?.action.nftables?.tableName ?? 'smartproxy';
|
||||
const nft = new plugins.smartnftables.SmartNftables({ tableName });
|
||||
await nft.initialize();
|
||||
|
||||
for (const route of nftRoutes) {
|
||||
const routeId = route.name || 'unnamed';
|
||||
const targets = route.action.targets;
|
||||
if (!targets) continue;
|
||||
|
||||
const nftOpts = route.action.nftables;
|
||||
const protocol: plugins.smartnftables.TNftProtocol = (nftOpts?.protocol as any) ?? 'tcp';
|
||||
const preserveSourceIP = nftOpts?.preserveSourceIP ?? false;
|
||||
|
||||
const ports = Array.isArray(route.match.ports)
|
||||
? route.match.ports.flatMap(p => typeof p === 'number' ? [p] : [])
|
||||
: typeof route.match.ports === 'number' ? [route.match.ports] : [];
|
||||
|
||||
for (const target of targets) {
|
||||
const targetHost = Array.isArray(target.host) ? target.host[0] : target.host;
|
||||
if (typeof targetHost !== 'string') continue;
|
||||
|
||||
for (const sourcePort of ports) {
|
||||
const targetPort = typeof target.port === 'number' ? target.port : sourcePort;
|
||||
await nft.nat.addPortForwarding(`${routeId}-${sourcePort}-${targetPort}`, {
|
||||
sourcePort,
|
||||
targetHost,
|
||||
targetPort,
|
||||
protocol,
|
||||
preserveSourceIP,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.nftablesManager = nft;
|
||||
logger.log('info', `Applied NFTables rules for ${nftRoutes.length} route(s)`, { component: 'smart-proxy' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the Rust configuration object from TS settings.
|
||||
*/
|
||||
|
||||
@@ -69,14 +69,15 @@ export function createOffsetPortMappingRoute(options: {
|
||||
priority?: number;
|
||||
[key: string]: any;
|
||||
}): IRouteConfig {
|
||||
const { ports, targetHost, offset, name, domains, priority, ...rest } = options;
|
||||
return createPortMappingRoute({
|
||||
sourcePortRange: options.ports,
|
||||
targetHost: options.targetHost,
|
||||
portMapper: (context) => context.port + options.offset,
|
||||
name: options.name || `Offset Mapping (${options.offset > 0 ? '+' : ''}${options.offset}) for ${options.domains || 'all domains'}`,
|
||||
domains: options.domains,
|
||||
priority: options.priority,
|
||||
...options
|
||||
sourcePortRange: ports,
|
||||
targetHost,
|
||||
portMapper: (context) => context.port + offset,
|
||||
name: name || `Offset Mapping (${offset > 0 ? '+' : ''}${offset}) for ${domains || 'all domains'}`,
|
||||
domains,
|
||||
priority,
|
||||
...rest
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -258,8 +258,10 @@ export class RouteValidator {
|
||||
errorMap.set(route.name, existingErrors);
|
||||
valid = false;
|
||||
}
|
||||
if (route.name) {
|
||||
routeNames.add(route.name);
|
||||
}
|
||||
}
|
||||
|
||||
// Validate each route
|
||||
for (const route of routes) {
|
||||
@@ -328,7 +330,7 @@ export class RouteValidator {
|
||||
if (catchAllRoutes.length > 1) {
|
||||
for (const route of catchAllRoutes) {
|
||||
conflicts.push({
|
||||
route: route.name,
|
||||
route: route.name || 'unnamed',
|
||||
message: `Multiple catch-all routes on port ${port}`
|
||||
});
|
||||
}
|
||||
|
||||
@@ -7,8 +7,7 @@
|
||||
"moduleResolution": "NodeNext",
|
||||
"esModuleInterop": true,
|
||||
"verbatimModuleSyntax": true,
|
||||
"baseUrl": ".",
|
||||
"paths": {}
|
||||
"ignoreDeprecations": "6.0"
|
||||
},
|
||||
"exclude": [
|
||||
"dist_*/**/*.d.ts"
|
||||
|
||||
Reference in New Issue
Block a user