Compare commits

...

12 Commits

Author SHA1 Message Date
fdabf807b0 v23.1.3
Some checks failed
Default (tags) / security (push) Successful in 44s
Default (tags) / test (push) Failing after 4m6s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-12 20:17:32 +00:00
81e0e6b4d8 fix(rustproxy): install default rustls crypto provider early; detect and skip raw fast-path for HTTP connections and return proper HTTP 502 when no route matches 2026-02-12 20:17:32 +00:00
28fa69bf59 v23.1.2
Some checks failed
Default (tags) / security (push) Successful in 36s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-11 13:48:30 +00:00
5019658032 fix(core): use node: scoped builtin imports and add route unit tests 2026-02-11 13:48:30 +00:00
a9fe365c78 v23.1.1
Some checks failed
Default (tags) / security (push) Successful in 39s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-11 12:52:45 +00:00
32e0410227 fix(rust-proxy): increase rust proxy bridge maxPayloadSize to 100 MB and bump dependencies 2026-02-11 12:52:45 +00:00
fd56064495 v23.1.0
Some checks failed
Default (tags) / security (push) Successful in 35s
Default (tags) / test (push) Failing after 4m1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-10 09:43:40 +00:00
3b7e6a6ed7 feat(rust-bridge): integrate tsrust to build and locate cross-compiled Rust binaries; refactor rust-proxy bridge to use typed IPC and streamline process handling; add @push.rocks/smartrust and update build/dev dependencies 2026-02-10 09:43:40 +00:00
131ed8949a v23.0.0
Some checks failed
Default (tags) / security (push) Successful in 52s
Default (tags) / test (push) Failing after 48s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-09 17:11:37 +00:00
7b3009dc53 BREAKING CHANGE(proxies/nftables-proxy): remove nftables-proxy implementation, models, and utilities from the repository 2026-02-09 17:11:37 +00:00
db2e2fb76e v22.6.0
Some checks failed
Default (tags) / security (push) Successful in 40s
Default (tags) / test (push) Failing after 48s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-02-09 16:25:33 +00:00
f7605e042e feat(smart-proxy): add socket-handler relay, fast-path port-only forwarding, metrics and bridge improvements, and various TS/Rust integration fixes 2026-02-09 16:25:33 +00:00
46 changed files with 10160 additions and 4133 deletions

View File

@@ -1,5 +1,57 @@
# Changelog
## 2026-02-12 - 23.1.3 - fix(rustproxy)
install default rustls crypto provider early; detect and skip raw fast-path for HTTP connections and return proper HTTP 502 when no route matches
- Install ring-based rustls crypto provider at startup to prevent panics from instant-acme/hyper-rustls calling ClientConfig::builder() before TLS listeners are initialized
- Add a non-blocking 10ms peek to detect HTTP traffic in the TCP passthrough fast-path to avoid misrouting HTTP and ensure HTTP proxy handles CORS, errors, and request-level routing
- Skip the fast-path and fall back to the HTTP proxy when HTTP is detected (with a debug log)
- When no route matches for detected HTTP connections, send an HTTP 502 Bad Gateway response and close the connection instead of silently dropping it
## 2026-02-11 - 23.1.2 - fix(core)
use node: scoped builtin imports and add route unit tests
- Replaced bare Node built-in imports (events, fs, http, https, net, path, tls, url, http2, buffer, crypto) with 'node:' specifiers for ESM/bundler compatibility (files updated include ts/plugins.ts, ts/core/models/socket-types.ts, ts/core/utils/enhanced-connection-pool.ts, ts/core/utils/socket-tracker.ts, ts/protocols/common/fragment-handler.ts, ts/protocols/tls/sni/client-hello-parser.ts, ts/protocols/tls/sni/sni-extraction.ts, ts/protocols/websocket/utils.ts, ts/tls/sni/sni-handler.ts).
- Added new unit tests (test/test.bun.ts and test/test.deno.ts) covering route helpers, validators, matching, merging and cloning to improve test coverage.
## 2026-02-11 - 23.1.1 - fix(rust-proxy)
increase rust proxy bridge maxPayloadSize to 100 MB and bump dependencies
- Set maxPayloadSize to 100 * 1024 * 1024 (100 MB) in ts/proxies/smart-proxy/rust-proxy-bridge.ts to support large route configs
- Bump devDependency @types/node from ^25.2.2 to ^25.2.3
- Bump dependency @push.rocks/smartrust from ^1.1.1 to ^1.2.0
## 2026-02-10 - 23.1.0 - feat(rust-bridge)
integrate tsrust to build and locate cross-compiled Rust binaries; refactor rust-proxy bridge to use typed IPC and streamline process handling; add @push.rocks/smartrust and update build/dev dependencies
- Add tsrust to the build script and include dist_rust candidates when locating the Rust binary (enables cross-compiled artifacts produced by tsrust).
- Remove the old rust-binary-locator and refactor rust-proxy-bridge to use explicit, typed IPC command definitions and improved process spawn/cleanup logic.
- Introduce @push.rocks/smartrust for type-safe JSON IPC and export it via plugins; update README with expanded metrics documentation and change initialDataTimeout default from 60s to 120s.
- Add rust/.cargo/config.toml with aarch64 linker configuration to support cross-compilation for arm64.
- Bump several devDependencies and runtime dependencies (e.g. @git.zone/tsbuild, @git.zone/tstest, @push.rocks/smartserve, @push.rocks/taskbuffer, ws, minimatch, etc.).
- Update runtime message guiding local builds to use 'pnpm build' (tsrust) instead of direct cargo invocation.
## 2026-02-09 - 23.0.0 - BREAKING CHANGE(proxies/nftables-proxy)
remove nftables-proxy implementation, models, and utilities from the repository
- Deleted nftables-proxy module files under ts/proxies/nftables-proxy (index, models, utils, command executor, validators, etc.)
- Removed nftables-proxy exports from ts/index.ts and ts/proxies/index.ts
- Updated smart-proxy types to drop dependency on nftables proxy models
- Breaking change: any consumers importing nftables-proxy will no longer find those exports; update imports or install/use the extracted/alternative package if applicable
## 2026-02-09 - 22.6.0 - feat(smart-proxy)
add socket-handler relay, fast-path port-only forwarding, metrics and bridge improvements, and various TS/Rust integration fixes
- Add Unix-domain socket relay for socket-handler routes so Rust can hand off matched connections to TypeScript handlers (metadata JSON + initial bytes, relay implementation in Rust and SocketHandlerServer in TS).
- Implement fast-path port-only forwarding in the TCP accept/handler path to forward simple non-TLS, port-only routes immediately without peeking at client data (improves server-speaks-first protocol handling).
- Use ArcSwap for route manager hot-reload visibility in accept loops and share socket_handler_relay via Arc<RwLock> so listeners see relay path updates immediately.
- Enhance SNI/HTTP parsing: add extract_http_path and extract_http_host to aid domain/path matching from initial data.
- Improve RustProxy shutdown/kill handling: remove listeners, reject pending requests, destroy stdio pipes and unref process to avoid leaking handles.
- Enhance Rust <-> TS metrics bridge and adapter: add immediate poll(), map Rust JSON fields to IMetrics (per-route active/throughput/totals), and use safer polling/unref timers.
- SocketHandlerServer enhancements: track active sockets, destroy on stop, pause/resume to prevent data loss, support async socketHandler callbacks and dynamic function-based target forwarding (resolve host/port functions and forward).
- TypeScript smart-proxy lifecycle tweaks: only set bridge relay after Rust starts, guard unexpected-exit emission when intentionally stopping, stop polling and remove listeners on stop, add stopping flag.
- Misc: README and API ergonomics updates (nft proxy option renames and config comments), various test updates to use stable http.request helper, adjust timeouts/metrics sampling and assertions, and multiple small bugfixes in listeners, timeouts and TLS typings.
## 2026-02-09 - 22.5.0 - feat(rustproxy)
introduce a Rust-powered proxy engine and workspace with core crates for proxy functionality, ACME/TLS support, passthrough and HTTP proxies, metrics, nftables integration, routing/security, management IPC, tests, and README updates

7333
deno.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -40,5 +40,8 @@
},
"@ship.zone/szci": {
"npmGlobalTools": []
},
"@git.zone/tsrust": {
"targets": ["linux_amd64", "linux_arm64"]
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartproxy",
"version": "22.5.0",
"version": "23.1.3",
"private": false,
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
"main": "dist_ts/index.js",
@@ -10,16 +10,17 @@
"license": "MIT",
"scripts": {
"test": "(tstest test/**/test*.ts --verbose --timeout 60 --logfile)",
"build": "(tsbuild tsfolders --allowimplicitany)",
"build": "(tsbuild tsfolders --allowimplicitany) && (tsrust)",
"format": "(gitzone format)",
"buildDocs": "tsdoc"
},
"devDependencies": {
"@git.zone/tsbuild": "^3.1.2",
"@git.zone/tsrun": "^2.0.0",
"@git.zone/tstest": "^3.1.3",
"@push.rocks/smartserve": "^1.4.0",
"@types/node": "^24.10.2",
"@git.zone/tsbuild": "^4.1.2",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.1.8",
"@push.rocks/smartserve": "^2.0.1",
"@types/node": "^25.2.3",
"typescript": "^5.9.3",
"why-is-node-running": "^3.2.2"
},
@@ -28,20 +29,21 @@
"@push.rocks/smartacme": "^8.0.0",
"@push.rocks/smartcrypto": "^2.0.4",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartfile": "^13.1.0",
"@push.rocks/smartfile": "^13.1.2",
"@push.rocks/smartlog": "^3.1.10",
"@push.rocks/smartnetwork": "^4.4.0",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrequest": "^5.0.1",
"@push.rocks/smartrust": "^1.2.0",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smartstring": "^4.1.0",
"@push.rocks/taskbuffer": "^3.5.0",
"@push.rocks/taskbuffer": "^4.2.0",
"@tsclass/tsclass": "^9.3.0",
"@types/minimatch": "^6.0.0",
"@types/ws": "^8.18.1",
"minimatch": "^10.1.1",
"minimatch": "^10.1.2",
"pretty-ms": "^9.3.0",
"ws": "^8.18.3"
"ws": "^8.19.0"
},
"files": [
"ts/**/*",

2702
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -214,8 +214,8 @@ const echoRoute = createSocketHandlerRoute(
const customRoute = createSocketHandlerRoute(
'custom.example.com',
9999,
async (socket, context) => {
console.log(`Connection from ${context.clientIp}`);
async (socket) => {
console.log(`New connection on custom protocol`);
socket.write('Welcome to my custom protocol!\n');
socket.on('data', (data) => {
@@ -261,8 +261,7 @@ const proxy = new SmartProxy({
{
ports: 443,
certificate: 'auto',
preserveSourceIP: true, // Backend sees real client IP
maxRate: '1gbps' // QoS rate limiting
preserveSourceIP: true // Backend sees real client IP
}
)
]
@@ -379,7 +378,9 @@ await proxy.updateRoutes([...newRoutes]);
// Get real-time metrics
const metrics = proxy.getMetrics();
console.log(`Active connections: ${metrics.connections.active()}`);
console.log(`Requests/sec: ${metrics.throughput.requestsPerSecond()}`);
console.log(`Bytes in: ${metrics.totals.bytesIn()}`);
console.log(`Requests/sec: ${metrics.requests.perSecond()}`);
console.log(`Throughput in: ${metrics.throughput.instant().in} bytes/sec`);
// Get detailed statistics from the Rust engine
const stats = await proxy.getStatistics();
@@ -487,8 +488,8 @@ SmartProxy uses a hybrid **Rust + TypeScript** architecture:
- **Rust Engine** handles all networking, TLS, HTTP proxying, connection management, security, and metrics
- **TypeScript** provides the npm API, configuration types, route helpers, validation, and socket handler callbacks
- **IPC** — JSON commands/events over stdin/stdout for seamless cross-language communication
- **Socket Relay** — a Unix domain socket server for routes requiring TypeScript-side handling (socket handlers, dynamic host/port functions)
- **IPC** — The TypeScript wrapper uses [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) for type-safe JSON commands/events over stdin/stdout
- **Socket Relay** — A Unix domain socket server for routes requiring TypeScript-side handling (socket handlers, dynamic host/port functions)
## 🎯 Route Configuration Reference
@@ -529,7 +530,7 @@ interface IRouteTarget {
```typescript
interface IRouteTls {
mode: 'passthrough' | 'terminate' | 'terminate-and-reencrypt';
certificate: 'auto' | {
certificate?: 'auto' | {
key: string;
cert: string;
ca?: string;
@@ -543,7 +544,7 @@ interface IRouteTls {
renewBeforeDays?: number;
};
versions?: string[];
ciphers?: string[];
ciphers?: string;
honorCipherOrder?: boolean;
sessionTimeout?: number;
}
@@ -569,10 +570,10 @@ interface IRouteLoadBalancing {
algorithm: 'round-robin' | 'least-connections' | 'ip-hash';
healthCheck?: {
path: string;
interval: number; // ms
timeout: number; // ms
unhealthyThreshold?: number;
healthyThreshold?: number;
interval: number; // ms
timeout: number; // ms
unhealthyThreshold: number;
healthyThreshold: number;
};
}
```
@@ -725,33 +726,38 @@ interface ISmartProxyOptions {
// Behavior
enableDetailedLogging?: boolean; // Verbose connection logging
enableTlsDebugLogging?: boolean; // TLS handshake debug logging
// Rust binary
rustBinaryPath?: string; // Custom path to the Rust binary
}
```
### NfTablesProxy Class
### IMetrics Interface
A standalone class for managing nftables NAT rules directly (Linux only, requires root):
The `getMetrics()` method returns a cached metrics adapter that polls the Rust engine:
```typescript
import { NfTablesProxy } from '@push.rocks/smartproxy';
const metrics = proxy.getMetrics();
const nftProxy = new NfTablesProxy({
fromPorts: [80, 443],
toHost: 'backend-server',
toPorts: [8080, 8443],
protocol: 'tcp',
preserveSourceIP: true,
enableIPv6: true,
maxRate: '1gbps',
useIPSets: true
});
// Connection metrics
metrics.connections.active(); // Current active connections
metrics.connections.total(); // Total connections since start
metrics.connections.byRoute(); // Map<routeName, activeCount>
metrics.connections.byIP(); // Map<ip, activeCount>
metrics.connections.topIPs(10); // Top N IPs by connection count
await nftProxy.start(); // Apply nftables rules
const status = nftProxy.getStatus();
await nftProxy.stop(); // Remove rules
// Throughput (bytes/sec)
metrics.throughput.instant(); // { in: number, out: number }
metrics.throughput.recent(); // Recent average
metrics.throughput.average(); // Overall average
metrics.throughput.byRoute(); // Map<routeName, { in, out }>
// Request rates
metrics.requests.perSecond(); // Requests per second
metrics.requests.perMinute(); // Requests per minute
metrics.requests.total(); // Total requests
// Cumulative totals
metrics.totals.bytesIn(); // Total bytes received
metrics.totals.bytesOut(); // Total bytes sent
metrics.totals.connections(); // Total connections
```
## 🐛 Troubleshooting
@@ -770,13 +776,13 @@ await nftProxy.stop(); // Remove rules
- ✅ Enable debug logging with `enableDetailedLogging: true`
### Rust Binary Not Found
SmartProxy searches for the Rust binary in this order:
1. `SMARTPROXY_RUST_BINARY` environment variable
2. Platform-specific npm package (`@push.rocks/smartproxy-linux-x64`, etc.)
3. Local dev build (`./rust/target/release/rustproxy`)
4. System PATH (`rustproxy`)
Set `rustBinaryPath` in options to override.
3. `dist_rust/rustproxy` relative to the package root (built by `tsrust`)
4. Local dev build (`./rust/target/release/rustproxy`)
5. System PATH (`rustproxy`)
### Performance Tuning
- ✅ Use NFTables forwarding for high-traffic routes (Linux only)

2
rust/.cargo/config.toml Normal file
View File

@@ -0,0 +1,2 @@
[target.aarch64-unknown-linux-gnu]
linker = "aarch64-linux-gnu-gcc"

View File

@@ -146,6 +146,41 @@ pub fn is_tls(data: &[u8]) -> bool {
data.len() >= 3 && data[0] == 0x16 && data[1] == 0x03
}
/// Extract the HTTP request path from initial data.
/// E.g., from "GET /foo/bar HTTP/1.1\r\n..." returns Some("/foo/bar").
pub fn extract_http_path(data: &[u8]) -> Option<String> {
let text = std::str::from_utf8(data).ok()?;
// Find first space (after method)
let method_end = text.find(' ')?;
let rest = &text[method_end + 1..];
// Find end of path (next space before "HTTP/...")
let path_end = rest.find(' ').unwrap_or(rest.len());
let path = &rest[..path_end];
// Strip query string for path matching
let path = path.split('?').next().unwrap_or(path);
if path.starts_with('/') {
Some(path.to_string())
} else {
None
}
}
/// Extract the HTTP Host header from initial data.
/// E.g., from "GET / HTTP/1.1\r\nHost: example.com\r\n..." returns Some("example.com").
pub fn extract_http_host(data: &[u8]) -> Option<String> {
let text = std::str::from_utf8(data).ok()?;
for line in text.split("\r\n") {
if let Some(value) = line.strip_prefix("Host: ").or_else(|| line.strip_prefix("host: ")) {
// Strip port if present
let host = value.split(':').next().unwrap_or(value).trim();
if !host.is_empty() {
return Some(host.to_lowercase());
}
}
}
None
}
/// Check if the initial bytes look like HTTP.
pub fn is_http(data: &[u8]) -> bool {
if data.len() < 4 {

View File

@@ -1,10 +1,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use arc_swap::ArcSwap;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing::{info, error, debug, warn};
use thiserror::Error;
use rustproxy_config::RouteActionType;
use rustproxy_routing::RouteManager;
use rustproxy_metrics::MetricsCollector;
use rustproxy_http::HttpProxyService;
@@ -82,8 +84,8 @@ impl Default for ConnectionConfig {
pub struct TcpListenerManager {
/// Active listeners indexed by port
listeners: HashMap<u16, tokio::task::JoinHandle<()>>,
/// Shared route manager
route_manager: Arc<RouteManager>,
/// Shared route manager (ArcSwap for hot-reload visibility in accept loops)
route_manager: Arc<ArcSwap<RouteManager>>,
/// Shared metrics collector
metrics: Arc<MetricsCollector>,
/// TLS acceptors indexed by domain
@@ -96,6 +98,8 @@ pub struct TcpListenerManager {
conn_tracker: Arc<ConnectionTracker>,
/// Cancellation token for graceful shutdown
cancel_token: CancellationToken,
/// Path to Unix domain socket for relaying socket-handler connections to TypeScript.
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
}
impl TcpListenerManager {
@@ -112,13 +116,14 @@ impl TcpListenerManager {
));
Self {
listeners: HashMap::new(),
route_manager,
route_manager: Arc::new(ArcSwap::from(route_manager)),
metrics,
tls_configs: Arc::new(HashMap::new()),
http_proxy,
conn_config: Arc::new(conn_config),
conn_tracker,
cancel_token: CancellationToken::new(),
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
}
}
@@ -135,13 +140,14 @@ impl TcpListenerManager {
));
Self {
listeners: HashMap::new(),
route_manager,
route_manager: Arc::new(ArcSwap::from(route_manager)),
metrics,
tls_configs: Arc::new(HashMap::new()),
http_proxy,
conn_config: Arc::new(conn_config),
conn_tracker,
cancel_token: CancellationToken::new(),
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
}
}
@@ -159,6 +165,12 @@ impl TcpListenerManager {
self.tls_configs = Arc::new(configs);
}
/// Set the shared socket-handler relay path.
/// This allows RustProxy to share the relay path Arc with the listener.
pub fn set_socket_handler_relay(&mut self, relay: Arc<std::sync::RwLock<Option<String>>>) {
self.socket_handler_relay = relay;
}
/// Start listening on a port.
pub async fn add_port(&mut self, port: u16) -> Result<(), ListenerError> {
if self.listeners.contains_key(&port) {
@@ -172,18 +184,19 @@ impl TcpListenerManager {
info!("Listening on port {}", port);
let route_manager = Arc::clone(&self.route_manager);
let route_manager_swap = Arc::clone(&self.route_manager);
let metrics = Arc::clone(&self.metrics);
let tls_configs = Arc::clone(&self.tls_configs);
let http_proxy = Arc::clone(&self.http_proxy);
let conn_config = Arc::clone(&self.conn_config);
let conn_tracker = Arc::clone(&self.conn_tracker);
let cancel = self.cancel_token.clone();
let relay = Arc::clone(&self.socket_handler_relay);
let handle = tokio::spawn(async move {
Self::accept_loop(
listener, port, route_manager, metrics, tls_configs,
http_proxy, conn_config, conn_tracker, cancel,
listener, port, route_manager_swap, metrics, tls_configs,
http_proxy, conn_config, conn_tracker, cancel, relay,
).await;
});
@@ -255,8 +268,9 @@ impl TcpListenerManager {
}
/// Update the route manager (for hot-reload).
/// Uses ArcSwap so running accept loops immediately see the new routes.
pub fn update_route_manager(&mut self, route_manager: Arc<RouteManager>) {
self.route_manager = route_manager;
self.route_manager.store(route_manager);
}
/// Get a reference to the metrics collector.
@@ -268,13 +282,14 @@ impl TcpListenerManager {
async fn accept_loop(
listener: TcpListener,
port: u16,
route_manager: Arc<RouteManager>,
route_manager_swap: Arc<ArcSwap<RouteManager>>,
metrics: Arc<MetricsCollector>,
tls_configs: Arc<HashMap<String, TlsCertConfig>>,
http_proxy: Arc<HttpProxyService>,
conn_config: Arc<ConnectionConfig>,
conn_tracker: Arc<ConnectionTracker>,
cancel: CancellationToken,
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
) {
loop {
tokio::select! {
@@ -296,18 +311,20 @@ impl TcpListenerManager {
conn_tracker.connection_opened(&ip);
let rm = Arc::clone(&route_manager);
// Load the latest route manager from ArcSwap on each connection
let rm = route_manager_swap.load_full();
let m = Arc::clone(&metrics);
let tc = Arc::clone(&tls_configs);
let hp = Arc::clone(&http_proxy);
let cc = Arc::clone(&conn_config);
let ct = Arc::clone(&conn_tracker);
let cn = cancel.clone();
let sr = Arc::clone(&socket_handler_relay);
debug!("Accepted connection from {} on port {}", peer_addr, port);
tokio::spawn(async move {
let result = Self::handle_connection(
stream, port, peer_addr, rm, m, tc, hp, cc, cn,
stream, port, peer_addr, rm, m, tc, hp, cc, cn, sr,
).await;
if let Err(e) = result {
debug!("Connection error from {}: {}", peer_addr, e);
@@ -336,11 +353,139 @@ impl TcpListenerManager {
http_proxy: Arc<HttpProxyService>,
conn_config: Arc<ConnectionConfig>,
cancel: CancellationToken,
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tokio::io::AsyncReadExt;
stream.set_nodelay(true)?;
// === Fast path: try port-only matching before peeking at data ===
// This handles "server-speaks-first" protocols where the client
// doesn't send initial data (e.g., SMTP, greeting-based protocols).
// If a route matches by port alone and doesn't need domain/path/TLS info,
// we can forward immediately without waiting for client data.
//
// IMPORTANT: HTTP connections must NOT use this path — they need the HTTP
// proxy for proper error responses, CORS handling, and request-level routing.
// We detect HTTP via a non-blocking peek before committing to raw forwarding.
{
let quick_ctx = rustproxy_routing::MatchContext {
port,
domain: None,
path: None,
client_ip: Some(&peer_addr.ip().to_string()),
tls_version: None,
headers: None,
is_tls: false,
};
if let Some(quick_match) = route_manager.find_route(&quick_ctx) {
let rm = &quick_match.route.route_match;
let has_no_domain = rm.domains.is_none();
let has_no_path = rm.path.is_none();
let is_forward = quick_match.route.action.action_type == RouteActionType::Forward;
let has_no_tls = quick_match.route.action.tls.is_none();
// Only use fast path for simple port-only forward routes with no TLS
if has_no_domain && has_no_path && is_forward && has_no_tls {
// Non-blocking peek: if client has already sent data that looks
// like HTTP, skip fast path and let the normal path handle it
// through the HTTP proxy (for CORS, error responses, path routing).
let is_likely_http = {
let mut probe = [0u8; 16];
// Brief peek: HTTP clients send data immediately after connect.
// Server-speaks-first protocols (SMTP etc.) send nothing initially.
// 10ms is ample for any HTTP client while negligible for
// server-speaks-first protocols (which wait seconds for greeting).
match tokio::time::timeout(
std::time::Duration::from_millis(10),
stream.peek(&mut probe),
).await {
Ok(Ok(n)) if n > 0 => sni_parser::is_http(&probe[..n]),
_ => false,
}
};
if is_likely_http {
debug!("Fast-path skipped: HTTP detected from {}, using HTTP proxy", peer_addr);
// Fall through to normal path for HTTP proxy handling
} else if let Some(target) = quick_match.target {
let target_host = target.host.first().to_string();
let target_port = target.port.resolve(port);
let route_id = quick_match.route.id.as_deref();
// Check route-level IP security
if let Some(ref security) = quick_match.route.security {
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
security, &peer_addr.ip(),
) {
debug!("Connection from {} blocked by route security", peer_addr);
return Ok(());
}
}
metrics.connection_opened(route_id);
let connect_timeout = std::time::Duration::from_millis(conn_config.connection_timeout_ms);
let inactivity_timeout = std::time::Duration::from_millis(conn_config.socket_timeout_ms);
let max_lifetime = std::time::Duration::from_millis(conn_config.max_connection_lifetime_ms);
debug!(
"Fast-path forward (no peek): {} -> {}:{}",
peer_addr, target_host, target_port
);
let backend = match tokio::time::timeout(
connect_timeout,
tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)),
).await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
metrics.connection_closed(route_id);
return Err(e.into());
}
Err(_) => {
metrics.connection_closed(route_id);
return Err("Backend connection timeout".into());
}
};
backend.set_nodelay(true)?;
// Send PROXY protocol header if configured
let should_send_proxy = conn_config.send_proxy_protocol
|| quick_match.route.action.send_proxy_protocol.unwrap_or(false)
|| target.send_proxy_protocol.unwrap_or(false);
if should_send_proxy {
use tokio::io::AsyncWriteExt;
let dest = std::net::SocketAddr::new(
target_host.parse().unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)),
target_port,
);
let header = crate::proxy_protocol::generate_v1(&peer_addr, &dest);
let mut backend_w = backend;
backend_w.write_all(header.as_bytes()).await?;
let (bytes_in, bytes_out) = forwarder::forward_bidirectional_with_timeouts(
stream, backend_w, None,
inactivity_timeout, max_lifetime, cancel,
).await?;
metrics.record_bytes(bytes_in, bytes_out, route_id);
} else {
let (bytes_in, bytes_out) = forwarder::forward_bidirectional_with_timeouts(
stream, backend, None,
inactivity_timeout, max_lifetime, cancel,
).await?;
metrics.record_bytes(bytes_in, bytes_out, route_id);
}
metrics.connection_closed(route_id);
return Ok(());
}
}
}
}
// === End fast path ===
// Handle PROXY protocol if configured
let mut effective_peer_addr = peer_addr;
if conn_config.accept_proxy_protocol {
@@ -412,11 +557,24 @@ impl TcpListenerManager {
None
};
// Extract HTTP path and host from initial data for route matching
let http_path = if is_http {
sni_parser::extract_http_path(initial_data)
} else {
None
};
let http_host = if is_http && domain.is_none() {
sni_parser::extract_http_host(initial_data)
} else {
None
};
let effective_domain = domain.as_deref().or(http_host.as_deref());
// Match route
let ctx = rustproxy_routing::MatchContext {
port,
domain: domain.as_deref(),
path: None,
domain: effective_domain,
path: http_path.as_deref(),
client_ip: Some(&peer_addr.ip().to_string()),
tls_version: None,
headers: None,
@@ -429,6 +587,17 @@ impl TcpListenerManager {
Some(rm) => rm,
None => {
debug!("No route matched for port {} domain {:?}", port, domain);
if is_http {
// Send a proper HTTP error instead of dropping the connection
use tokio::io::AsyncWriteExt;
let body = "No route matched";
let resp = format!(
"HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(), body
);
let _ = stream.write_all(resp.as_bytes()).await;
let _ = stream.shutdown().await;
}
return Ok(());
}
};
@@ -449,6 +618,28 @@ impl TcpListenerManager {
// Track connection in metrics
metrics.connection_opened(route_id);
// Check if this is a socket-handler route that should be relayed to TypeScript
if route_match.route.action.action_type == RouteActionType::SocketHandler {
let relay_path = {
let guard = socket_handler_relay.read().unwrap();
guard.clone()
};
if let Some(relay_socket_path) = relay_path {
let result = Self::relay_to_socket_handler(
stream, n, port, peer_addr,
&route_match, domain.as_deref(), is_tls,
&relay_socket_path,
).await;
metrics.connection_closed(route_id);
return result;
} else {
debug!("Socket-handler route matched but no relay path configured");
metrics.connection_closed(route_id);
return Ok(());
}
}
let target = match route_match.target {
Some(t) => t,
None => {
@@ -654,6 +845,75 @@ impl TcpListenerManager {
result
}
/// Relay a connection to the TypeScript socket-handler via Unix domain socket.
///
/// Protocol:
/// 1. Connect to the Unix socket at `relay_path`
/// 2. Send a JSON metadata line (terminated by \n)
/// 3. Forward the initial peeked bytes
/// 4. Bidirectional relay between the TCP stream and Unix socket
async fn relay_to_socket_handler(
mut stream: tokio::net::TcpStream,
peek_len: usize,
port: u16,
peer_addr: std::net::SocketAddr,
route_match: &rustproxy_routing::RouteMatchResult<'_>,
domain: Option<&str>,
is_tls: bool,
relay_path: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
// Connect to the TypeScript socket handler server
let mut unix_stream = match UnixStream::connect(relay_path).await {
Ok(s) => s,
Err(e) => {
error!("Failed to connect to socket handler relay at {}: {}", relay_path, e);
return Err(e.into());
}
};
// Build metadata JSON
let route_key = route_match.route.name.as_deref()
.or(route_match.route.id.as_deref())
.unwrap_or("unknown");
let metadata = serde_json::json!({
"routeKey": route_key,
"remoteIP": peer_addr.ip().to_string(),
"remotePort": peer_addr.port(),
"localPort": port,
"isTLS": is_tls,
"domain": domain,
});
// Send metadata line (JSON + newline)
let mut metadata_line = serde_json::to_string(&metadata)?;
metadata_line.push('\n');
unix_stream.write_all(metadata_line.as_bytes()).await?;
// Read the initial peeked data from the TCP stream (peek doesn't consume)
let mut initial_buf = vec![0u8; peek_len];
stream.read_exact(&mut initial_buf).await?;
// Forward initial data to the Unix socket
unix_stream.write_all(&initial_buf).await?;
// Bidirectional relay between TCP client and Unix socket handler
match tokio::io::copy_bidirectional(&mut stream, &mut unix_stream).await {
Ok((c2s, s2c)) => {
debug!("Socket handler relay complete for {}: {} bytes in, {} bytes out",
route_key, c2s, s2c);
}
Err(e) => {
debug!("Socket handler relay ended for {}: {}", route_key, e);
}
}
Ok(())
}
/// Handle TLS terminate-and-reencrypt: accept TLS from client, connect TLS to backend.
async fn handle_tls_terminate_reencrypt(
stream: tokio::net::TcpStream,

View File

@@ -74,8 +74,8 @@ pub struct RustProxy {
nft_manager: Option<NftManager>,
started: bool,
started_at: Option<Instant>,
/// Path to a Unix domain socket for relaying socket-handler connections back to TypeScript.
socket_handler_relay_path: Option<String>,
/// Shared path to a Unix domain socket for relaying socket-handler connections back to TypeScript.
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
}
impl RustProxy {
@@ -111,7 +111,7 @@ impl RustProxy {
nft_manager: None,
started: false,
started_at: None,
socket_handler_relay_path: None,
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
})
}
@@ -259,6 +259,9 @@ impl RustProxy {
);
listener.set_connection_config(conn_config);
// Share the socket-handler relay path with the listener
listener.set_socket_handler_relay(Arc::clone(&self.socket_handler_relay));
// Extract TLS configurations from routes and cert manager
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
@@ -729,14 +732,16 @@ impl RustProxy {
}
/// Set the Unix domain socket path for relaying socket-handler connections to TypeScript.
/// The path is shared with the TcpListenerManager via Arc<RwLock>, so updates
/// take effect immediately for all new connections.
pub fn set_socket_handler_relay_path(&mut self, path: Option<String>) {
info!("Socket handler relay path set to: {:?}", path);
self.socket_handler_relay_path = path;
*self.socket_handler_relay.write().unwrap() = path;
}
/// Get the current socket handler relay path.
pub fn get_socket_handler_relay_path(&self) -> Option<&str> {
self.socket_handler_relay_path.as_deref()
pub fn get_socket_handler_relay_path(&self) -> Option<String> {
self.socket_handler_relay.read().unwrap().clone()
}
/// Load a certificate for a domain and hot-swap the TLS configuration.

View File

@@ -29,6 +29,11 @@ struct Cli {
#[tokio::main]
async fn main() -> Result<()> {
// Install the default CryptoProvider early, before any TLS or ACME code runs.
// This prevents panics from instant-acme/hyper-rustls calling ClientConfig::builder()
// before TLS listeners have started. Idempotent — later calls harmlessly return Err.
let _ = rustls::crypto::ring::default_provider().install_default();
let cli = Cli::parse();
// Initialize tracing - write to stderr so stdout is reserved for management IPC

View File

@@ -1,13 +1,37 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as plugins from '../ts/plugins.js';
import * as http from 'http';
import { SmartProxy, SocketHandlers } from '../ts/index.js';
/**
* Helper to make HTTP requests using Node's http module (unlike fetch/undici,
* http.request doesn't keep the event loop alive via a connection pool).
*/
function httpRequest(url: string, options: { method?: string; headers?: Record<string, string> } = {}): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: string }> {
return new Promise((resolve, reject) => {
const parsed = new URL(url);
const req = http.request({
hostname: parsed.hostname,
port: parsed.port,
path: parsed.pathname + parsed.search,
method: options.method || 'GET',
headers: options.headers,
}, (res) => {
let body = '';
res.on('data', (chunk: Buffer) => { body += chunk.toString(); });
res.on('end', () => resolve({ status: res.statusCode!, headers: res.headers, body }));
});
req.on('error', reject);
req.end();
});
}
tap.test('should handle HTTP requests on port 80 for ACME challenges', async (tools) => {
tools.timeout(10000);
// Track HTTP requests that are handled
const handledRequests: any[] = [];
const settings = {
routes: [
{
@@ -24,7 +48,7 @@ tap.test('should handle HTTP requests on port 80 for ACME challenges', async (to
method: req.method,
headers: req.headers
});
// Simulate ACME challenge response
const token = req.url?.split('/').pop() || '';
res.header('Content-Type', 'text/plain');
@@ -34,40 +58,31 @@ tap.test('should handle HTTP requests on port 80 for ACME challenges', async (to
}
]
};
const proxy = new SmartProxy(settings);
// Mock NFTables manager
(proxy as any).nftablesManager = {
ensureNFTablesSetup: async () => {},
stop: async () => {}
};
await proxy.start();
// Make an HTTP request to the challenge endpoint
const response = await fetch('http://localhost:18080/.well-known/acme-challenge/test-token', {
method: 'GET'
});
const response = await httpRequest('http://localhost:18080/.well-known/acme-challenge/test-token');
// Verify response
expect(response.status).toEqual(200);
const body = await response.text();
expect(body).toEqual('challenge-response-for-test-token');
expect(response.body).toEqual('challenge-response-for-test-token');
// Verify request was handled
expect(handledRequests.length).toEqual(1);
expect(handledRequests[0].path).toEqual('/.well-known/acme-challenge/test-token');
expect(handledRequests[0].method).toEqual('GET');
await proxy.stop();
});
tap.test('should parse HTTP headers correctly', async (tools) => {
tools.timeout(10000);
const capturedContext: any = {};
const settings = {
routes: [
{
@@ -92,36 +107,30 @@ tap.test('should parse HTTP headers correctly', async (tools) => {
}
]
};
const proxy = new SmartProxy(settings);
// Mock NFTables manager
(proxy as any).nftablesManager = {
ensureNFTablesSetup: async () => {},
stop: async () => {}
};
await proxy.start();
// Make request with custom headers
const response = await fetch('http://localhost:18081/test', {
const response = await httpRequest('http://localhost:18081/test', {
method: 'POST',
headers: {
'X-Custom-Header': 'test-value',
'User-Agent': 'test-agent'
}
});
expect(response.status).toEqual(200);
const body = await response.json();
const body = JSON.parse(response.body);
// Verify headers were parsed correctly
expect(capturedContext.headers['x-custom-header']).toEqual('test-value');
expect(capturedContext.headers['user-agent']).toEqual('test-agent');
expect(capturedContext.method).toEqual('POST');
expect(capturedContext.path).toEqual('/test');
await proxy.stop();
});
export default tap.start();
export default tap.start();

123
test/test.bun.ts Normal file
View File

@@ -0,0 +1,123 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import {
createHttpsTerminateRoute,
createCompleteHttpsServer,
createHttpRoute,
} from '../ts/proxies/smart-proxy/utils/route-helpers.js';
import {
mergeRouteConfigs,
cloneRoute,
routeMatchesPath,
} from '../ts/proxies/smart-proxy/utils/route-utils.js';
import {
validateRoutes,
validateRouteConfig,
} from '../ts/proxies/smart-proxy/utils/route-validator.js';
import type { IRouteConfig } from '../ts/proxies/smart-proxy/models/route-types.js';
tap.test('route creation - createHttpsTerminateRoute produces correct structure', async () => {
const route = createHttpsTerminateRoute('secure.example.com', { host: '127.0.0.1', port: 8443 });
expect(route).toHaveProperty('match');
expect(route).toHaveProperty('action');
expect(route.action.type).toEqual('forward');
expect(route.action.tls).toBeDefined();
expect(route.action.tls!.mode).toEqual('terminate');
expect(route.match.domains).toEqual('secure.example.com');
});
tap.test('route creation - createCompleteHttpsServer returns redirect and main route', async () => {
const routes = createCompleteHttpsServer('app.example.com', { host: '127.0.0.1', port: 3000 });
expect(routes).toBeArray();
expect(routes.length).toBeGreaterThanOrEqual(2);
// Should have an HTTP→HTTPS redirect and an HTTPS route
const hasRedirect = routes.some((r) => r.action.type === 'forward' && r.action.redirect !== undefined);
const hasHttps = routes.some((r) => r.action.tls?.mode === 'terminate');
expect(hasRedirect || hasHttps).toBeTrue();
});
tap.test('route validation - validateRoutes on a set of routes', async () => {
const routes: IRouteConfig[] = [
createHttpRoute('a.com', { host: '127.0.0.1', port: 3000 }),
createHttpRoute('b.com', { host: '127.0.0.1', port: 4000 }),
];
const result = validateRoutes(routes);
expect(result.valid).toBeTrue();
expect(result.errors).toHaveLength(0);
});
tap.test('route validation - validateRoutes catches invalid route in set', async () => {
const routes: any[] = [
createHttpRoute('valid.com', { host: '127.0.0.1', port: 3000 }),
{ match: { ports: 80 } }, // missing action
];
const result = validateRoutes(routes);
expect(result.valid).toBeFalse();
expect(result.errors.length).toBeGreaterThan(0);
});
tap.test('path matching - routeMatchesPath with exact path', async () => {
const route = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
route.match.path = '/api';
expect(routeMatchesPath(route, '/api')).toBeTrue();
expect(routeMatchesPath(route, '/other')).toBeFalse();
});
tap.test('path matching - route without path matches everything', async () => {
const route = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
// No path set, should match any path
expect(routeMatchesPath(route, '/anything')).toBeTrue();
expect(routeMatchesPath(route, '/')).toBeTrue();
});
tap.test('route merging - mergeRouteConfigs combines routes', async () => {
const base = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
base.priority = 10;
base.name = 'base-route';
const merged = mergeRouteConfigs(base, {
priority: 50,
name: 'merged-route',
});
expect(merged.priority).toEqual(50);
expect(merged.name).toEqual('merged-route');
// Original route fields should be preserved
expect(merged.match.domains).toEqual('example.com');
expect(merged.action.targets![0].host).toEqual('127.0.0.1');
});
tap.test('route merging - mergeRouteConfigs does not mutate original', async () => {
const base = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
base.name = 'original';
const merged = mergeRouteConfigs(base, { name: 'changed' });
expect(base.name).toEqual('original');
expect(merged.name).toEqual('changed');
});
tap.test('route cloning - cloneRoute produces independent copy', async () => {
const original = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
original.priority = 42;
original.name = 'original-route';
const cloned = cloneRoute(original);
// Should be equal in value
expect(cloned.match.domains).toEqual('example.com');
expect(cloned.priority).toEqual(42);
expect(cloned.name).toEqual('original-route');
expect(cloned.action.targets![0].host).toEqual('127.0.0.1');
expect(cloned.action.targets![0].port).toEqual(3000);
// Should be independent - modifying clone shouldn't affect original
cloned.name = 'cloned-route';
cloned.priority = 99;
expect(original.name).toEqual('original-route');
expect(original.priority).toEqual(42);
});
export default tap.start();

View File

@@ -84,17 +84,15 @@ tap.test('should forward TCP connections correctly', async () => {
socket.on('error', reject);
});
// Test data transmission
// Test data transmission - wait for welcome message first
await new Promise<void>((resolve) => {
client.on('data', (data) => {
client.once('data', (data) => {
const response = data.toString();
console.log('Received:', response);
expect(response).toContain('Connected to TCP test server');
client.end();
resolve();
});
client.write('Hello from client');
});
await smartProxy.stop();
@@ -146,15 +144,13 @@ tap.test('should handle TLS passthrough correctly', async () => {
// Test data transmission over TLS
await new Promise<void>((resolve) => {
client.on('data', (data) => {
client.once('data', (data) => {
const response = data.toString();
console.log('TLS Received:', response);
expect(response).toContain('Connected to TLS test server');
client.end();
resolve();
});
client.write('Hello from TLS client');
});
await smartProxy.stop();
@@ -222,15 +218,13 @@ tap.test('should handle SNI-based forwarding', async () => {
});
await new Promise<void>((resolve) => {
clientA.on('data', (data) => {
clientA.once('data', (data) => {
const response = data.toString();
console.log('Domain A response:', response);
expect(response).toContain('Connected to TLS test server');
clientA.end();
resolve();
});
clientA.write('Hello from domain A');
});
// Test domain B should also use TLS since it's on port 8443
@@ -251,7 +245,7 @@ tap.test('should handle SNI-based forwarding', async () => {
});
await new Promise<void>((resolve) => {
clientB.on('data', (data) => {
clientB.once('data', (data) => {
const response = data.toString();
console.log('Domain B response:', response);
// Should be forwarded to TLS server
@@ -259,8 +253,6 @@ tap.test('should handle SNI-based forwarding', async () => {
clientB.end();
resolve();
});
clientB.write('Hello from domain B');
});
await smartProxy.stop();

111
test/test.deno.ts Normal file
View File

@@ -0,0 +1,111 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import {
createHttpRoute,
createHttpsTerminateRoute,
createLoadBalancerRoute,
} from '../ts/proxies/smart-proxy/utils/route-helpers.js';
import {
findMatchingRoutes,
findBestMatchingRoute,
routeMatchesDomain,
routeMatchesPort,
routeMatchesPath,
} from '../ts/proxies/smart-proxy/utils/route-utils.js';
import {
validateRouteConfig,
isValidDomain,
isValidPort,
} from '../ts/proxies/smart-proxy/utils/route-validator.js';
import type { IRouteConfig } from '../ts/proxies/smart-proxy/models/route-types.js';
tap.test('route creation - createHttpRoute produces correct structure', async () => {
const route = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
expect(route).toHaveProperty('match');
expect(route).toHaveProperty('action');
expect(route.match.domains).toEqual('example.com');
expect(route.action.type).toEqual('forward');
expect(route.action.targets).toBeArray();
expect(route.action.targets![0].host).toEqual('127.0.0.1');
expect(route.action.targets![0].port).toEqual(3000);
});
tap.test('route creation - createHttpRoute with array of domains', async () => {
const route = createHttpRoute(['a.com', 'b.com'], { host: 'localhost', port: 8080 });
expect(route.match.domains).toEqual(['a.com', 'b.com']);
});
tap.test('route validation - validateRouteConfig accepts valid route', async () => {
const route = createHttpRoute('valid.example.com', { host: '10.0.0.1', port: 8080 });
const result = validateRouteConfig(route);
expect(result.valid).toBeTrue();
expect(result.errors).toHaveLength(0);
});
tap.test('route validation - validateRouteConfig rejects missing action', async () => {
const badRoute = { match: { ports: 80 } } as any;
const result = validateRouteConfig(badRoute);
expect(result.valid).toBeFalse();
expect(result.errors.length).toBeGreaterThan(0);
});
tap.test('route validation - isValidDomain checks correctly', async () => {
expect(isValidDomain('example.com')).toBeTrue();
expect(isValidDomain('*.example.com')).toBeTrue();
expect(isValidDomain('')).toBeFalse();
});
tap.test('route validation - isValidPort checks correctly', async () => {
expect(isValidPort(80)).toBeTrue();
expect(isValidPort(443)).toBeTrue();
expect(isValidPort(0)).toBeFalse();
expect(isValidPort(70000)).toBeFalse();
expect(isValidPort(-1)).toBeFalse();
});
tap.test('domain matching - exact domain', async () => {
const route = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
expect(routeMatchesDomain(route, 'example.com')).toBeTrue();
expect(routeMatchesDomain(route, 'other.com')).toBeFalse();
});
tap.test('domain matching - wildcard domain', async () => {
const route = createHttpRoute('*.example.com', { host: '127.0.0.1', port: 3000 });
expect(routeMatchesDomain(route, 'sub.example.com')).toBeTrue();
expect(routeMatchesDomain(route, 'example.com')).toBeFalse();
});
tap.test('port matching - single port', async () => {
const route = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
// createHttpRoute defaults to port 80
expect(routeMatchesPort(route, 80)).toBeTrue();
expect(routeMatchesPort(route, 443)).toBeFalse();
});
tap.test('route finding - findBestMatchingRoute selects by priority', async () => {
const lowPriority = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
lowPriority.priority = 10;
const highPriority = createHttpRoute('example.com', { host: '127.0.0.1', port: 4000 });
highPriority.priority = 100;
const routes: IRouteConfig[] = [lowPriority, highPriority];
const best = findBestMatchingRoute(routes, { domain: 'example.com', port: 80 });
expect(best).toBeDefined();
expect(best!.priority).toEqual(100);
expect(best!.action.targets![0].port).toEqual(4000);
});
tap.test('route finding - findMatchingRoutes returns all matches', async () => {
const route1 = createHttpRoute('example.com', { host: '127.0.0.1', port: 3000 });
const route2 = createHttpRoute('example.com', { host: '127.0.0.1', port: 4000 });
const route3 = createHttpRoute('other.com', { host: '127.0.0.1', port: 5000 });
const matches = findMatchingRoutes([route1, route2, route3], { domain: 'example.com', port: 80 });
expect(matches).toHaveLength(2);
});
export default tap.start();

View File

@@ -15,7 +15,7 @@ tap.test('should create echo server for testing', async () => {
socket.write(data); // Echo back the data
});
});
await new Promise<void>((resolve) => {
echoServer.listen(echoServerPort, () => {
console.log(`Echo server listening on port ${echoServerPort}`);
@@ -27,55 +27,48 @@ tap.test('should create echo server for testing', async () => {
tap.test('should create SmartProxy instance with new metrics', async () => {
smartProxyInstance = new SmartProxy({
routes: [{
id: 'test-route', // id is needed for per-route metrics tracking in Rust
name: 'test-route',
match: {
ports: [proxyPort],
domains: '*'
ports: [proxyPort]
// No domains — port-only route uses fast-path (no data peeking)
},
action: {
type: 'forward',
targets: [{
host: 'localhost',
port: echoServerPort
}],
tls: {
mode: 'passthrough'
}
}]
// No TLS — plain TCP forwarding
}
}],
defaults: {
target: {
host: 'localhost',
port: echoServerPort
}
},
metrics: {
enabled: true,
sampleIntervalMs: 100, // Sample every 100ms for faster testing
retentionSeconds: 60
}
});
await smartProxyInstance.start();
});
tap.test('should verify new metrics API structure', async () => {
const metrics = smartProxyInstance.getMetrics();
// Check API structure
expect(metrics).toHaveProperty('connections');
expect(metrics).toHaveProperty('throughput');
expect(metrics).toHaveProperty('requests');
expect(metrics).toHaveProperty('totals');
expect(metrics).toHaveProperty('percentiles');
// Check connections methods
expect(metrics.connections).toHaveProperty('active');
expect(metrics.connections).toHaveProperty('total');
expect(metrics.connections).toHaveProperty('byRoute');
expect(metrics.connections).toHaveProperty('byIP');
expect(metrics.connections).toHaveProperty('topIPs');
// Check throughput methods
expect(metrics.throughput).toHaveProperty('instant');
expect(metrics.throughput).toHaveProperty('recent');
@@ -86,86 +79,103 @@ tap.test('should verify new metrics API structure', async () => {
expect(metrics.throughput).toHaveProperty('byIP');
});
tap.test('should track throughput correctly', async (tools) => {
tap.test('should track active connections', async (tools) => {
const metrics = smartProxyInstance.getMetrics();
// Initial state - no connections yet
// Initial state - no connections
expect(metrics.connections.active()).toEqual(0);
expect(metrics.throughput.instant()).toEqual({ in: 0, out: 0 });
// Create a test connection
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
client.connect(proxyPort, 'localhost', () => {
console.log('Connected to proxy');
resolve();
});
client.on('error', reject);
});
// Send some data
// Send some data and wait for echo
const testData = Buffer.from('Hello, World!'.repeat(100)); // ~1.3KB
await new Promise<void>((resolve) => {
client.write(testData, () => {
console.log('Data sent');
resolve();
});
client.write(testData, () => resolve());
});
// Wait for echo response
await new Promise<void>((resolve) => {
client.once('data', (data) => {
console.log(`Received ${data.length} bytes back`);
resolve();
});
});
// Wait for metrics to be sampled
await tools.delayFor(200);
// Check metrics
// Wait for metrics to be polled
await tools.delayFor(500);
// Active connection count should be 1
expect(metrics.connections.active()).toEqual(1);
expect(metrics.requests.total()).toBeGreaterThan(0);
// Check throughput - should show bytes transferred
const instant = metrics.throughput.instant();
console.log('Instant throughput:', instant);
// Should have recorded some throughput
expect(instant.in).toBeGreaterThan(0);
expect(instant.out).toBeGreaterThan(0);
// Check totals
expect(metrics.totals.bytesIn()).toBeGreaterThan(0);
expect(metrics.totals.bytesOut()).toBeGreaterThan(0);
// Clean up
// Total connections should be tracked
expect(metrics.connections.total()).toBeGreaterThan(0);
// Per-route tracking should show the connection
const byRoute = metrics.connections.byRoute();
console.log('Connections by route:', Array.from(byRoute.entries()));
expect(byRoute.get('test-route')).toEqual(1);
// Clean up - close the connection
client.destroy();
// Wait for connection cleanup with retry
for (let i = 0; i < 10; i++) {
// Wait for connection cleanup
for (let i = 0; i < 20; i++) {
await tools.delayFor(100);
if (metrics.connections.active() === 0) break;
}
// Verify connection was cleaned up
expect(metrics.connections.active()).toEqual(0);
});
tap.test('should track multiple connections and routes', async (tools) => {
tap.test('should track bytes after connection closes', async (tools) => {
const metrics = smartProxyInstance.getMetrics();
// Ensure we start with 0 connections
const initialActive = metrics.connections.active();
if (initialActive > 0) {
console.log(`Warning: Starting with ${initialActive} active connections, waiting for cleanup...`);
for (let i = 0; i < 10; i++) {
await tools.delayFor(100);
if (metrics.connections.active() === 0) break;
}
// Create a connection, send data, then close it
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
client.connect(proxyPort, 'localhost', () => resolve());
client.on('error', reject);
});
// Send some data
const testData = Buffer.from('Hello, World!'.repeat(100)); // ~1.3KB
await new Promise<void>((resolve) => {
client.write(testData, () => resolve());
});
// Wait for echo
await new Promise<void>((resolve) => {
client.once('data', () => resolve());
});
// Close the connection — Rust records bytes on connection close
client.destroy();
// Wait for connection to fully close and metrics to poll
for (let i = 0; i < 20; i++) {
await tools.delayFor(100);
if (metrics.connections.active() === 0 && metrics.totals.bytesIn() > 0) break;
}
// Now bytes should be recorded
console.log('Total bytes in:', metrics.totals.bytesIn());
console.log('Total bytes out:', metrics.totals.bytesOut());
expect(metrics.totals.bytesIn()).toBeGreaterThan(0);
expect(metrics.totals.bytesOut()).toBeGreaterThan(0);
});
tap.test('should track multiple connections', async (tools) => {
const metrics = smartProxyInstance.getMetrics();
// Ensure we start with 0 active connections
for (let i = 0; i < 20; i++) {
await tools.delayFor(100);
if (metrics.connections.active() === 0) break;
}
// Create multiple connections
@@ -174,100 +184,79 @@ tap.test('should track multiple connections and routes', async (tools) => {
for (let i = 0; i < connectionCount; i++) {
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
client.connect(proxyPort, 'localhost', () => {
resolve();
});
client.connect(proxyPort, 'localhost', () => resolve());
client.on('error', reject);
});
clients.push(client);
}
// Allow connections to be fully established and tracked
await tools.delayFor(100);
// Allow connections to be fully established and metrics polled
await tools.delayFor(500);
// Verify active connections
console.log('Active connections:', metrics.connections.active());
expect(metrics.connections.active()).toEqual(connectionCount);
// Send data on each connection
const dataPromises = clients.map((client, index) => {
return new Promise<void>((resolve) => {
const data = Buffer.from(`Connection ${index}: `.repeat(50));
client.write(data, () => {
client.once('data', () => resolve());
});
});
});
await Promise.all(dataPromises);
await tools.delayFor(200);
// Check metrics by route
// Per-route should track all connections
const routeConnections = metrics.connections.byRoute();
console.log('Connections by route:', Array.from(routeConnections.entries()));
expect(routeConnections.get('test-route')).toEqual(connectionCount);
// Check top IPs
const topIPs = metrics.connections.topIPs(5);
console.log('Top IPs:', topIPs);
expect(topIPs.length).toBeGreaterThan(0);
expect(topIPs[0].count).toEqual(connectionCount);
// Clean up all connections
clients.forEach(client => client.destroy());
await tools.delayFor(100);
for (let i = 0; i < 20; i++) {
await tools.delayFor(100);
if (metrics.connections.active() === 0) break;
}
expect(metrics.connections.active()).toEqual(0);
});
tap.test('should provide throughput history', async (tools) => {
tap.test('should provide throughput data', async (tools) => {
const metrics = smartProxyInstance.getMetrics();
// Create a connection and send data periodically
const client = new net.Socket();
await new Promise<void>((resolve, reject) => {
client.connect(proxyPort, 'localhost', () => resolve());
client.on('error', reject);
});
// Send data every 100ms for 1 second
for (let i = 0; i < 10; i++) {
const data = Buffer.from(`Packet ${i}: `.repeat(100));
client.write(data);
await tools.delayFor(100);
}
// Get throughput history
const history = metrics.throughput.history(2); // Last 2 seconds
console.log('Throughput history entries:', history.length);
console.log('Sample history entry:', history[0]);
expect(history.length).toBeGreaterThan(0);
expect(history[0]).toHaveProperty('timestamp');
expect(history[0]).toHaveProperty('in');
expect(history[0]).toHaveProperty('out');
// Verify different time windows show different rates
// Close connection so bytes are recorded
client.destroy();
// Wait for metrics to update
for (let i = 0; i < 20; i++) {
await tools.delayFor(100);
if (metrics.totals.bytesIn() > 0) break;
}
// Verify different time windows are available (all return same data from Rust for now)
const instant = metrics.throughput.instant();
const recent = metrics.throughput.recent();
const average = metrics.throughput.average();
console.log('Throughput windows:');
console.log(' Instant (1s):', instant);
console.log(' Recent (10s):', recent);
console.log(' Average (60s):', average);
// Clean up
client.destroy();
// Total bytes should have accumulated
expect(metrics.totals.bytesIn()).toBeGreaterThan(0);
expect(metrics.totals.bytesOut()).toBeGreaterThan(0);
});
tap.test('should clean up resources', async () => {
await smartProxyInstance.stop();
await new Promise<void>((resolve) => {
echoServer.close(() => {
console.log('Echo server closed');
@@ -276,4 +265,4 @@ tap.test('should clean up resources', async () => {
});
});
export default tap.start();
export default tap.start();

View File

@@ -200,10 +200,11 @@ tap.test('route security with block list should work', async () => {
client.connect(9993, '127.0.0.1');
});
// Should connect then be immediately closed by security
// Connection should be blocked by security - either closed or error
expect(events).toContain('connected');
expect(events).toContain('closed');
expect(result).toEqual('closed');
// Rust drops the stream immediately; client may see 'closed', 'error', or both
const wasBlocked = result === 'closed' || result === 'error';
expect(wasBlocked).toEqual(true);
expect(targetServerConnections).toEqual(0);
// Clean up

View File

@@ -13,49 +13,26 @@ tap.test('route security should be correctly configured', async () => {
targets: [{
host: '127.0.0.1',
port: 8991
}],
security: {
ipAllowList: ['192.168.1.1'],
ipBlockList: ['10.0.0.1']
}
}]
},
security: {
ipAllowList: ['192.168.1.1'],
ipBlockList: ['10.0.0.1']
}
}];
// This should not throw an error
const proxy = new smartproxy.SmartProxy({
enableDetailedLogging: false,
routes: routes
});
// The proxy should be created successfully
expect(proxy).toBeInstanceOf(smartproxy.SmartProxy);
// Test that security manager exists and has the isIPAuthorized method
const securityManager = (proxy as any).securityManager;
expect(securityManager).toBeDefined();
expect(typeof securityManager.isIPAuthorized).toEqual('function');
// Test IP authorization logic directly
const isLocalhostAllowed = securityManager.isIPAuthorized(
'127.0.0.1',
['192.168.1.1'], // Allow list
[] // Block list
);
expect(isLocalhostAllowed).toBeFalse();
const isAllowedIPAllowed = securityManager.isIPAuthorized(
'192.168.1.1',
['192.168.1.1'], // Allow list
[] // Block list
);
expect(isAllowedIPAllowed).toBeTrue();
const isBlockedIPAllowed = securityManager.isIPAuthorized(
'10.0.0.1',
['0.0.0.0/0'], // Allow all
['10.0.0.1'] // But block this specific IP
);
expect(isBlockedIPAllowed).toBeFalse();
// Verify route configuration was preserved
expect(proxy.settings.routes[0].security?.ipAllowList).toContain('192.168.1.1');
expect(proxy.settings.routes[0].security?.ipBlockList).toContain('10.0.0.1');
});
export default tap.start();
export default tap.start();

View File

@@ -94,7 +94,7 @@ tap.test('setup port proxy test environment', async () => {
tap.test('should start port proxy', async () => {
await smartProxy.start();
// Check if the proxy is listening by verifying the ports are active
expect(smartProxy.getListeningPorts().length).toBeGreaterThan(0);
expect((await smartProxy.getListeningPorts()).length).toBeGreaterThan(0);
});
// Test basic TCP forwarding.
@@ -237,7 +237,7 @@ tap.test('should handle connection timeouts', async () => {
tap.test('should stop port proxy', async () => {
await smartProxy.stop();
// Verify that there are no listening ports after stopping
expect(smartProxy.getListeningPorts().length).toEqual(0);
expect((await smartProxy.getListeningPorts()).length).toEqual(0);
// Remove from tracking
const index = allProxies.indexOf(smartProxy);

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '22.5.0',
version: '23.1.3',
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
}

View File

@@ -1,4 +1,4 @@
import * as net from 'net';
import * as net from 'node:net';
import { WrappedSocket } from './wrapped-socket.js';
/**

View File

@@ -1,7 +1,7 @@
import { LifecycleComponent } from './lifecycle-component.js';
import { BinaryHeap } from './binary-heap.js';
import { AsyncMutex } from './async-utils.js';
import { EventEmitter } from 'events';
import { EventEmitter } from 'node:events';
/**
* Interface for pooled connection

View File

@@ -3,7 +3,7 @@
* Provides standardized socket cleanup with proper listener and timer management
*/
import type { Socket } from 'net';
import type { Socket } from 'node:net';
export type SocketTracked = {
cleanup: () => void;

View File

@@ -2,9 +2,6 @@
* SmartProxy main module exports
*/
// NFTables proxy exports
export * from './proxies/nftables-proxy/index.js';
// Export SmartProxy elements
export { SmartProxy } from './proxies/smart-proxy/index.js';
export { SharedRouteManager as RouteManager } from './core/routing/route-manager.js';

View File

@@ -1,13 +1,13 @@
// node native scope
import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as http from 'http';
import * as https from 'https';
import * as net from 'net';
import * as path from 'path';
import * as tls from 'tls';
import * as url from 'url';
import * as http2 from 'http2';
import { EventEmitter } from 'node:events';
import * as fs from 'node:fs';
import * as http from 'node:http';
import * as https from 'node:https';
import * as net from 'node:net';
import * as path from 'node:path';
import * as tls from 'node:tls';
import * as url from 'node:url';
import * as http2 from 'node:http2';
export { EventEmitter, fs, http, https, net, path, tls, url, http2 };
@@ -31,6 +31,7 @@ import * as smartlog from '@push.rocks/smartlog';
import * as smartlogDestinationLocal from '@push.rocks/smartlog/destination-local';
import * as taskbuffer from '@push.rocks/taskbuffer';
import * as smartrx from '@push.rocks/smartrx';
import * as smartrust from '@push.rocks/smartrust';
export {
lik,
@@ -47,6 +48,7 @@ export {
smartlogDestinationLocal,
taskbuffer,
smartrx,
smartrust,
};
// third party scope

View File

@@ -5,7 +5,7 @@
* that may span multiple TCP packets.
*/
import { Buffer } from 'buffer';
import { Buffer } from 'node:buffer';
/**
* Fragment tracking information
@@ -49,6 +49,10 @@ export class FragmentHandler {
() => this.cleanup(),
options.cleanupInterval
);
// Don't let this timer prevent process exit
if (this.cleanupTimer.unref) {
this.cleanupTimer.unref();
}
}
}

View File

@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import { Buffer } from 'node:buffer';
import {
TlsRecordType,
TlsHandshakeType,

View File

@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import { Buffer } from 'node:buffer';
import { TlsExtensionType, TlsUtils } from '../utils/tls-utils.js';
import {
ClientHelloParser,

View File

@@ -2,7 +2,7 @@
* WebSocket Protocol Utilities
*/
import * as crypto from 'crypto';
import * as crypto from 'node:crypto';
import { WEBSOCKET_MAGIC_STRING } from './constants.js';
import type { RawData } from './types.js';

View File

@@ -8,6 +8,3 @@ export { SharedRouteManager as SmartProxyRouteManager } from '../core/routing/ro
export * from './smart-proxy/utils/index.js';
// Export smart-proxy models except IAcmeOptions
export type { ISmartProxyOptions, IConnectionRecord, IRouteConfig, IRouteMatch, IRouteAction, IRouteTls, IRouteContext } from './smart-proxy/models/index.js';
// Export NFTables proxy (no conflicts)
export * from './nftables-proxy/index.js';

View File

@@ -1,6 +0,0 @@
/**
* NfTablesProxy implementation
*/
export * from './nftables-proxy.js';
export * from './models/index.js';
export * from './utils/index.js';

View File

@@ -1,30 +0,0 @@
/**
* Custom error classes for better error handling
*/
export class NftBaseError extends Error {
constructor(message: string) {
super(message);
this.name = 'NftBaseError';
}
}
export class NftValidationError extends NftBaseError {
constructor(message: string) {
super(message);
this.name = 'NftValidationError';
}
}
export class NftExecutionError extends NftBaseError {
constructor(message: string) {
super(message);
this.name = 'NftExecutionError';
}
}
export class NftResourceError extends NftBaseError {
constructor(message: string) {
super(message);
this.name = 'NftResourceError';
}
}

View File

@@ -1,5 +0,0 @@
/**
* Export all models
*/
export * from './interfaces.js';
export * from './errors.js';

View File

@@ -1,94 +0,0 @@
/**
* Interfaces for NfTablesProxy
*/
/**
* Represents a port range for forwarding
*/
export interface PortRange {
from: number;
to: number;
}
// Legacy interface name for backward compatibility
export type IPortRange = PortRange;
/**
* Settings for NfTablesProxy.
*/
export interface NfTableProxyOptions {
// Basic settings
fromPort: number | PortRange | Array<number | PortRange>; // Support single port, port range, or multiple ports/ranges
toPort: number | PortRange | Array<number | PortRange>;
toHost?: string; // Target host for proxying; defaults to 'localhost'
// Advanced settings
preserveSourceIP?: boolean; // If true, the original source IP is preserved
deleteOnExit?: boolean; // If true, clean up rules before process exit
protocol?: 'tcp' | 'udp' | 'all'; // Protocol to forward, defaults to 'tcp'
enableLogging?: boolean; // Enable detailed logging
ipv6Support?: boolean; // Enable IPv6 support
logFormat?: 'plain' | 'json'; // Format for logs
// Source filtering
ipAllowList?: string[]; // If provided, only these IPs are allowed
ipBlockList?: string[]; // If provided, these IPs are blocked
useIPSets?: boolean; // Use nftables sets for efficient IP management
// Rule management
forceCleanSlate?: boolean; // Clear all NfTablesProxy rules before starting
tableName?: string; // Custom table name (defaults to 'portproxy')
// Connection management
maxRetries?: number; // Maximum number of retries for failed commands
retryDelayMs?: number; // Delay between retries in milliseconds
useAdvancedNAT?: boolean; // Use connection tracking for stateful NAT
// Quality of Service
qos?: {
enabled: boolean;
maxRate?: string; // e.g. "10mbps"
priority?: number; // 1 (highest) to 10 (lowest)
markConnections?: boolean; // Mark connections for easier management
};
// Integration with PortProxy/NetworkProxy
netProxyIntegration?: {
enabled: boolean;
redirectLocalhost?: boolean; // Redirect localhost traffic to NetworkProxy
sslTerminationPort?: number; // Port where NetworkProxy handles SSL termination
};
}
// Legacy interface name for backward compatibility
export type INfTableProxySettings = NfTableProxyOptions;
/**
* Interface for status reporting
*/
export interface NfTablesStatus {
active: boolean;
ruleCount: {
total: number;
added: number;
verified: number;
};
tablesConfigured: { family: string; tableName: string }[];
metrics: {
forwardedConnections?: number;
activeConnections?: number;
bytesForwarded?: {
sent: number;
received: number;
};
};
qosEnabled?: boolean;
ipSetsConfigured?: {
name: string;
elementCount: number;
type: string;
}[];
}
// Legacy interface name for backward compatibility
export type INfTablesStatus = NfTablesStatus;

File diff suppressed because it is too large Load Diff

View File

@@ -1,38 +0,0 @@
/**
* NFTables Proxy Utilities
*
* This module exports utility functions and classes for NFTables operations.
*/
// Command execution
export { NftCommandExecutor } from './nft-command-executor.js';
export type { INftLoggerFn, INftExecutorOptions } from './nft-command-executor.js';
// Port specification normalization
export {
normalizePortSpec,
validatePorts,
formatPortRange,
portSpecToNftExpr,
rangesOverlap,
mergeOverlappingRanges,
countPorts,
isPortInSpec
} from './nft-port-spec-normalizer.js';
// Rule validation
export {
isValidIP,
isValidIPv4,
isValidIPv6,
isValidHostname,
isValidTableName,
isValidRate,
validateIPs,
validateHost,
validateTableName,
validateQosSettings,
validateSettings,
isIPForFamily,
filterIPsByFamily
} from './nft-rule-validator.js';

View File

@@ -1,162 +0,0 @@
/**
* NFTables Command Executor
*
* Handles command execution with retry logic, temp file management,
* and error handling for nftables operations.
*/
import { exec, execSync } from 'child_process';
import { promisify } from 'util';
import { delay } from '../../../core/utils/async-utils.js';
import { AsyncFileSystem } from '../../../core/utils/fs-utils.js';
import { NftExecutionError } from '../models/index.js';
const execAsync = promisify(exec);
export interface INftLoggerFn {
(level: 'info' | 'warn' | 'error' | 'debug', message: string, data?: Record<string, any>): void;
}
export interface INftExecutorOptions {
maxRetries?: number;
retryDelayMs?: number;
tempFilePath?: string;
}
/**
* NFTables command executor with retry logic and temp file support
*/
export class NftCommandExecutor {
private static readonly NFT_CMD = 'nft';
private maxRetries: number;
private retryDelayMs: number;
private tempFilePath: string;
constructor(
private log: INftLoggerFn,
options: INftExecutorOptions = {}
) {
this.maxRetries = options.maxRetries || 3;
this.retryDelayMs = options.retryDelayMs || 1000;
this.tempFilePath = options.tempFilePath || `/tmp/nft-rules-${Date.now()}.nft`;
}
/**
* Execute a command with retry capability
*/
async executeWithRetry(command: string, maxRetries?: number, retryDelayMs?: number): Promise<string> {
const retries = maxRetries ?? this.maxRetries;
const delayMs = retryDelayMs ?? this.retryDelayMs;
let lastError: Error | undefined;
for (let i = 0; i < retries; i++) {
try {
const { stdout } = await execAsync(command);
return stdout;
} catch (err) {
lastError = err as Error;
this.log('warn', `Command failed (attempt ${i+1}/${retries}): ${command}`, { error: lastError.message });
// Wait before retry, unless it's the last attempt
if (i < retries - 1) {
await delay(delayMs);
}
}
}
throw new NftExecutionError(`Failed after ${retries} attempts: ${lastError?.message || 'Unknown error'}`);
}
/**
* Execute system command synchronously (single attempt, no retry)
* Used only for exit handlers where the process is terminating anyway.
*/
executeSync(command: string): string {
try {
return execSync(command, { timeout: 5000 }).toString();
} catch (err) {
this.log('warn', `Sync command failed: ${command}`, { error: (err as Error).message });
throw err;
}
}
/**
* Execute nftables commands with a temporary file
*/
async executeWithTempFile(rulesetContent: string): Promise<void> {
await AsyncFileSystem.writeFile(this.tempFilePath, rulesetContent);
try {
await this.executeWithRetry(
`${NftCommandExecutor.NFT_CMD} -f ${this.tempFilePath}`,
this.maxRetries,
this.retryDelayMs
);
} finally {
// Always clean up the temp file
await AsyncFileSystem.remove(this.tempFilePath);
}
}
/**
* Check if nftables is available
*/
async checkAvailability(): Promise<boolean> {
try {
await this.executeWithRetry(`${NftCommandExecutor.NFT_CMD} --version`, this.maxRetries, this.retryDelayMs);
return true;
} catch (err) {
this.log('error', `nftables is not available: ${(err as Error).message}`);
return false;
}
}
/**
* Check if connection tracking modules are loaded
*/
async checkConntrackModules(): Promise<boolean> {
try {
await this.executeWithRetry('lsmod | grep nf_conntrack', this.maxRetries, this.retryDelayMs);
return true;
} catch (err) {
this.log('warn', 'Connection tracking modules might not be loaded, advanced NAT features may not work');
return false;
}
}
/**
* Run an nft command directly
*/
async nft(args: string): Promise<string> {
return this.executeWithRetry(`${NftCommandExecutor.NFT_CMD} ${args}`, this.maxRetries, this.retryDelayMs);
}
/**
* Run an nft command synchronously (for cleanup on exit)
*/
nftSync(args: string): string {
return this.executeSync(`${NftCommandExecutor.NFT_CMD} ${args}`);
}
/**
* Get the NFT command path
*/
static get nftCmd(): string {
return NftCommandExecutor.NFT_CMD;
}
/**
* Update the temp file path
*/
setTempFilePath(path: string): void {
this.tempFilePath = path;
}
/**
* Update retry settings
*/
setRetryOptions(maxRetries: number, retryDelayMs: number): void {
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
}

View File

@@ -1,125 +0,0 @@
/**
* NFTables Port Specification Normalizer
*
* Handles normalization and validation of port specifications
* for nftables rules.
*/
import type { PortRange } from '../models/index.js';
import { NftValidationError } from '../models/index.js';
/**
* Normalizes port specifications into an array of port ranges
*/
export function normalizePortSpec(portSpec: number | PortRange | Array<number | PortRange>): PortRange[] {
const result: PortRange[] = [];
if (Array.isArray(portSpec)) {
// If it's an array, process each element
for (const spec of portSpec) {
result.push(...normalizePortSpec(spec));
}
} else if (typeof portSpec === 'number') {
// Single port becomes a range with the same start and end
result.push({ from: portSpec, to: portSpec });
} else {
// Already a range
result.push(portSpec);
}
return result;
}
/**
* Validates port numbers or ranges
*/
export function validatePorts(port: number | PortRange | Array<number | PortRange>): void {
if (Array.isArray(port)) {
port.forEach(p => validatePorts(p));
return;
}
if (typeof port === 'number') {
if (port < 1 || port > 65535) {
throw new NftValidationError(`Invalid port number: ${port}`);
}
} else if (typeof port === 'object') {
if (port.from < 1 || port.from > 65535 || port.to < 1 || port.to > 65535 || port.from > port.to) {
throw new NftValidationError(`Invalid port range: ${port.from}-${port.to}`);
}
}
}
/**
* Format port range for nftables rule
*/
export function formatPortRange(range: PortRange): string {
if (range.from === range.to) {
return String(range.from);
}
return `${range.from}-${range.to}`;
}
/**
* Convert port spec to nftables expression
*/
export function portSpecToNftExpr(portSpec: number | PortRange | Array<number | PortRange>): string {
const ranges = normalizePortSpec(portSpec);
if (ranges.length === 1) {
return formatPortRange(ranges[0]);
}
// Multiple ports/ranges need to use a set
const ports = ranges.map(formatPortRange);
return `{ ${ports.join(', ')} }`;
}
/**
* Check if two port ranges overlap
*/
export function rangesOverlap(range1: PortRange, range2: PortRange): boolean {
return range1.from <= range2.to && range2.from <= range1.to;
}
/**
* Merge overlapping port ranges
*/
export function mergeOverlappingRanges(ranges: PortRange[]): PortRange[] {
if (ranges.length <= 1) return ranges;
// Sort by start port
const sorted = [...ranges].sort((a, b) => a.from - b.from);
const merged: PortRange[] = [sorted[0]];
for (let i = 1; i < sorted.length; i++) {
const current = sorted[i];
const lastMerged = merged[merged.length - 1];
if (current.from <= lastMerged.to + 1) {
// Ranges overlap or are adjacent, merge them
lastMerged.to = Math.max(lastMerged.to, current.to);
} else {
// No overlap, add as new range
merged.push(current);
}
}
return merged;
}
/**
* Calculate the total number of ports in a port specification
*/
export function countPorts(portSpec: number | PortRange | Array<number | PortRange>): number {
const ranges = normalizePortSpec(portSpec);
return ranges.reduce((total, range) => total + (range.to - range.from + 1), 0);
}
/**
* Check if a port is within the given specification
*/
export function isPortInSpec(port: number, portSpec: number | PortRange | Array<number | PortRange>): boolean {
const ranges = normalizePortSpec(portSpec);
return ranges.some(range => port >= range.from && port <= range.to);
}

View File

@@ -1,156 +0,0 @@
/**
* NFTables Rule Validator
*
* Handles validation of settings and inputs for nftables operations.
* Prevents command injection and ensures valid values.
*/
import type { PortRange, NfTableProxyOptions } from '../models/index.js';
import { NftValidationError } from '../models/index.js';
import { validatePorts } from './nft-port-spec-normalizer.js';
// IP address validation patterns
const IPV4_REGEX = /^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))?$/;
const IPV6_REGEX = /^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))(\/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8]))?$/;
const HOSTNAME_REGEX = /^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$/;
const TABLE_NAME_REGEX = /^[a-zA-Z0-9_]+$/;
const RATE_REGEX = /^[0-9]+[kKmMgG]?bps$/;
/**
* Validates an IP address (IPv4 or IPv6)
*/
export function isValidIP(ip: string): boolean {
return IPV4_REGEX.test(ip) || IPV6_REGEX.test(ip);
}
/**
* Validates an IPv4 address
*/
export function isValidIPv4(ip: string): boolean {
return IPV4_REGEX.test(ip);
}
/**
* Validates an IPv6 address
*/
export function isValidIPv6(ip: string): boolean {
return IPV6_REGEX.test(ip);
}
/**
* Validates a hostname
*/
export function isValidHostname(hostname: string): boolean {
return HOSTNAME_REGEX.test(hostname);
}
/**
* Validates a table name for nftables
*/
export function isValidTableName(tableName: string): boolean {
return TABLE_NAME_REGEX.test(tableName);
}
/**
* Validates a rate specification (e.g., "10mbps")
*/
export function isValidRate(rate: string): boolean {
return RATE_REGEX.test(rate);
}
/**
* Validates an array of IP addresses
*/
export function validateIPs(ips?: string[]): void {
if (!ips) return;
for (const ip of ips) {
if (!isValidIP(ip)) {
throw new NftValidationError(`Invalid IP address format: ${ip}`);
}
}
}
/**
* Validates a host (can be hostname or IP)
*/
export function validateHost(host?: string): void {
if (!host) return;
if (!isValidHostname(host) && !isValidIP(host)) {
throw new NftValidationError(`Invalid host format: ${host}`);
}
}
/**
* Validates a table name
*/
export function validateTableName(tableName?: string): void {
if (!tableName) return;
if (!isValidTableName(tableName)) {
throw new NftValidationError(
`Invalid table name: ${tableName}. Only alphanumeric characters and underscores are allowed.`
);
}
}
/**
* Validates QoS settings
*/
export function validateQosSettings(qos?: NfTableProxyOptions['qos']): void {
if (!qos?.enabled) return;
if (qos.maxRate && !isValidRate(qos.maxRate)) {
throw new NftValidationError(
`Invalid rate format: ${qos.maxRate}. Use format like "10mbps", "1gbps", etc.`
);
}
if (qos.priority !== undefined) {
if (qos.priority < 1 || qos.priority > 10 || !Number.isInteger(qos.priority)) {
throw new NftValidationError(
`Invalid priority: ${qos.priority}. Must be an integer between 1 and 10.`
);
}
}
}
/**
* Validates all NfTablesProxy settings
*/
export function validateSettings(settings: NfTableProxyOptions): void {
// Validate port numbers
validatePorts(settings.fromPort);
validatePorts(settings.toPort);
// Validate IP addresses
validateIPs(settings.ipAllowList);
validateIPs(settings.ipBlockList);
// Validate target host
validateHost(settings.toHost);
// Validate table name
validateTableName(settings.tableName);
// Validate QoS settings
validateQosSettings(settings.qos);
}
/**
* Check if an IP matches the given family (ip or ip6)
*/
export function isIPForFamily(ip: string, family: 'ip' | 'ip6'): boolean {
if (family === 'ip6') {
return ip.includes(':');
}
return ip.includes('.');
}
/**
* Filter IPs by family
*/
export function filterIPsByFamily(ips: string[], family: 'ip' | 'ip6'): string[] {
return ips.filter(ip => isIPForFamily(ip, family));
}

View File

@@ -1,6 +1,4 @@
import * as plugins from '../../../plugins.js';
// Certificate types removed - use local definition
import type { PortRange } from '../../../proxies/nftables-proxy/models/interfaces.js';
import type { IRouteContext } from '../../../core/models/route-context.js';
// Re-export IRouteContext for convenience

View File

@@ -1,112 +0,0 @@
import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
/**
* Locates the RustProxy binary using a priority-ordered search strategy:
* 1. SMARTPROXY_RUST_BINARY environment variable
* 2. Platform-specific optional npm package
* 3. Local development build at ./rust/target/release/rustproxy
* 4. System PATH
*/
export class RustBinaryLocator {
private cachedPath: string | null = null;
/**
* Find the RustProxy binary path.
* Returns null if no binary is available.
*/
public async findBinary(): Promise<string | null> {
if (this.cachedPath !== null) {
return this.cachedPath;
}
const path = await this.searchBinary();
this.cachedPath = path;
return path;
}
/**
* Clear the cached binary path (e.g., after a failed launch).
*/
public clearCache(): void {
this.cachedPath = null;
}
private async searchBinary(): Promise<string | null> {
// 1. Environment variable override
const envPath = process.env.SMARTPROXY_RUST_BINARY;
if (envPath) {
if (await this.isExecutable(envPath)) {
logger.log('info', `RustProxy binary found via SMARTPROXY_RUST_BINARY: ${envPath}`, { component: 'rust-locator' });
return envPath;
}
logger.log('warn', `SMARTPROXY_RUST_BINARY set but not executable: ${envPath}`, { component: 'rust-locator' });
}
// 2. Platform-specific optional npm package
const platformBinary = await this.findPlatformPackageBinary();
if (platformBinary) {
logger.log('info', `RustProxy binary found in platform package: ${platformBinary}`, { component: 'rust-locator' });
return platformBinary;
}
// 3. Local development build
const localPaths = [
plugins.path.resolve(process.cwd(), 'rust/target/release/rustproxy'),
plugins.path.resolve(process.cwd(), 'rust/target/debug/rustproxy'),
];
for (const localPath of localPaths) {
if (await this.isExecutable(localPath)) {
logger.log('info', `RustProxy binary found at local path: ${localPath}`, { component: 'rust-locator' });
return localPath;
}
}
// 4. System PATH
const systemPath = await this.findInPath('rustproxy');
if (systemPath) {
logger.log('info', `RustProxy binary found in system PATH: ${systemPath}`, { component: 'rust-locator' });
return systemPath;
}
logger.log('error', 'No RustProxy binary found. Set SMARTPROXY_RUST_BINARY, install the platform package, or build with: cd rust && cargo build --release', { component: 'rust-locator' });
return null;
}
private async findPlatformPackageBinary(): Promise<string | null> {
const platform = process.platform;
const arch = process.arch;
const packageName = `@push.rocks/smartproxy-${platform}-${arch}`;
try {
// Try to resolve the platform-specific package
const packagePath = require.resolve(`${packageName}/rustproxy`);
if (await this.isExecutable(packagePath)) {
return packagePath;
}
} catch {
// Package not installed - expected for development
}
return null;
}
private async isExecutable(filePath: string): Promise<boolean> {
try {
await plugins.fs.promises.access(filePath, plugins.fs.constants.X_OK);
return true;
} catch {
return false;
}
}
private async findInPath(binaryName: string): Promise<string | null> {
const pathDirs = (process.env.PATH || '').split(plugins.path.delimiter);
for (const dir of pathDirs) {
const fullPath = plugins.path.join(dir, binaryName);
if (await this.isExecutable(fullPath)) {
return fullPath;
}
}
return null;
}
}

View File

@@ -6,7 +6,11 @@ import type { RustProxyBridge } from './rust-proxy-bridge.js';
*
* Polls the Rust binary periodically via the bridge and caches the result.
* All IMetrics getters read from the cache synchronously.
* Fields not yet in Rust (percentiles, per-IP, history) return zero/empty.
*
* Rust Metrics JSON fields (camelCase via serde):
* activeConnections, totalConnections, bytesIn, bytesOut,
* throughputInBytesPerSec, throughputOutBytesPerSec,
* routes: { [routeName]: { activeConnections, totalConnections, bytesIn, bytesOut, ... } }
*/
export class RustMetricsAdapter implements IMetrics {
private bridge: RustProxyBridge;
@@ -14,30 +18,28 @@ export class RustMetricsAdapter implements IMetrics {
private pollTimer: ReturnType<typeof setInterval> | null = null;
private pollIntervalMs: number;
// Cumulative totals tracked across polls
private cumulativeBytesIn = 0;
private cumulativeBytesOut = 0;
private cumulativeConnections = 0;
constructor(bridge: RustProxyBridge, pollIntervalMs = 1000) {
this.bridge = bridge;
this.pollIntervalMs = pollIntervalMs;
}
/**
* Poll Rust for metrics once. Can be awaited to ensure cache is fresh.
*/
public async poll(): Promise<void> {
try {
this.cache = await this.bridge.getMetrics();
} catch {
// Ignore poll errors (bridge may be shutting down)
}
}
public startPolling(): void {
if (this.pollTimer) return;
this.pollTimer = setInterval(async () => {
try {
this.cache = await this.bridge.getMetrics();
// Update cumulative totals
if (this.cache) {
this.cumulativeBytesIn = this.cache.totalBytesIn ?? this.cache.total_bytes_in ?? 0;
this.cumulativeBytesOut = this.cache.totalBytesOut ?? this.cache.total_bytes_out ?? 0;
this.cumulativeConnections = this.cache.totalConnections ?? this.cache.total_connections ?? 0;
}
} catch {
// Ignore poll errors (bridge may be shutting down)
}
// Immediate first poll so cache is populated ASAP
this.poll();
this.pollTimer = setInterval(() => {
this.poll();
}, this.pollIntervalMs);
if (this.pollTimer.unref) {
this.pollTimer.unref();
@@ -55,25 +57,36 @@ export class RustMetricsAdapter implements IMetrics {
public connections = {
active: (): number => {
return this.cache?.activeConnections ?? this.cache?.active_connections ?? 0;
return this.cache?.activeConnections ?? 0;
},
total: (): number => {
return this.cumulativeConnections;
return this.cache?.totalConnections ?? 0;
},
byRoute: (): Map<string, number> => {
return new Map();
const result = new Map<string, number>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
result.set(name, (rm as any).activeConnections ?? 0);
}
}
return result;
},
byIP: (): Map<string, number> => {
// Per-IP tracking not yet available from Rust
return new Map();
},
topIPs: (_limit?: number): Array<{ ip: string; count: number }> => {
// Per-IP tracking not yet available from Rust
return [];
},
};
public throughput = {
instant: (): IThroughputData => {
return { in: this.cache?.bytesInPerSecond ?? 0, out: this.cache?.bytesOutPerSecond ?? 0 };
return {
in: this.cache?.throughputInBytesPerSec ?? 0,
out: this.cache?.throughputOutBytesPerSec ?? 0,
};
},
recent: (): IThroughputData => {
return this.throughput.instant();
@@ -85,10 +98,20 @@ export class RustMetricsAdapter implements IMetrics {
return this.throughput.instant();
},
history: (_seconds: number): Array<IThroughputHistoryPoint> => {
// Throughput history not yet available from Rust
return [];
},
byRoute: (_windowSeconds?: number): Map<string, IThroughputData> => {
return new Map();
const result = new Map<string, IThroughputData>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
result.set(name, {
in: (rm as any).throughputInBytesPerSec ?? 0,
out: (rm as any).throughputOutBytesPerSec ?? 0,
});
}
}
return result;
},
byIP: (_windowSeconds?: number): Map<string, IThroughputData> => {
return new Map();
@@ -97,25 +120,27 @@ export class RustMetricsAdapter implements IMetrics {
public requests = {
perSecond: (): number => {
return this.cache?.requestsPerSecond ?? 0;
// Rust tracks connections, not HTTP requests (TCP-level proxy)
return 0;
},
perMinute: (): number => {
return (this.cache?.requestsPerSecond ?? 0) * 60;
return 0;
},
total: (): number => {
return this.cache?.totalRequests ?? this.cache?.total_requests ?? 0;
// Use total connections as a proxy for total requests
return this.cache?.totalConnections ?? 0;
},
};
public totals = {
bytesIn: (): number => {
return this.cumulativeBytesIn;
return this.cache?.bytesIn ?? 0;
},
bytesOut: (): number => {
return this.cumulativeBytesOut;
return this.cache?.bytesOut ?? 0;
},
connections: (): number => {
return this.cumulativeConnections;
return this.cache?.totalConnections ?? 0;
},
};

View File

@@ -1,278 +1,180 @@
import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
import { RustBinaryLocator } from './rust-binary-locator.js';
import type { IRouteConfig } from './models/route-types.js';
import { ChildProcess, spawn } from 'child_process';
import { createInterface, Interface as ReadlineInterface } from 'readline';
/**
* Management request sent to the Rust binary via stdin.
* Type-safe command definitions for the Rust proxy IPC protocol.
*/
interface IManagementRequest {
id: string;
method: string;
params: Record<string, any>;
type TSmartProxyCommands = {
start: { params: { config: any }; result: void };
stop: { params: Record<string, never>; result: void };
updateRoutes: { params: { routes: IRouteConfig[] }; result: void };
getMetrics: { params: Record<string, never>; result: any };
getStatistics: { params: Record<string, never>; result: any };
provisionCertificate: { params: { routeName: string }; result: void };
renewCertificate: { params: { routeName: string }; result: void };
getCertificateStatus: { params: { routeName: string }; result: any };
getListeningPorts: { params: Record<string, never>; result: { ports: number[] } };
getNftablesStatus: { params: Record<string, never>; result: any };
setSocketHandlerRelay: { params: { socketPath: string }; result: void };
addListeningPort: { params: { port: number }; result: void };
removeListeningPort: { params: { port: number }; result: void };
loadCertificate: { params: { domain: string; cert: string; key: string; ca?: string }; result: void };
};
/**
* Get the package root directory using import.meta.url.
* This file is at ts/proxies/smart-proxy/, so package root is 3 levels up.
*/
function getPackageRoot(): string {
const thisDir = plugins.path.dirname(plugins.url.fileURLToPath(import.meta.url));
return plugins.path.resolve(thisDir, '..', '..', '..');
}
/**
* Management response received from the Rust binary via stdout.
* Map Node.js process.platform/process.arch to tsrust's friendly name suffix.
* tsrust names cross-compiled binaries as: rustproxy_linux_amd64, rustproxy_linux_arm64, etc.
*/
interface IManagementResponse {
id: string;
success: boolean;
result?: any;
error?: string;
function getTsrustPlatformSuffix(): string | null {
const archMap: Record<string, string> = { x64: 'amd64', arm64: 'arm64' };
const osMap: Record<string, string> = { linux: 'linux', darwin: 'macos' };
const os = osMap[process.platform];
const arch = archMap[process.arch];
if (os && arch) {
return `${os}_${arch}`;
}
return null;
}
/**
* Management event received from the Rust binary (unsolicited).
* Build local search paths for the Rust binary, including dist_rust/ candidates
* (built by tsrust) and local development build paths.
*/
interface IManagementEvent {
event: string;
data: any;
function buildLocalPaths(): string[] {
const packageRoot = getPackageRoot();
const suffix = getTsrustPlatformSuffix();
const paths: string[] = [];
// dist_rust/ candidates (tsrust cross-compiled output)
if (suffix) {
paths.push(plugins.path.join(packageRoot, 'dist_rust', `rustproxy_${suffix}`));
}
paths.push(plugins.path.join(packageRoot, 'dist_rust', 'rustproxy'));
// Local dev build paths
paths.push(plugins.path.resolve(process.cwd(), 'rust', 'target', 'release', 'rustproxy'));
paths.push(plugins.path.resolve(process.cwd(), 'rust', 'target', 'debug', 'rustproxy'));
return paths;
}
/**
* Bridge between TypeScript SmartProxy and the Rust binary.
* Communicates via JSON-over-stdin/stdout IPC protocol.
* Wraps @push.rocks/smartrust's RustBridge with type-safe command definitions.
*/
export class RustProxyBridge extends plugins.EventEmitter {
private locator = new RustBinaryLocator();
private process: ChildProcess | null = null;
private readline: ReadlineInterface | null = null;
private pendingRequests = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
timer: NodeJS.Timeout;
}>();
private requestCounter = 0;
private isRunning = false;
private binaryPath: string | null = null;
private readonly requestTimeoutMs = 30000;
private bridge: plugins.smartrust.RustBridge<TSmartProxyCommands>;
constructor() {
super();
this.bridge = new plugins.smartrust.RustBridge<TSmartProxyCommands>({
binaryName: 'rustproxy',
envVarName: 'SMARTPROXY_RUST_BINARY',
platformPackagePrefix: '@push.rocks/smartproxy',
localPaths: buildLocalPaths(),
maxPayloadSize: 100 * 1024 * 1024, // 100 MB route configs with many entries can be large
logger: {
log: (level: string, message: string, data?: Record<string, any>) => {
logger.log(level as any, message, data);
},
},
});
// Forward events from the inner bridge
this.bridge.on('exit', (code: number | null, signal: string | null) => {
this.emit('exit', code, signal);
});
}
/**
* Spawn the Rust binary in management mode.
* Returns true if the binary was found and spawned successfully.
*/
public async spawn(): Promise<boolean> {
this.binaryPath = await this.locator.findBinary();
if (!this.binaryPath) {
return false;
}
return new Promise<boolean>((resolve) => {
try {
this.process = spawn(this.binaryPath!, ['--management'], {
stdio: ['pipe', 'pipe', 'pipe'],
env: { ...process.env },
});
// Handle stderr (logging from Rust goes here)
this.process.stderr?.on('data', (data: Buffer) => {
const lines = data.toString().split('\n').filter(l => l.trim());
for (const line of lines) {
logger.log('debug', `[rustproxy] ${line}`, { component: 'rust-bridge' });
}
});
// Handle stdout (JSON IPC)
this.readline = createInterface({ input: this.process.stdout! });
this.readline.on('line', (line: string) => {
this.handleLine(line.trim());
});
// Handle process exit
this.process.on('exit', (code, signal) => {
logger.log('info', `RustProxy process exited (code=${code}, signal=${signal})`, { component: 'rust-bridge' });
this.cleanup();
this.emit('exit', code, signal);
});
this.process.on('error', (err) => {
logger.log('error', `RustProxy process error: ${err.message}`, { component: 'rust-bridge' });
this.cleanup();
resolve(false);
});
// Wait for the 'ready' event from Rust
const readyTimeout = setTimeout(() => {
logger.log('error', 'RustProxy did not send ready event within 10s', { component: 'rust-bridge' });
this.kill();
resolve(false);
}, 10000);
this.once('management:ready', () => {
clearTimeout(readyTimeout);
this.isRunning = true;
logger.log('info', 'RustProxy bridge connected', { component: 'rust-bridge' });
resolve(true);
});
} catch (err: any) {
logger.log('error', `Failed to spawn RustProxy: ${err.message}`, { component: 'rust-bridge' });
resolve(false);
}
});
return this.bridge.spawn();
}
/**
* Send a management command to the Rust process and wait for the response.
*/
public async sendCommand(method: string, params: Record<string, any> = {}): Promise<any> {
if (!this.process || !this.isRunning) {
throw new Error('RustProxy bridge is not running');
}
const id = `req_${++this.requestCounter}`;
const request: IManagementRequest = { id, method, params };
return new Promise<any>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`RustProxy command '${method}' timed out after ${this.requestTimeoutMs}ms`));
}, this.requestTimeoutMs);
this.pendingRequests.set(id, { resolve, reject, timer });
const json = JSON.stringify(request) + '\n';
this.process!.stdin!.write(json, (err) => {
if (err) {
clearTimeout(timer);
this.pendingRequests.delete(id);
reject(new Error(`Failed to write to RustProxy stdin: ${err.message}`));
}
});
});
}
// Convenience methods for each management command
public async startProxy(config: any): Promise<void> {
await this.sendCommand('start', { config });
}
public async stopProxy(): Promise<void> {
await this.sendCommand('stop');
}
public async updateRoutes(routes: IRouteConfig[]): Promise<void> {
await this.sendCommand('updateRoutes', { routes });
}
public async getMetrics(): Promise<any> {
return this.sendCommand('getMetrics');
}
public async getStatistics(): Promise<any> {
return this.sendCommand('getStatistics');
}
public async provisionCertificate(routeName: string): Promise<void> {
await this.sendCommand('provisionCertificate', { routeName });
}
public async renewCertificate(routeName: string): Promise<void> {
await this.sendCommand('renewCertificate', { routeName });
}
public async getCertificateStatus(routeName: string): Promise<any> {
return this.sendCommand('getCertificateStatus', { routeName });
}
public async getListeningPorts(): Promise<number[]> {
const result = await this.sendCommand('getListeningPorts');
return result?.ports ?? [];
}
public async getNftablesStatus(): Promise<any> {
return this.sendCommand('getNftablesStatus');
}
public async setSocketHandlerRelay(socketPath: string): Promise<void> {
await this.sendCommand('setSocketHandlerRelay', { socketPath });
}
public async addListeningPort(port: number): Promise<void> {
await this.sendCommand('addListeningPort', { port });
}
public async removeListeningPort(port: number): Promise<void> {
await this.sendCommand('removeListeningPort', { port });
}
public async loadCertificate(domain: string, cert: string, key: string, ca?: string): Promise<void> {
await this.sendCommand('loadCertificate', { domain, cert, key, ca });
}
/**
* Kill the Rust process.
* Kill the Rust process and clean up.
*/
public kill(): void {
if (this.process) {
this.process.kill('SIGTERM');
// Force kill after 5 seconds
setTimeout(() => {
if (this.process) {
this.process.kill('SIGKILL');
}
}, 5000).unref();
}
this.bridge.kill();
}
/**
* Whether the bridge is currently running.
*/
public get running(): boolean {
return this.isRunning;
return this.bridge.running;
}
private handleLine(line: string): void {
if (!line) return;
// --- Convenience methods for each management command ---
let parsed: any;
try {
parsed = JSON.parse(line);
} catch {
logger.log('warn', `Non-JSON output from RustProxy: ${line}`, { component: 'rust-bridge' });
return;
}
// Check if it's an event (has 'event' field)
if ('event' in parsed) {
const event = parsed as IManagementEvent;
this.emit(`management:${event.event}`, event.data);
return;
}
// Otherwise it's a response (has 'id' field)
if ('id' in parsed) {
const response = parsed as IManagementResponse;
const pending = this.pendingRequests.get(response.id);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(response.id);
if (response.success) {
pending.resolve(response.result);
} else {
pending.reject(new Error(response.error || 'Unknown error from RustProxy'));
}
}
}
public async startProxy(config: any): Promise<void> {
await this.bridge.sendCommand('start', { config });
}
private cleanup(): void {
this.isRunning = false;
this.process = null;
public async stopProxy(): Promise<void> {
await this.bridge.sendCommand('stop', {} as Record<string, never>);
}
if (this.readline) {
this.readline.close();
this.readline = null;
}
public async updateRoutes(routes: IRouteConfig[]): Promise<void> {
await this.bridge.sendCommand('updateRoutes', { routes });
}
// Reject all pending requests
for (const [id, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error('RustProxy process exited'));
}
this.pendingRequests.clear();
public async getMetrics(): Promise<any> {
return this.bridge.sendCommand('getMetrics', {} as Record<string, never>);
}
public async getStatistics(): Promise<any> {
return this.bridge.sendCommand('getStatistics', {} as Record<string, never>);
}
public async provisionCertificate(routeName: string): Promise<void> {
await this.bridge.sendCommand('provisionCertificate', { routeName });
}
public async renewCertificate(routeName: string): Promise<void> {
await this.bridge.sendCommand('renewCertificate', { routeName });
}
public async getCertificateStatus(routeName: string): Promise<any> {
return this.bridge.sendCommand('getCertificateStatus', { routeName });
}
public async getListeningPorts(): Promise<number[]> {
const result = await this.bridge.sendCommand('getListeningPorts', {} as Record<string, never>);
return result?.ports ?? [];
}
public async getNftablesStatus(): Promise<any> {
return this.bridge.sendCommand('getNftablesStatus', {} as Record<string, never>);
}
public async setSocketHandlerRelay(socketPath: string): Promise<void> {
await this.bridge.sendCommand('setSocketHandlerRelay', { socketPath });
}
public async addListeningPort(port: number): Promise<void> {
await this.bridge.sendCommand('addListeningPort', { port });
}
public async removeListeningPort(port: number): Promise<void> {
await this.bridge.sendCommand('removeListeningPort', { port });
}
public async loadCertificate(domain: string, cert: string, key: string, ca?: string): Promise<void> {
await this.bridge.sendCommand('loadCertificate', { domain, cert, key, ca });
}
}

View File

@@ -3,7 +3,6 @@ import { logger } from '../../core/utils/logger.js';
// Rust bridge and helpers
import { RustProxyBridge } from './rust-proxy-bridge.js';
import { RustBinaryLocator } from './rust-binary-locator.js';
import { RoutePreprocessor } from './route-preprocessor.js';
import { SocketHandlerServer } from './socket-handler-server.js';
import { RustMetricsAdapter } from './rust-metrics-adapter.js';
@@ -37,6 +36,7 @@ export class SmartProxy extends plugins.EventEmitter {
private socketHandlerServer: SocketHandlerServer | null = null;
private metricsAdapter: RustMetricsAdapter;
private routeUpdateLock: Mutex;
private stopping = false;
constructor(settingsArg: ISmartProxyOptions) {
super();
@@ -102,7 +102,10 @@ export class SmartProxy extends plugins.EventEmitter {
this.bridge = new RustProxyBridge();
this.preprocessor = new RoutePreprocessor();
this.metricsAdapter = new RustMetricsAdapter(this.bridge);
this.metricsAdapter = new RustMetricsAdapter(
this.bridge,
this.settings.metrics?.sampleIntervalMs ?? 1000
);
this.routeUpdateLock = new Mutex();
}
@@ -116,27 +119,28 @@ export class SmartProxy extends plugins.EventEmitter {
if (!spawned) {
throw new Error(
'RustProxy binary not found. Set SMARTPROXY_RUST_BINARY env var, install the platform package, ' +
'or build locally with: cd rust && cargo build --release'
'or build locally with: pnpm build'
);
}
// Handle unexpected exit
// Handle unexpected exit (only emits error if not intentionally stopping)
this.bridge.on('exit', (code: number | null, signal: string | null) => {
if (this.stopping) return;
logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });
this.emit('error', new Error(`RustProxy exited (code=${code}, signal=${signal})`));
});
// Start socket handler relay if any routes need TS-side handling
// Check if any routes need TS-side handling (socket handlers, dynamic functions)
const hasHandlerRoutes = this.settings.routes.some(
(r) =>
(r.action.type === 'socket-handler' && r.action.socketHandler) ||
r.action.targets?.some((t) => typeof t.host === 'function' || typeof t.port === 'function')
);
// Start socket handler relay server (but don't tell Rust yet - proxy not started)
if (hasHandlerRoutes) {
this.socketHandlerServer = new SocketHandlerServer(this.preprocessor);
await this.socketHandlerServer.start();
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
}
// Preprocess routes (strip JS functions, convert socket-handler routes)
@@ -148,6 +152,11 @@ export class SmartProxy extends plugins.EventEmitter {
// Start the Rust proxy
await this.bridge.startProxy(config);
// Now that Rust proxy is running, configure socket handler relay
if (this.socketHandlerServer) {
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
}
// Handle certProvisionFunction
await this.provisionCertificatesViaCallback();
@@ -162,10 +171,14 @@ export class SmartProxy extends plugins.EventEmitter {
*/
public async stop(): Promise<void> {
logger.log('info', 'SmartProxy shutting down...', { component: 'smart-proxy' });
this.stopping = true;
// Stop metrics polling
this.metricsAdapter.stopPolling();
// Remove exit listener before killing to avoid spurious error events
this.bridge.removeAllListeners('exit');
// Stop Rust proxy
try {
await this.bridge.stopProxy();
@@ -283,6 +296,7 @@ export class SmartProxy extends plugins.EventEmitter {
* Get all currently listening ports (async - calls Rust).
*/
public async getListeningPorts(): Promise<number[]> {
if (!this.bridge.running) return [];
return this.bridge.getListeningPorts();
}

View File

@@ -15,6 +15,7 @@ export class SocketHandlerServer {
private server: plugins.net.Server | null = null;
private socketPath: string;
private preprocessor: RoutePreprocessor;
private activeSockets = new Set<plugins.net.Socket>();
constructor(preprocessor: RoutePreprocessor) {
this.preprocessor = preprocessor;
@@ -41,6 +42,8 @@ export class SocketHandlerServer {
return new Promise<void>((resolve, reject) => {
this.server = plugins.net.createServer((socket) => {
this.activeSockets.add(socket);
socket.on('close', () => this.activeSockets.delete(socket));
this.handleConnection(socket);
});
@@ -61,6 +64,12 @@ export class SocketHandlerServer {
* Stop the server and clean up.
*/
public async stop(): Promise<void> {
// Destroy all active connections first
for (const socket of this.activeSockets) {
socket.destroy();
}
this.activeSockets.clear();
if (this.server) {
return new Promise<void>((resolve) => {
this.server!.close(() => {
@@ -100,6 +109,7 @@ export class SocketHandlerServer {
metadataParsed = true;
socket.removeListener('data', onData);
socket.pause(); // Prevent data loss between handler removal and pipe setup
const metadataJson = metadataBuffer.slice(0, newlineIndex);
const remainingData = metadataBuffer.slice(newlineIndex + 1);
@@ -140,13 +150,6 @@ export class SocketHandlerServer {
return;
}
const handler = originalRoute.action.socketHandler;
if (!handler) {
logger.log('error', `Route ${routeKey} has no socketHandler`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
// Build route context
const context: IRouteContext = {
port: metadata.localPort || 0,
@@ -167,12 +170,110 @@ export class SocketHandlerServer {
socket.unshift(Buffer.from(remainingData, 'utf8'));
}
// Call the handler
try {
handler(socket, context);
} catch (err: any) {
logger.log('error', `Socket handler threw for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
const handler = originalRoute.action.socketHandler;
if (handler) {
// Route has an explicit socket handler callback
try {
const result = handler(socket, context);
// If the handler is async, wait for it to finish setup before resuming.
// This prevents data loss when async handlers need to do work before
// attaching their `data` listeners.
if (result && typeof (result as any).then === 'function') {
(result as any).then(() => {
socket.resume();
}).catch((err: any) => {
logger.log('error', `Async socket handler rejected for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
});
} else {
// Synchronous handler — listeners are already attached, safe to resume.
socket.resume();
}
} catch (err: any) {
logger.log('error', `Socket handler threw for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
}
return;
}
// Route has dynamic host/port functions - resolve and forward
if (originalRoute.action.targets && originalRoute.action.targets.length > 0) {
this.forwardDynamicRoute(socket, originalRoute, context);
return;
}
logger.log('error', `Route ${routeKey} has no socketHandler and no targets`, { component: 'socket-handler-server' });
socket.destroy();
}
/**
* Forward a connection to a dynamically resolved target.
* Used for routes with function-based host/port that Rust cannot handle.
*/
private forwardDynamicRoute(socket: plugins.net.Socket, route: IRouteConfig, context: IRouteContext): void {
const targets = route.action.targets!;
// Pick a target (round-robin would be ideal, but simple random for now)
const target = targets[Math.floor(Math.random() * targets.length)];
// Resolve host
let host: string;
if (typeof target.host === 'function') {
try {
const result = target.host(context);
host = Array.isArray(result) ? result[Math.floor(Math.random() * result.length)] : result;
} catch (err: any) {
logger.log('error', `Dynamic host function failed: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
} else if (typeof target.host === 'string') {
host = target.host;
} else if (Array.isArray(target.host)) {
host = target.host[Math.floor(Math.random() * target.host.length)];
} else {
host = 'localhost';
}
// Resolve port
let port: number;
if (typeof target.port === 'function') {
try {
port = target.port(context);
} catch (err: any) {
logger.log('error', `Dynamic port function failed: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
} else if (typeof target.port === 'number') {
port = target.port;
} else {
port = context.port;
}
logger.log('debug', `Dynamic forward: ${context.clientIp} -> ${host}:${port}`, { component: 'socket-handler-server' });
// Connect to the resolved target
const backend = plugins.net.connect(port, host, () => {
// Pipe bidirectionally
socket.pipe(backend);
backend.pipe(socket);
});
backend.on('error', (err) => {
logger.log('error', `Dynamic forward backend error: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
});
socket.on('error', () => {
backend.destroy();
});
socket.on('close', () => {
backend.destroy();
});
backend.on('close', () => {
socket.destroy();
});
}
}

View File

@@ -1,4 +1,4 @@
import { Buffer } from 'buffer';
import { Buffer } from 'node:buffer';
import {
TlsRecordType,
TlsHandshakeType,