Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 3bfa451341 | | |
| | 7b3ab7378b | | |
| | 527c616cd4 | | |
| | b04eb0ab17 | | |
| | a55ff20391 | | |
| | 3c24bf659b | | |
21
changelog.md
21
changelog.md
@@ -1,5 +1,26 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-04 - 27.3.0 - feat(test)
|
||||
add end-to-end WebSocket proxy test coverage
|
||||
|
||||
- add comprehensive WebSocket e2e tests for upgrade handling, bidirectional messaging, header forwarding, close propagation, and large payloads
|
||||
- add ws and @types/ws as development dependencies to support the new test suite
|
||||
|
||||
## 2026-04-04 - 27.2.0 - feat(metrics)
|
||||
add frontend and backend protocol distribution metrics
|
||||
|
||||
- track active and total frontend protocol counts for h1, h2, h3, websocket, and other traffic
|
||||
- add backend protocol counters with RAII guards to ensure metrics are decremented on all exit paths
|
||||
- expose protocol distribution through the TypeScript metrics interfaces and Rust metrics adapter
|
||||
|
||||
## 2026-03-27 - 27.1.0 - feat(rustproxy-passthrough)
|
||||
add selective connection recycling for route, security, and certificate updates
|
||||
|
||||
- introduce a shared connection registry to track active TCP and QUIC connections by route, source IP, and domain
|
||||
- recycle only affected connections when route actions or security rules change instead of broadly invalidating traffic
|
||||
- gracefully recycle existing connections when TLS certificates change for a domain
|
||||
- apply route-level IP security checks to QUIC connections and share route cancellation state with UDP listeners
|
||||
|
||||
## 2026-03-26 - 27.0.0 - BREAKING CHANGE(smart-proxy)
|
||||
remove route helper APIs and standardize route configuration on plain route objects
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "27.0.0",
|
||||
"version": "27.3.0",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -22,8 +22,10 @@
|
||||
"@git.zone/tstest": "^3.6.0",
|
||||
"@push.rocks/smartserve": "^2.0.3",
|
||||
"@types/node": "^25.5.0",
|
||||
"@types/ws": "^8.18.1",
|
||||
"typescript": "^6.0.2",
|
||||
"why-is-node-running": "^3.2.2"
|
||||
"why-is-node-running": "^3.2.2",
|
||||
"ws": "^8.20.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/smartcrypto": "^2.0.4",
|
||||
|
||||
22
pnpm-lock.yaml
generated
22
pnpm-lock.yaml
generated
@@ -45,12 +45,18 @@ importers:
|
||||
'@types/node':
|
||||
specifier: ^25.5.0
|
||||
version: 25.5.0
|
||||
'@types/ws':
|
||||
specifier: ^8.18.1
|
||||
version: 8.18.1
|
||||
typescript:
|
||||
specifier: ^6.0.2
|
||||
version: 6.0.2
|
||||
why-is-node-running:
|
||||
specifier: ^3.2.2
|
||||
version: 3.2.2
|
||||
ws:
|
||||
specifier: ^8.20.0
|
||||
version: 8.20.0
|
||||
|
||||
packages:
|
||||
|
||||
@@ -3304,18 +3310,6 @@ packages:
|
||||
wrappy@1.0.2:
|
||||
resolution: {integrity: sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=}
|
||||
|
||||
ws@8.19.0:
|
||||
resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==}
|
||||
engines: {node: '>=10.0.0'}
|
||||
peerDependencies:
|
||||
bufferutil: ^4.0.1
|
||||
utf-8-validate: '>=5.0.2'
|
||||
peerDependenciesMeta:
|
||||
bufferutil:
|
||||
optional: true
|
||||
utf-8-validate:
|
||||
optional: true
|
||||
|
||||
ws@8.20.0:
|
||||
resolution: {integrity: sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==}
|
||||
engines: {node: '>=10.0.0'}
|
||||
@@ -5296,7 +5290,7 @@ snapshots:
|
||||
'@push.rocks/smartenv': 6.0.0
|
||||
'@push.rocks/smartlog': 3.2.1
|
||||
'@push.rocks/smartpath': 6.0.0
|
||||
ws: 8.19.0
|
||||
ws: 8.20.0
|
||||
transitivePeerDependencies:
|
||||
- bufferutil
|
||||
- utf-8-validate
|
||||
@@ -8033,8 +8027,6 @@ snapshots:
|
||||
|
||||
wrappy@1.0.2: {}
|
||||
|
||||
ws@8.19.0: {}
|
||||
|
||||
ws@8.20.0: {}
|
||||
|
||||
xml-parse-from-string@1.0.1: {}
|
||||
|
||||
@@ -110,6 +110,36 @@ impl Drop for ActiveRequestGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard that calls frontend_protocol_closed or backend_protocol_closed on drop.
/// Ensures active protocol counters are decremented on all exit paths.
pub(crate) struct ProtocolGuard {
    // Shared metrics sink; the guard owns its own Arc so it can outlive the caller's clone.
    metrics: Arc<MetricsCollector>,
    // Protocol label ("h1", "h2", "h3", "ws", ...) forwarded to the *_protocol_* methods.
    version: &'static str,
    // true → frontend (client→proxy) counters, false → backend (proxy→upstream) counters.
    is_frontend: bool,
}
|
||||
|
||||
impl ProtocolGuard {
|
||||
pub fn frontend(metrics: Arc<MetricsCollector>, version: &'static str) -> Self {
|
||||
metrics.frontend_protocol_opened(version);
|
||||
Self { metrics, version, is_frontend: true }
|
||||
}
|
||||
|
||||
pub fn backend(metrics: Arc<MetricsCollector>, version: &'static str) -> Self {
|
||||
metrics.backend_protocol_opened(version);
|
||||
Self { metrics, version, is_frontend: false }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ProtocolGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.is_frontend {
|
||||
self.metrics.frontend_protocol_closed(self.version);
|
||||
} else {
|
||||
self.metrics.backend_protocol_closed(self.version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Backend stream that can be either plain TCP or TLS-wrapped.
|
||||
/// Used for `terminate-and-reencrypt` mode where the backend requires TLS.
|
||||
pub(crate) enum BackendStream {
|
||||
@@ -625,6 +655,18 @@ impl HttpProxyService {
|
||||
.map(|p| p.as_str().eq_ignore_ascii_case("websocket"))
|
||||
.unwrap_or(false);
|
||||
|
||||
// Track frontend protocol for distribution metrics (h1/h2/h3/ws)
|
||||
let frontend_proto: &'static str = if is_h1_websocket || is_h2_websocket {
|
||||
"ws"
|
||||
} else {
|
||||
match req.version() {
|
||||
hyper::Version::HTTP_2 => "h2",
|
||||
hyper::Version::HTTP_3 => "h3",
|
||||
_ => "h1", // HTTP/1.0, HTTP/1.1
|
||||
}
|
||||
};
|
||||
let _frontend_proto_guard = ProtocolGuard::frontend(Arc::clone(&self.metrics), frontend_proto);
|
||||
|
||||
if is_h1_websocket || is_h2_websocket {
|
||||
let result = self.handle_websocket_upgrade(
|
||||
req, peer_addr, &upstream, route_match.route, route_id, &upstream_key, cancel, &ip_str, is_h2_websocket,
|
||||
@@ -2383,6 +2425,8 @@ impl HttpProxyService {
|
||||
selector: upstream_selector,
|
||||
key: upstream_key_owned.clone(),
|
||||
};
|
||||
// Track backend WebSocket connection — guard decrements on tunnel close
|
||||
let _backend_ws_guard = ProtocolGuard::backend(Arc::clone(&metrics), "ws");
|
||||
|
||||
let client_upgraded = match on_client_upgrade.await {
|
||||
Ok(upgraded) => upgraded,
|
||||
|
||||
@@ -33,6 +33,9 @@ pub struct Metrics {
|
||||
pub total_datagrams_out: u64,
|
||||
// Protocol detection cache snapshot (populated by RustProxy from HttpProxyService)
|
||||
pub detected_protocols: Vec<ProtocolCacheEntryMetric>,
|
||||
// Protocol distribution for frontend (client→proxy) and backend (proxy→upstream)
|
||||
pub frontend_protocols: ProtocolMetrics,
|
||||
pub backend_protocols: ProtocolMetrics,
|
||||
}
|
||||
|
||||
/// Per-route metrics.
|
||||
@@ -99,6 +102,23 @@ pub struct ProtocolCacheEntryMetric {
|
||||
pub h3_consecutive_failures: Option<u32>,
|
||||
}
|
||||
|
||||
/// Protocol distribution metrics for frontend (client→proxy) and backend (proxy→upstream).
|
||||
/// Tracks active and total counts for each protocol category.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ProtocolMetrics {
|
||||
pub h1_active: u64,
|
||||
pub h1_total: u64,
|
||||
pub h2_active: u64,
|
||||
pub h2_total: u64,
|
||||
pub h3_active: u64,
|
||||
pub h3_total: u64,
|
||||
pub ws_active: u64,
|
||||
pub ws_total: u64,
|
||||
pub other_active: u64,
|
||||
pub other_total: u64,
|
||||
}
|
||||
|
||||
/// Statistics snapshot.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -170,6 +190,30 @@ pub struct MetricsCollector {
|
||||
total_datagrams_in: AtomicU64,
|
||||
total_datagrams_out: AtomicU64,
|
||||
|
||||
// ── Frontend protocol tracking (h1/h2/h3/ws/other) ──
|
||||
frontend_h1_active: AtomicU64,
|
||||
frontend_h1_total: AtomicU64,
|
||||
frontend_h2_active: AtomicU64,
|
||||
frontend_h2_total: AtomicU64,
|
||||
frontend_h3_active: AtomicU64,
|
||||
frontend_h3_total: AtomicU64,
|
||||
frontend_ws_active: AtomicU64,
|
||||
frontend_ws_total: AtomicU64,
|
||||
frontend_other_active: AtomicU64,
|
||||
frontend_other_total: AtomicU64,
|
||||
|
||||
// ── Backend protocol tracking (h1/h2/h3/ws/other) ──
|
||||
backend_h1_active: AtomicU64,
|
||||
backend_h1_total: AtomicU64,
|
||||
backend_h2_active: AtomicU64,
|
||||
backend_h2_total: AtomicU64,
|
||||
backend_h3_active: AtomicU64,
|
||||
backend_h3_total: AtomicU64,
|
||||
backend_ws_active: AtomicU64,
|
||||
backend_ws_total: AtomicU64,
|
||||
backend_other_active: AtomicU64,
|
||||
backend_other_total: AtomicU64,
|
||||
|
||||
// ── Lock-free pending throughput counters (hot path) ──
|
||||
global_pending_tp_in: AtomicU64,
|
||||
global_pending_tp_out: AtomicU64,
|
||||
@@ -221,6 +265,26 @@ impl MetricsCollector {
|
||||
total_http_requests: AtomicU64::new(0),
|
||||
pending_http_requests: AtomicU64::new(0),
|
||||
http_request_throughput: Mutex::new(ThroughputTracker::new(retention_seconds)),
|
||||
frontend_h1_active: AtomicU64::new(0),
|
||||
frontend_h1_total: AtomicU64::new(0),
|
||||
frontend_h2_active: AtomicU64::new(0),
|
||||
frontend_h2_total: AtomicU64::new(0),
|
||||
frontend_h3_active: AtomicU64::new(0),
|
||||
frontend_h3_total: AtomicU64::new(0),
|
||||
frontend_ws_active: AtomicU64::new(0),
|
||||
frontend_ws_total: AtomicU64::new(0),
|
||||
frontend_other_active: AtomicU64::new(0),
|
||||
frontend_other_total: AtomicU64::new(0),
|
||||
backend_h1_active: AtomicU64::new(0),
|
||||
backend_h1_total: AtomicU64::new(0),
|
||||
backend_h2_active: AtomicU64::new(0),
|
||||
backend_h2_total: AtomicU64::new(0),
|
||||
backend_h3_active: AtomicU64::new(0),
|
||||
backend_h3_total: AtomicU64::new(0),
|
||||
backend_ws_active: AtomicU64::new(0),
|
||||
backend_ws_total: AtomicU64::new(0),
|
||||
backend_other_active: AtomicU64::new(0),
|
||||
backend_other_total: AtomicU64::new(0),
|
||||
global_pending_tp_in: AtomicU64::new(0),
|
||||
global_pending_tp_out: AtomicU64::new(0),
|
||||
route_pending_tp: DashMap::new(),
|
||||
@@ -411,6 +475,62 @@ impl MetricsCollector {
|
||||
self.total_datagrams_out.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// ── Frontend/backend protocol distribution tracking ──
|
||||
|
||||
/// Get the (active, total) counter pair for a frontend protocol.
fn frontend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) {
    match proto {
        "h2" => (&self.frontend_h2_active, &self.frontend_h2_total),
        "h3" => (&self.frontend_h3_active, &self.frontend_h3_total),
        "ws" => (&self.frontend_ws_active, &self.frontend_ws_total),
        "other" => (&self.frontend_other_active, &self.frontend_other_total),
        // NOTE(review): any unrecognized label lands in the h1 bucket, so
        // `other` only fills when callers pass "other" explicitly — confirm
        // that defaulting unknowns to h1 (rather than `other`) is intended.
        _ => (&self.frontend_h1_active, &self.frontend_h1_total), // h1 + default
    }
}
|
||||
|
||||
/// Get the (active, total) counter pair for a backend protocol.
fn backend_proto_counters(&self, proto: &str) -> (&AtomicU64, &AtomicU64) {
    match proto {
        "h2" => (&self.backend_h2_active, &self.backend_h2_total),
        "h3" => (&self.backend_h3_active, &self.backend_h3_total),
        "ws" => (&self.backend_ws_active, &self.backend_ws_total),
        "other" => (&self.backend_other_active, &self.backend_other_total),
        // NOTE(review): unknown labels default to h1, mirroring
        // `frontend_proto_counters` — confirm unknowns shouldn't be `other`.
        _ => (&self.backend_h1_active, &self.backend_h1_total), // h1 + default
    }
}
|
||||
|
||||
/// Record a frontend request/connection opened with a given protocol.
|
||||
pub fn frontend_protocol_opened(&self, proto: &str) {
|
||||
let (active, total) = self.frontend_proto_counters(proto);
|
||||
active.fetch_add(1, Ordering::Relaxed);
|
||||
total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Record a frontend request/connection closed with a given protocol.
|
||||
pub fn frontend_protocol_closed(&self, proto: &str) {
|
||||
let (active, _) = self.frontend_proto_counters(proto);
|
||||
let val = active.load(Ordering::Relaxed);
|
||||
if val > 0 {
|
||||
active.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a backend connection opened with a given protocol.
|
||||
pub fn backend_protocol_opened(&self, proto: &str) {
|
||||
let (active, total) = self.backend_proto_counters(proto);
|
||||
active.fetch_add(1, Ordering::Relaxed);
|
||||
total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Record a backend connection closed with a given protocol.
|
||||
pub fn backend_protocol_closed(&self, proto: &str) {
|
||||
let (active, _) = self.backend_proto_counters(proto);
|
||||
let val = active.load(Ordering::Relaxed);
|
||||
if val > 0 {
|
||||
active.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Per-backend recording methods ──
|
||||
|
||||
/// Record a successful backend connection with its connect duration.
|
||||
@@ -866,6 +986,30 @@ impl MetricsCollector {
|
||||
total_datagrams_in: self.total_datagrams_in.load(Ordering::Relaxed),
|
||||
total_datagrams_out: self.total_datagrams_out.load(Ordering::Relaxed),
|
||||
detected_protocols: vec![],
|
||||
frontend_protocols: ProtocolMetrics {
|
||||
h1_active: self.frontend_h1_active.load(Ordering::Relaxed),
|
||||
h1_total: self.frontend_h1_total.load(Ordering::Relaxed),
|
||||
h2_active: self.frontend_h2_active.load(Ordering::Relaxed),
|
||||
h2_total: self.frontend_h2_total.load(Ordering::Relaxed),
|
||||
h3_active: self.frontend_h3_active.load(Ordering::Relaxed),
|
||||
h3_total: self.frontend_h3_total.load(Ordering::Relaxed),
|
||||
ws_active: self.frontend_ws_active.load(Ordering::Relaxed),
|
||||
ws_total: self.frontend_ws_total.load(Ordering::Relaxed),
|
||||
other_active: self.frontend_other_active.load(Ordering::Relaxed),
|
||||
other_total: self.frontend_other_total.load(Ordering::Relaxed),
|
||||
},
|
||||
backend_protocols: ProtocolMetrics {
|
||||
h1_active: self.backend_h1_active.load(Ordering::Relaxed),
|
||||
h1_total: self.backend_h1_total.load(Ordering::Relaxed),
|
||||
h2_active: self.backend_h2_active.load(Ordering::Relaxed),
|
||||
h2_total: self.backend_h2_total.load(Ordering::Relaxed),
|
||||
h3_active: self.backend_h3_active.load(Ordering::Relaxed),
|
||||
h3_total: self.backend_h3_total.load(Ordering::Relaxed),
|
||||
ws_active: self.backend_ws_active.load(Ordering::Relaxed),
|
||||
ws_total: self.backend_ws_total.load(Ordering::Relaxed),
|
||||
other_active: self.backend_other_active.load(Ordering::Relaxed),
|
||||
other_total: self.backend_other_total.load(Ordering::Relaxed),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
329
rust/crates/rustproxy-passthrough/src/connection_registry.rs
Normal file
329
rust/crates/rustproxy-passthrough/src/connection_registry.rs
Normal file
@@ -0,0 +1,329 @@
|
||||
//! Shared connection registry for selective connection recycling.
|
||||
//!
|
||||
//! Tracks active connections across both TCP and QUIC with metadata
|
||||
//! (source IP, SNI domain, route ID, cancel token) so that connections
|
||||
//! can be selectively recycled when certificates, security rules, or
|
||||
//! route targets change.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
use rustproxy_config::RouteSecurity;
|
||||
use rustproxy_http::request_filter::RequestFilter;
|
||||
use rustproxy_routing::matchers::domain_matches;
|
||||
|
||||
/// Metadata about an active connection.
pub struct ConnectionEntry {
    /// Per-connection cancel token (child of per-route token).
    /// Cancelling it terminates the connection (see the recycle_* methods).
    pub cancel: CancellationToken,
    /// Client source IP.
    pub source_ip: IpAddr,
    /// SNI domain from TLS handshake (None for non-TLS connections).
    pub domain: Option<String>,
    /// Route ID this connection was matched to (None if route has no ID).
    pub route_id: Option<String>,
}
|
||||
|
||||
/// Transport-agnostic registry of active connections.
///
/// Used by both `TcpListenerManager` and `UdpListenerManager` to track
/// connections and enable selective recycling on config changes.
pub struct ConnectionRegistry {
    // Active connections keyed by monotonically assigned connection ID.
    connections: DashMap<u64, ConnectionEntry>,
    // Next connection ID to hand out (starts at 1, see `new`).
    next_id: AtomicU64,
}
|
||||
|
||||
impl ConnectionRegistry {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
connections: DashMap::new(),
|
||||
next_id: AtomicU64::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a connection and return its ID + RAII guard.
|
||||
///
|
||||
/// The guard automatically removes the connection from the registry on drop.
|
||||
pub fn register(self: &Arc<Self>, entry: ConnectionEntry) -> (u64, ConnectionRegistryGuard) {
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
self.connections.insert(id, entry);
|
||||
let guard = ConnectionRegistryGuard {
|
||||
registry: Arc::clone(self),
|
||||
conn_id: id,
|
||||
};
|
||||
(id, guard)
|
||||
}
|
||||
|
||||
/// Number of tracked connections (for metrics/debugging).
|
||||
pub fn len(&self) -> usize {
|
||||
self.connections.len()
|
||||
}
|
||||
|
||||
/// Recycle connections whose SNI domain matches a renewed certificate domain.
|
||||
///
|
||||
/// Uses bidirectional domain matching so that:
|
||||
/// - Cert `*.example.com` recycles connections for `sub.example.com`
|
||||
/// - Cert `sub.example.com` recycles connections on routes with `*.example.com`
|
||||
pub fn recycle_for_cert_change(&self, cert_domain: &str) {
|
||||
let mut recycled = 0u64;
|
||||
self.connections.retain(|_, entry| {
|
||||
let matches = entry.domain.as_deref()
|
||||
.map(|d| domain_matches(cert_domain, d) || domain_matches(d, cert_domain))
|
||||
.unwrap_or(false);
|
||||
if matches {
|
||||
entry.cancel.cancel();
|
||||
recycled += 1;
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
if recycled > 0 {
|
||||
info!(
|
||||
"Recycled {} connection(s) for cert change on domain '{}'",
|
||||
recycled, cert_domain
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Recycle connections on a route whose security config changed.
|
||||
///
|
||||
/// Re-evaluates each connection's source IP against the new security rules.
|
||||
/// Only connections from now-blocked IPs are terminated; allowed IPs are undisturbed.
|
||||
pub fn recycle_for_security_change(&self, route_id: &str, new_security: &RouteSecurity) {
|
||||
let mut recycled = 0u64;
|
||||
self.connections.retain(|_, entry| {
|
||||
if entry.route_id.as_deref() == Some(route_id) {
|
||||
if !RequestFilter::check_ip_security(new_security, &entry.source_ip) {
|
||||
info!(
|
||||
"Terminating connection from {} — IP now blocked on route '{}'",
|
||||
entry.source_ip, route_id
|
||||
);
|
||||
entry.cancel.cancel();
|
||||
recycled += 1;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
});
|
||||
if recycled > 0 {
|
||||
info!(
|
||||
"Recycled {} connection(s) for security change on route '{}'",
|
||||
recycled, route_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Recycle all connections on a route (e.g., when targets changed).
|
||||
pub fn recycle_for_route_change(&self, route_id: &str) {
|
||||
let mut recycled = 0u64;
|
||||
self.connections.retain(|_, entry| {
|
||||
if entry.route_id.as_deref() == Some(route_id) {
|
||||
entry.cancel.cancel();
|
||||
recycled += 1;
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
if recycled > 0 {
|
||||
info!(
|
||||
"Recycled {} connection(s) for config change on route '{}'",
|
||||
recycled, route_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove connections on routes that no longer exist.
|
||||
///
|
||||
/// This complements per-route CancellationToken cancellation —
|
||||
/// the token cascade handles graceful shutdown, this cleans up the registry.
|
||||
pub fn cleanup_removed_routes(&self, active_route_ids: &HashSet<String>) {
|
||||
self.connections.retain(|_, entry| {
|
||||
match &entry.route_id {
|
||||
Some(id) => active_route_ids.contains(id),
|
||||
None => true, // keep connections without a route ID
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard that removes a connection from the registry on drop.
|
||||
pub struct ConnectionRegistryGuard {
|
||||
registry: Arc<ConnectionRegistry>,
|
||||
conn_id: u64,
|
||||
}
|
||||
|
||||
impl Drop for ConnectionRegistryGuard {
|
||||
fn drop(&mut self) {
|
||||
self.registry.connections.remove(&self.conn_id);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // Helper: fresh registry wrapped in Arc (register() requires &Arc<Self>).
    fn make_registry() -> Arc<ConnectionRegistry> {
        Arc::new(ConnectionRegistry::new())
    }

    // Dropping the guard unregisters the connection without cancelling it.
    #[test]
    fn test_register_and_guard_cleanup() {
        let reg = make_registry();
        let token = CancellationToken::new();
        let entry = ConnectionEntry {
            cancel: token.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: Some("example.com".to_string()),
            route_id: Some("route-1".to_string()),
        };
        let (id, guard) = reg.register(entry);
        assert_eq!(reg.len(), 1);
        assert!(reg.connections.contains_key(&id));

        drop(guard);
        assert_eq!(reg.len(), 0);
        assert!(!token.is_cancelled());
    }

    // An exact-domain cert change cancels only the connection whose SNI matches.
    #[test]
    fn test_recycle_for_cert_change_exact() {
        let reg = make_registry();
        let t1 = CancellationToken::new();
        let t2 = CancellationToken::new();
        let (_, _g1) = reg.register(ConnectionEntry {
            cancel: t1.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: Some("api.example.com".to_string()),
            route_id: Some("r1".to_string()),
        });
        let (_, _g2) = reg.register(ConnectionEntry {
            cancel: t2.clone(),
            source_ip: "10.0.0.2".parse().unwrap(),
            domain: Some("other.com".to_string()),
            route_id: Some("r2".to_string()),
        });

        reg.recycle_for_cert_change("api.example.com");
        assert!(t1.is_cancelled());
        assert!(!t2.is_cancelled());
        // Registry retains the unmatched entry: the retain pass removed the
        // matched one directly, before its (still-alive) guard could.
    }

    // A wildcard cert must recycle connections on matching subdomains.
    #[test]
    fn test_recycle_for_cert_change_wildcard() {
        let reg = make_registry();
        let t1 = CancellationToken::new();
        let t2 = CancellationToken::new();
        let (_, _g1) = reg.register(ConnectionEntry {
            cancel: t1.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: Some("sub.example.com".to_string()),
            route_id: Some("r1".to_string()),
        });
        let (_, _g2) = reg.register(ConnectionEntry {
            cancel: t2.clone(),
            source_ip: "10.0.0.2".parse().unwrap(),
            domain: Some("other.com".to_string()),
            route_id: Some("r2".to_string()),
        });

        // Wildcard cert should match subdomain
        reg.recycle_for_cert_change("*.example.com");
        assert!(t1.is_cancelled());
        assert!(!t2.is_cancelled());
    }

    // Security changes terminate only connections whose IP is now blocked.
    #[test]
    fn test_recycle_for_security_change() {
        let reg = make_registry();
        let t1 = CancellationToken::new();
        let t2 = CancellationToken::new();
        let (_, _g1) = reg.register(ConnectionEntry {
            cancel: t1.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: None,
            route_id: Some("r1".to_string()),
        });
        let (_, _g2) = reg.register(ConnectionEntry {
            cancel: t2.clone(),
            source_ip: "10.0.0.2".parse().unwrap(),
            domain: None,
            route_id: Some("r1".to_string()),
        });

        // Block 10.0.0.1, allow 10.0.0.2
        let security = RouteSecurity {
            ip_allow_list: None,
            ip_block_list: Some(vec!["10.0.0.1".to_string()]),
            max_connections: None,
            authentication: None,
            rate_limit: None,
            basic_auth: None,
            jwt_auth: None,
        };

        reg.recycle_for_security_change("r1", &security);
        assert!(t1.is_cancelled());
        assert!(!t2.is_cancelled());
    }

    // A route-level change cancels every connection on that route, nothing else.
    #[test]
    fn test_recycle_for_route_change() {
        let reg = make_registry();
        let t1 = CancellationToken::new();
        let t2 = CancellationToken::new();
        let (_, _g1) = reg.register(ConnectionEntry {
            cancel: t1.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: None,
            route_id: Some("r1".to_string()),
        });
        let (_, _g2) = reg.register(ConnectionEntry {
            cancel: t2.clone(),
            source_ip: "10.0.0.2".parse().unwrap(),
            domain: None,
            route_id: Some("r2".to_string()),
        });

        reg.recycle_for_route_change("r1");
        assert!(t1.is_cancelled());
        assert!(!t2.is_cancelled());
    }

    // cleanup_removed_routes prunes registry entries but never cancels tokens —
    // cancellation is the job of the per-route token cascade.
    #[test]
    fn test_cleanup_removed_routes() {
        let reg = make_registry();
        let t1 = CancellationToken::new();
        let t2 = CancellationToken::new();
        let (_, _g1) = reg.register(ConnectionEntry {
            cancel: t1.clone(),
            source_ip: "10.0.0.1".parse().unwrap(),
            domain: None,
            route_id: Some("active".to_string()),
        });
        let (_, _g2) = reg.register(ConnectionEntry {
            cancel: t2.clone(),
            source_ip: "10.0.0.2".parse().unwrap(),
            domain: None,
            route_id: Some("removed".to_string()),
        });

        let mut active = HashSet::new();
        active.insert("active".to_string());
        reg.cleanup_removed_routes(&active);

        // "removed" route entry was cleaned from registry
        // (but guard is still alive so len may differ — the retain already removed it)
        assert!(!t1.is_cancelled()); // not cancelled by cleanup, only by token cascade
        assert!(!t2.is_cancelled()); // cleanup doesn't cancel, just removes from registry
    }
}
|
||||
@@ -10,6 +10,7 @@ pub mod forwarder;
|
||||
pub mod proxy_protocol;
|
||||
pub mod tls_handler;
|
||||
pub mod connection_tracker;
|
||||
pub mod connection_registry;
|
||||
pub mod socket_opts;
|
||||
pub mod udp_session;
|
||||
pub mod udp_listener;
|
||||
@@ -21,6 +22,7 @@ pub use forwarder::*;
|
||||
pub use proxy_protocol::*;
|
||||
pub use tls_handler::*;
|
||||
pub use connection_tracker::*;
|
||||
pub use connection_registry::*;
|
||||
pub use socket_opts::*;
|
||||
pub use udp_session::*;
|
||||
pub use udp_listener::*;
|
||||
|
||||
@@ -30,6 +30,7 @@ use rustproxy_routing::{MatchContext, RouteManager};
|
||||
use rustproxy_http::h3_service::H3ProxyService;
|
||||
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
use crate::connection_registry::{ConnectionEntry, ConnectionRegistry};
|
||||
|
||||
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
||||
///
|
||||
@@ -350,6 +351,8 @@ pub async fn quic_accept_loop(
|
||||
cancel: CancellationToken,
|
||||
h3_service: Option<Arc<H3ProxyService>>,
|
||||
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
) {
|
||||
loop {
|
||||
let incoming = tokio::select! {
|
||||
@@ -406,17 +409,48 @@ pub async fn quic_accept_loop(
|
||||
}
|
||||
};
|
||||
|
||||
// Check route-level IP security (previously missing for QUIC)
|
||||
if let Some(ref security) = route.security {
|
||||
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
|
||||
security, &ip,
|
||||
) {
|
||||
debug!("QUIC connection from {} blocked by route security", real_addr);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
conn_tracker.connection_opened(&ip);
|
||||
let route_id = route.name.clone().or(route.id.clone());
|
||||
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
|
||||
|
||||
// Resolve per-route cancel token (child of global cancel)
|
||||
let route_cancel = match route_id.as_deref() {
|
||||
Some(id) => route_cancels.entry(id.to_string())
|
||||
.or_insert_with(|| cancel.child_token())
|
||||
.clone(),
|
||||
None => cancel.child_token(),
|
||||
};
|
||||
// Per-connection child token for selective recycling
|
||||
let conn_cancel = route_cancel.child_token();
|
||||
|
||||
// Register in connection registry
|
||||
let registry = Arc::clone(&connection_registry);
|
||||
let reg_entry = ConnectionEntry {
|
||||
cancel: conn_cancel.clone(),
|
||||
source_ip: ip,
|
||||
domain: None, // QUIC Initial is encrypted, domain comes later via H3 :authority
|
||||
route_id: route_id.clone(),
|
||||
};
|
||||
|
||||
let metrics = Arc::clone(&metrics);
|
||||
let conn_tracker = Arc::clone(&conn_tracker);
|
||||
let cancel = cancel.child_token();
|
||||
let h3_svc = h3_service.clone();
|
||||
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Register in connection registry (RAII guard removes on drop)
|
||||
let (_conn_id, _registry_guard) = registry.register(reg_entry);
|
||||
|
||||
// RAII guard: ensures metrics/tracker cleanup even on panic
|
||||
struct QuicConnGuard {
|
||||
tracker: Arc<ConnectionTracker>,
|
||||
@@ -439,7 +473,7 @@ pub async fn quic_accept_loop(
|
||||
route_id,
|
||||
};
|
||||
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &conn_cancel, h3_svc, real_client_addr).await {
|
||||
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use crate::sni_parser;
|
||||
use crate::forwarder;
|
||||
use crate::tls_handler;
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
use crate::connection_registry::{ConnectionEntry, ConnectionRegistry};
|
||||
use crate::socket_opts;
|
||||
|
||||
/// RAII guard that decrements the active connection metric on drop.
|
||||
@@ -42,6 +43,33 @@ impl Drop for ConnectionGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard for frontend+backend protocol distribution tracking.
/// Calls the appropriate _closed methods on drop for both frontend and backend.
struct ProtocolGuard {
    // Shared metrics sink used for the *_protocol_closed calls on drop.
    metrics: Arc<MetricsCollector>,
    // Protocol labels to close on drop. `new` always populates both with Some;
    // NOTE(review): the Option wrapping only matters if other code constructs
    // this struct directly — confirm, otherwise plain &'static str would do.
    frontend_proto: Option<&'static str>,
    backend_proto: Option<&'static str>,
}
|
||||
|
||||
impl ProtocolGuard {
|
||||
fn new(metrics: Arc<MetricsCollector>, frontend: &'static str, backend: &'static str) -> Self {
|
||||
metrics.frontend_protocol_opened(frontend);
|
||||
metrics.backend_protocol_opened(backend);
|
||||
Self { metrics, frontend_proto: Some(frontend), backend_proto: Some(backend) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ProtocolGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(proto) = self.frontend_proto {
|
||||
self.metrics.frontend_protocol_closed(proto);
|
||||
}
|
||||
if let Some(proto) = self.backend_proto {
|
||||
self.metrics.backend_protocol_closed(proto);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard that calls ConnectionTracker::connection_closed on drop.
|
||||
/// Ensures per-IP tracking is cleaned up on ALL exit paths — normal, error, or panic.
|
||||
struct ConnectionTrackerGuard {
|
||||
@@ -166,6 +194,8 @@ pub struct TcpListenerManager {
|
||||
/// Per-route cancellation tokens (child of cancel_token).
|
||||
/// When a route is removed, its token is cancelled, terminating all connections on that route.
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
/// Shared connection registry for selective recycling on config changes.
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
}
|
||||
|
||||
impl TcpListenerManager {
|
||||
@@ -205,6 +235,7 @@ impl TcpListenerManager {
|
||||
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
||||
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
||||
route_cancels: Arc::new(DashMap::new()),
|
||||
connection_registry: Arc::new(ConnectionRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,6 +275,7 @@ impl TcpListenerManager {
|
||||
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
||||
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
||||
route_cancels: Arc::new(DashMap::new()),
|
||||
connection_registry: Arc::new(ConnectionRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,12 +360,13 @@ impl TcpListenerManager {
|
||||
let relay = Arc::clone(&self.socket_handler_relay);
|
||||
let semaphore = Arc::clone(&self.conn_semaphore);
|
||||
let route_cancels = Arc::clone(&self.route_cancels);
|
||||
let connection_registry = Arc::clone(&self.connection_registry);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
Self::accept_loop(
|
||||
listener, port, route_manager_swap, metrics, tls_configs,
|
||||
shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay,
|
||||
semaphore, route_cancels,
|
||||
semaphore, route_cancels, connection_registry,
|
||||
).await;
|
||||
});
|
||||
|
||||
@@ -446,6 +479,16 @@ impl TcpListenerManager {
|
||||
&self.metrics
|
||||
}
|
||||
|
||||
/// Get a reference to the shared connection registry.
|
||||
pub fn connection_registry(&self) -> &Arc<ConnectionRegistry> {
|
||||
&self.connection_registry
|
||||
}
|
||||
|
||||
/// Get a reference to the per-route cancellation tokens.
|
||||
pub fn route_cancels(&self) -> &Arc<DashMap<String, CancellationToken>> {
|
||||
&self.route_cancels
|
||||
}
|
||||
|
||||
/// Accept loop for a single port.
|
||||
async fn accept_loop(
|
||||
listener: TcpListener,
|
||||
@@ -461,6 +504,7 @@ impl TcpListenerManager {
|
||||
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
||||
conn_semaphore: Arc<tokio::sync::Semaphore>,
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -514,6 +558,7 @@ impl TcpListenerManager {
|
||||
let cn = cancel.clone();
|
||||
let sr = Arc::clone(&socket_handler_relay);
|
||||
let rc = Arc::clone(&route_cancels);
|
||||
let cr = Arc::clone(&connection_registry);
|
||||
debug!("Accepted connection from {} on port {}", peer_addr, port);
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -522,7 +567,7 @@ impl TcpListenerManager {
|
||||
// RAII guard ensures connection_closed is called on all paths
|
||||
let _ct_guard = ConnectionTrackerGuard::new(ct, ip);
|
||||
let result = Self::handle_connection(
|
||||
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc,
|
||||
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc, cr,
|
||||
).await;
|
||||
if let Err(e) = result {
|
||||
warn!("Connection error from {}: {}", peer_addr, e);
|
||||
@@ -553,6 +598,7 @@ impl TcpListenerManager {
|
||||
cancel: CancellationToken,
|
||||
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
@@ -672,12 +718,24 @@ impl TcpListenerManager {
|
||||
let route_id = quick_match.route.id.as_deref();
|
||||
|
||||
// Resolve per-route cancel token (child of global cancel)
|
||||
let conn_cancel = match route_id {
|
||||
let route_cancel = match route_id {
|
||||
Some(id) => route_cancels.entry(id.to_string())
|
||||
.or_insert_with(|| cancel.child_token())
|
||||
.clone(),
|
||||
None => cancel.clone(),
|
||||
};
|
||||
// Per-connection child token for selective recycling
|
||||
let conn_cancel = route_cancel.child_token();
|
||||
|
||||
// Register in connection registry for selective recycling
|
||||
let (_conn_id, _registry_guard) = connection_registry.register(
|
||||
ConnectionEntry {
|
||||
cancel: conn_cancel.clone(),
|
||||
source_ip: peer_addr.ip(),
|
||||
domain: None, // fast path has no domain
|
||||
route_id: route_id.map(|s| s.to_string()),
|
||||
},
|
||||
);
|
||||
|
||||
// Check route-level IP security
|
||||
if let Some(ref security) = quick_match.route.security {
|
||||
@@ -852,12 +910,24 @@ impl TcpListenerManager {
|
||||
// Resolve per-route cancel token (child of global cancel).
|
||||
// When this route is removed via updateRoutes, the token is cancelled,
|
||||
// terminating all connections on this route.
|
||||
let cancel = match route_id {
|
||||
let route_cancel = match route_id {
|
||||
Some(id) => route_cancels.entry(id.to_string())
|
||||
.or_insert_with(|| cancel.child_token())
|
||||
.clone(),
|
||||
None => cancel,
|
||||
};
|
||||
// Per-connection child token for selective recycling
|
||||
let cancel = route_cancel.child_token();
|
||||
|
||||
// Register in connection registry for selective recycling
|
||||
let (_conn_id, _registry_guard) = connection_registry.register(
|
||||
ConnectionEntry {
|
||||
cancel: cancel.clone(),
|
||||
source_ip: peer_addr.ip(),
|
||||
domain: domain.clone(),
|
||||
route_id: route_id.map(|s| s.to_string()),
|
||||
},
|
||||
);
|
||||
|
||||
// Check route-level IP security for passthrough connections
|
||||
if let Some(ref security) = route_match.route.security {
|
||||
@@ -981,6 +1051,9 @@ impl TcpListenerManager {
|
||||
peer_addr, target_host, target_port, domain
|
||||
);
|
||||
|
||||
// Track as "other" protocol (non-HTTP passthrough)
|
||||
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
|
||||
|
||||
let mut actual_buf = vec![0u8; n];
|
||||
stream.read_exact(&mut actual_buf).await?;
|
||||
|
||||
@@ -1047,6 +1120,8 @@ impl TcpListenerManager {
|
||||
"TLS Terminate + TCP: {} -> {}:{} (domain: {:?})",
|
||||
peer_addr, target_host, target_port, domain
|
||||
);
|
||||
// Track as "other" protocol (TLS-terminated non-HTTP)
|
||||
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
|
||||
// Raw TCP forwarding of decrypted stream
|
||||
let backend = match tokio::time::timeout(
|
||||
connect_timeout,
|
||||
@@ -1133,6 +1208,8 @@ impl TcpListenerManager {
|
||||
"TLS Terminate+Reencrypt + TCP: {} -> {}:{}",
|
||||
peer_addr, target_host, target_port
|
||||
);
|
||||
// Track as "other" protocol (TLS-terminated non-HTTP, re-encrypted)
|
||||
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
|
||||
Self::handle_tls_reencrypt_tunnel(
|
||||
buf_stream, &target_host, target_port,
|
||||
peer_addr, Arc::clone(&metrics), route_id,
|
||||
@@ -1149,6 +1226,8 @@ impl TcpListenerManager {
|
||||
Ok(())
|
||||
} else {
|
||||
// Plain TCP forwarding (non-HTTP)
|
||||
// Track as "other" protocol (plain TCP, non-HTTP)
|
||||
let _proto_guard = ProtocolGuard::new(Arc::clone(&metrics), "other", "other");
|
||||
let mut backend = match tokio::time::timeout(
|
||||
connect_timeout,
|
||||
tokio::net::TcpStream::connect(format!("{}:{}", target_host, target_port)),
|
||||
|
||||
@@ -28,6 +28,8 @@ use rustproxy_routing::{MatchContext, RouteManager};
|
||||
|
||||
use rustproxy_http::h3_service::H3ProxyService;
|
||||
|
||||
use crate::connection_registry::ConnectionRegistry;
|
||||
|
||||
use crate::connection_tracker::ConnectionTracker;
|
||||
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
||||
|
||||
@@ -56,6 +58,10 @@ pub struct UdpListenerManager {
|
||||
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
|
||||
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
|
||||
proxy_ips: Arc<Vec<IpAddr>>,
|
||||
/// Per-route cancellation tokens (shared with TcpListenerManager).
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
/// Shared connection registry for selective recycling.
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
}
|
||||
|
||||
impl Drop for UdpListenerManager {
|
||||
@@ -76,6 +82,8 @@ impl UdpListenerManager {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
conn_tracker: Arc<ConnectionTracker>,
|
||||
cancel_token: CancellationToken,
|
||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||
connection_registry: Arc<ConnectionRegistry>,
|
||||
) -> Self {
|
||||
Self {
|
||||
listeners: HashMap::new(),
|
||||
@@ -89,6 +97,8 @@ impl UdpListenerManager {
|
||||
relay_reader_cancel: None,
|
||||
h3_service: None,
|
||||
proxy_ips: Arc::new(Vec::new()),
|
||||
route_cancels,
|
||||
connection_registry,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,6 +162,8 @@ impl UdpListenerManager {
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
None,
|
||||
Arc::clone(&self.route_cancels),
|
||||
Arc::clone(&self.connection_registry),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {}", port);
|
||||
@@ -173,6 +185,8 @@ impl UdpListenerManager {
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
Some(relay.real_client_map),
|
||||
Arc::clone(&self.route_cancels),
|
||||
Arc::clone(&self.connection_registry),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint with PROXY relay started on port {}", port);
|
||||
@@ -356,6 +370,8 @@ impl UdpListenerManager {
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
None,
|
||||
Arc::clone(&self.route_cancels),
|
||||
Arc::clone(&self.connection_registry),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
Ok(())
|
||||
@@ -379,6 +395,8 @@ impl UdpListenerManager {
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
Some(relay.real_client_map),
|
||||
Arc::clone(&self.route_cancels),
|
||||
Arc::clone(&self.connection_registry),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
Ok(())
|
||||
|
||||
@@ -281,6 +281,11 @@ impl RouteManager {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get all enabled routes.
|
||||
pub fn routes(&self) -> &[RouteConfig] {
|
||||
&self.routes
|
||||
}
|
||||
|
||||
/// Get the total number of enabled routes.
|
||||
pub fn route_count(&self) -> usize {
|
||||
self.routes.len()
|
||||
|
||||
@@ -356,12 +356,17 @@ impl RustProxy {
|
||||
|
||||
// Bind UDP ports (if any)
|
||||
if !udp_ports.is_empty() {
|
||||
let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone();
|
||||
let tcp_mgr = self.listener_manager.as_ref().unwrap();
|
||||
let conn_tracker = tcp_mgr.conn_tracker().clone();
|
||||
let route_cancels = tcp_mgr.route_cancels().clone();
|
||||
let connection_registry = tcp_mgr.connection_registry().clone();
|
||||
let mut udp_mgr = UdpListenerManager::new(
|
||||
Arc::clone(&*self.route_table.load()),
|
||||
Arc::clone(&self.metrics),
|
||||
conn_tracker,
|
||||
self.cancel_token.clone(),
|
||||
route_cancels,
|
||||
connection_registry,
|
||||
);
|
||||
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
|
||||
|
||||
@@ -707,6 +712,9 @@ impl RustProxy {
|
||||
.collect();
|
||||
self.metrics.retain_backends(&active_backends);
|
||||
|
||||
// Capture old route manager for diff-based connection recycling
|
||||
let old_manager = self.route_table.load_full();
|
||||
|
||||
// Atomically swap the route table
|
||||
let new_manager = Arc::new(new_manager);
|
||||
self.route_table.store(Arc::clone(&new_manager));
|
||||
@@ -742,9 +750,47 @@ impl RustProxy {
|
||||
listener.update_route_manager(Arc::clone(&new_manager));
|
||||
// Cancel connections on routes that were removed or disabled
|
||||
listener.invalidate_removed_routes(&active_route_ids);
|
||||
// Clean up registry entries for removed routes
|
||||
listener.connection_registry().cleanup_removed_routes(&active_route_ids);
|
||||
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
|
||||
listener.prune_http_proxy_caches(&active_route_ids);
|
||||
|
||||
// Diff-based connection recycling for changed routes
|
||||
{
|
||||
let registry = listener.connection_registry();
|
||||
for new_route in &routes {
|
||||
let new_id = match &new_route.id {
|
||||
Some(id) => id.as_str(),
|
||||
None => continue,
|
||||
};
|
||||
// Find corresponding old route
|
||||
let old_route = old_manager.routes().iter().find(|r| {
|
||||
r.id.as_deref() == Some(new_id)
|
||||
});
|
||||
let old_route = match old_route {
|
||||
Some(r) => r,
|
||||
None => continue, // new route, no existing connections to recycle
|
||||
};
|
||||
|
||||
// Security diff: re-evaluate existing connections' IPs
|
||||
let old_sec = serde_json::to_string(&old_route.security).ok();
|
||||
let new_sec = serde_json::to_string(&new_route.security).ok();
|
||||
if old_sec != new_sec {
|
||||
if let Some(ref security) = new_route.security {
|
||||
registry.recycle_for_security_change(new_id, security);
|
||||
}
|
||||
// If security removed entirely (became more permissive), no recycling needed
|
||||
}
|
||||
|
||||
// Action diff (targets, TLS mode, etc.): recycle all connections on route
|
||||
let old_action = serde_json::to_string(&old_route.action).ok();
|
||||
let new_action = serde_json::to_string(&new_route.action).ok();
|
||||
if old_action != new_action {
|
||||
registry.recycle_for_route_change(new_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add new ports
|
||||
for port in &new_ports {
|
||||
if !old_ports.contains(port) {
|
||||
@@ -787,12 +833,16 @@ impl RustProxy {
|
||||
if self.udp_listener_manager.is_none() {
|
||||
if let Some(ref listener) = self.listener_manager {
|
||||
let conn_tracker = listener.conn_tracker().clone();
|
||||
let route_cancels = listener.route_cancels().clone();
|
||||
let connection_registry = listener.connection_registry().clone();
|
||||
let conn_config = Self::build_connection_config(&self.options);
|
||||
let mut udp_mgr = UdpListenerManager::new(
|
||||
Arc::clone(&new_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
conn_tracker,
|
||||
self.cancel_token.clone(),
|
||||
route_cancels,
|
||||
connection_registry,
|
||||
);
|
||||
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
|
||||
self.udp_listener_manager = Some(udp_mgr);
|
||||
@@ -1096,6 +1146,10 @@ impl RustProxy {
|
||||
}
|
||||
|
||||
/// Load a certificate for a domain and hot-swap the TLS configuration.
|
||||
///
|
||||
/// If the cert PEM differs from the currently loaded cert for this domain,
|
||||
/// existing connections for the domain are gracefully recycled (GOAWAY for
|
||||
/// HTTP/2, Connection: close for HTTP/1.1, graceful FIN for TCP).
|
||||
pub async fn load_certificate(
|
||||
&mut self,
|
||||
domain: &str,
|
||||
@@ -1105,6 +1159,12 @@ impl RustProxy {
|
||||
) -> Result<()> {
|
||||
info!("Loading certificate for domain: {}", domain);
|
||||
|
||||
// Check if the cert actually changed (for selective connection recycling)
|
||||
let cert_changed = self.loaded_certs
|
||||
.get(domain)
|
||||
.map(|existing| existing.cert_pem != cert_pem)
|
||||
.unwrap_or(false); // new domain = no existing connections to recycle
|
||||
|
||||
// Store in cert manager if available
|
||||
if let Some(ref cm_arc) = self.cert_manager {
|
||||
let now = std::time::SystemTime::now()
|
||||
@@ -1153,6 +1213,13 @@ impl RustProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Recycle existing connections if cert actually changed
|
||||
if cert_changed {
|
||||
if let Some(ref listener) = self.listener_manager {
|
||||
listener.connection_registry().recycle_for_cert_change(domain);
|
||||
}
|
||||
}
|
||||
|
||||
info!("Certificate loaded and TLS config updated for {}", domain);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
418
test/test.websocket-e2e.ts
Normal file
418
test/test.websocket-e2e.ts
Normal file
@@ -0,0 +1,418 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import { SmartProxy } from '../ts/index.js';
|
||||
import * as http from 'http';
|
||||
import WebSocket, { WebSocketServer } from 'ws';
|
||||
import { findFreePorts, assertPortsFree } from './helpers/port-allocator.js';
|
||||
|
||||
/**
|
||||
* Helper: create a WebSocket client that connects through the proxy.
|
||||
* Registers the message handler BEFORE awaiting open to avoid race conditions.
|
||||
*/
|
||||
function connectWs(
|
||||
url: string,
|
||||
headers: Record<string, string> = {},
|
||||
opts: WebSocket.ClientOptions = {},
|
||||
): { ws: WebSocket; messages: string[]; opened: Promise<void> } {
|
||||
const messages: string[] = [];
|
||||
const ws = new WebSocket(url, { headers, ...opts });
|
||||
|
||||
// Register message handler immediately — before open fires
|
||||
ws.on('message', (data) => {
|
||||
messages.push(data.toString());
|
||||
});
|
||||
|
||||
const opened = new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => reject(new Error('WebSocket open timeout')), 5000);
|
||||
ws.on('open', () => { clearTimeout(timeout); resolve(); });
|
||||
ws.on('error', (err) => { clearTimeout(timeout); reject(err); });
|
||||
});
|
||||
|
||||
return { ws, messages, opened };
|
||||
}
|
||||
|
||||
/** Wait until `predicate` returns true, with a hard timeout. */
|
||||
function waitFor(predicate: () => boolean, timeoutMs = 5000): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const deadline = setTimeout(() => reject(new Error('waitFor timeout')), timeoutMs);
|
||||
const check = () => {
|
||||
if (predicate()) { clearTimeout(deadline); resolve(); }
|
||||
else setTimeout(check, 30);
|
||||
};
|
||||
check();
|
||||
});
|
||||
}
|
||||
|
||||
/** Graceful close helper */
|
||||
function closeWs(ws: WebSocket): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
if (ws.readyState === WebSocket.CLOSED) return resolve();
|
||||
ws.on('close', () => resolve());
|
||||
ws.close();
|
||||
setTimeout(resolve, 2000); // fallback
|
||||
});
|
||||
}
|
||||
|
||||
// ─── Test 1: Basic WebSocket upgrade and bidirectional messaging ───
|
||||
tap.test('should proxy WebSocket connections with bidirectional messaging', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
// Backend: echoes messages with prefix, sends greeting on connect
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer });
|
||||
const backendMessages: string[] = [];
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', (data) => {
|
||||
const msg = data.toString();
|
||||
backendMessages.push(msg);
|
||||
ws.send(`echo: ${msg}`);
|
||||
});
|
||||
ws.send('hello from backend');
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-test-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
// Connect client — message handler registered before open
|
||||
const { ws, messages, opened } = connectWs(
|
||||
`ws://127.0.0.1:${PROXY_PORT}/`,
|
||||
{ Host: 'test.local' },
|
||||
);
|
||||
await opened;
|
||||
|
||||
// Wait for the backend greeting
|
||||
await waitFor(() => messages.length >= 1);
|
||||
expect(messages[0]).toEqual('hello from backend');
|
||||
|
||||
// Send 3 messages, expect 3 echoes
|
||||
ws.send('ping 1');
|
||||
ws.send('ping 2');
|
||||
ws.send('ping 3');
|
||||
|
||||
await waitFor(() => messages.length >= 4);
|
||||
|
||||
expect(messages).toContain('echo: ping 1');
|
||||
expect(messages).toContain('echo: ping 2');
|
||||
expect(messages).toContain('echo: ping 3');
|
||||
expect(backendMessages).toInclude('ping 1');
|
||||
expect(backendMessages).toInclude('ping 2');
|
||||
expect(backendMessages).toInclude('ping 3');
|
||||
|
||||
await closeWs(ws);
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
// ─── Test 2: Multiple concurrent WebSocket connections ───
|
||||
tap.test('should handle multiple concurrent WebSocket connections', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer });
|
||||
|
||||
let connectionCount = 0;
|
||||
wss.on('connection', (ws) => {
|
||||
const id = ++connectionCount;
|
||||
ws.on('message', (data) => {
|
||||
ws.send(`conn${id}: ${data.toString()}`);
|
||||
});
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-multi-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
const NUM_CLIENTS = 5;
|
||||
const clients: { ws: WebSocket; messages: string[] }[] = [];
|
||||
|
||||
for (let i = 0; i < NUM_CLIENTS; i++) {
|
||||
const c = connectWs(
|
||||
`ws://127.0.0.1:${PROXY_PORT}/`,
|
||||
{ Host: 'test.local' },
|
||||
);
|
||||
await c.opened;
|
||||
clients.push(c);
|
||||
}
|
||||
|
||||
// Each client sends a unique message
|
||||
for (let i = 0; i < NUM_CLIENTS; i++) {
|
||||
clients[i].ws.send(`hello from client ${i}`);
|
||||
}
|
||||
|
||||
// Wait for all replies
|
||||
await waitFor(() => clients.every((c) => c.messages.length >= 1));
|
||||
|
||||
for (let i = 0; i < NUM_CLIENTS; i++) {
|
||||
expect(clients[i].messages.length).toBeGreaterThanOrEqual(1);
|
||||
expect(clients[i].messages[0]).toInclude(`hello from client ${i}`);
|
||||
}
|
||||
expect(connectionCount).toEqual(NUM_CLIENTS);
|
||||
|
||||
for (const c of clients) await closeWs(c.ws);
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
// ─── Test 3: WebSocket with binary data ───
|
||||
tap.test('should proxy binary WebSocket frames', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer });
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', (data) => {
|
||||
ws.send(data, { binary: true });
|
||||
});
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-binary-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
const receivedBuffers: Buffer[] = [];
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${PROXY_PORT}/`, {
|
||||
headers: { Host: 'test.local' },
|
||||
});
|
||||
ws.on('message', (data) => {
|
||||
receivedBuffers.push(Buffer.from(data as ArrayBuffer));
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => reject(new Error('timeout')), 5000);
|
||||
ws.on('open', () => { clearTimeout(timeout); resolve(); });
|
||||
ws.on('error', (err) => { clearTimeout(timeout); reject(err); });
|
||||
});
|
||||
|
||||
// Send a 256-byte buffer with known content
|
||||
const sentBuffer = Buffer.alloc(256);
|
||||
for (let i = 0; i < 256; i++) sentBuffer[i] = i;
|
||||
ws.send(sentBuffer);
|
||||
|
||||
await waitFor(() => receivedBuffers.length >= 1);
|
||||
|
||||
expect(receivedBuffers[0].length).toEqual(256);
|
||||
expect(Buffer.compare(receivedBuffers[0], sentBuffer)).toEqual(0);
|
||||
|
||||
await closeWs(ws);
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
// ─── Test 4: WebSocket path and query string preserved ───
|
||||
tap.test('should preserve path and query string through proxy', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer });
|
||||
|
||||
let receivedUrl = '';
|
||||
wss.on('connection', (ws, req) => {
|
||||
receivedUrl = req.url || '';
|
||||
ws.send(`url: ${receivedUrl}`);
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-path-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
const { ws, messages, opened } = connectWs(
|
||||
`ws://127.0.0.1:${PROXY_PORT}/chat/room1?token=abc123`,
|
||||
{ Host: 'test.local' },
|
||||
);
|
||||
await opened;
|
||||
|
||||
await waitFor(() => messages.length >= 1);
|
||||
|
||||
expect(receivedUrl).toEqual('/chat/room1?token=abc123');
|
||||
expect(messages[0]).toEqual('url: /chat/room1?token=abc123');
|
||||
|
||||
await closeWs(ws);
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
// ─── Test 5: Clean close propagation ───
|
||||
tap.test('should handle clean WebSocket close from client', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer });
|
||||
|
||||
let backendGotClose = false;
|
||||
let backendCloseCode = 0;
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('close', (code) => {
|
||||
backendGotClose = true;
|
||||
backendCloseCode = code;
|
||||
});
|
||||
ws.on('message', (data) => {
|
||||
ws.send(data);
|
||||
});
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-close-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
const { ws, messages, opened } = connectWs(
|
||||
`ws://127.0.0.1:${PROXY_PORT}/`,
|
||||
{ Host: 'test.local' },
|
||||
);
|
||||
await opened;
|
||||
|
||||
// Confirm connection works with a round-trip
|
||||
ws.send('test');
|
||||
await waitFor(() => messages.length >= 1);
|
||||
|
||||
// Close with code 1000
|
||||
let clientCloseCode = 0;
|
||||
const closed = new Promise<void>((resolve) => {
|
||||
ws.on('close', (code) => {
|
||||
clientCloseCode = code;
|
||||
resolve();
|
||||
});
|
||||
setTimeout(resolve, 3000);
|
||||
});
|
||||
ws.close(1000, 'done');
|
||||
await closed;
|
||||
|
||||
// Wait for backend to register
|
||||
await waitFor(() => backendGotClose, 3000);
|
||||
|
||||
expect(backendGotClose).toBeTrue();
|
||||
expect(clientCloseCode).toEqual(1000);
|
||||
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
// ─── Test 6: Large messages ───
|
||||
tap.test('should handle large WebSocket messages', async () => {
|
||||
const [PROXY_PORT, BACKEND_PORT] = await findFreePorts(2);
|
||||
|
||||
const backendServer = http.createServer();
|
||||
const wss = new WebSocketServer({ server: backendServer, maxPayload: 5 * 1024 * 1024 });
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', (data) => {
|
||||
const buf = Buffer.from(data as ArrayBuffer);
|
||||
ws.send(`received ${buf.length} bytes`);
|
||||
});
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
backendServer.listen(BACKEND_PORT, '127.0.0.1', () => resolve());
|
||||
});
|
||||
|
||||
const proxy = new SmartProxy({
|
||||
routes: [{
|
||||
name: 'ws-large-route',
|
||||
match: { ports: PROXY_PORT },
|
||||
action: {
|
||||
type: 'forward',
|
||||
targets: [{ host: '127.0.0.1', port: BACKEND_PORT }],
|
||||
websocket: { enabled: true },
|
||||
},
|
||||
}],
|
||||
});
|
||||
await proxy.start();
|
||||
|
||||
const { ws, messages, opened } = connectWs(
|
||||
`ws://127.0.0.1:${PROXY_PORT}/`,
|
||||
{ Host: 'test.local' },
|
||||
{ maxPayload: 5 * 1024 * 1024 },
|
||||
);
|
||||
await opened;
|
||||
|
||||
// Send a 1MB message
|
||||
const largePayload = Buffer.alloc(1024 * 1024, 0x42);
|
||||
ws.send(largePayload);
|
||||
|
||||
await waitFor(() => messages.length >= 1);
|
||||
expect(messages[0]).toEqual(`received ${1024 * 1024} bytes`);
|
||||
|
||||
await closeWs(ws);
|
||||
await proxy.stop();
|
||||
await new Promise<void>((resolve) => backendServer.close(() => resolve()));
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
await assertPortsFree([PROXY_PORT, BACKEND_PORT]);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '27.0.0',
|
||||
version: '27.3.0',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
@@ -32,6 +32,23 @@ export interface IThroughputHistoryPoint {
|
||||
/**
|
||||
* Main metrics interface with clean, grouped API
|
||||
*/
|
||||
/**
|
||||
* Protocol distribution for frontend (client→proxy) or backend (proxy→upstream).
|
||||
* Tracks active and total counts for h1/h2/h3/ws/other.
|
||||
*/
|
||||
export interface IProtocolDistribution {
|
||||
h1Active: number;
|
||||
h1Total: number;
|
||||
h2Active: number;
|
||||
h2Total: number;
|
||||
h3Active: number;
|
||||
h3Total: number;
|
||||
wsActive: number;
|
||||
wsTotal: number;
|
||||
otherActive: number;
|
||||
otherTotal: number;
|
||||
}
|
||||
|
||||
export interface IMetrics {
|
||||
// Connection metrics
|
||||
connections: {
|
||||
@@ -40,6 +57,8 @@ export interface IMetrics {
|
||||
byRoute(): Map<string, number>;
|
||||
byIP(): Map<string, number>;
|
||||
topIPs(limit?: number): Array<{ ip: string; count: number }>;
|
||||
frontendProtocols(): IProtocolDistribution;
|
||||
backendProtocols(): IProtocolDistribution;
|
||||
};
|
||||
|
||||
// Throughput metrics (bytes per second)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
|
||||
import type { IMetrics, IBackendMetrics, IProtocolCacheEntry, IProtocolDistribution, IThroughputData, IThroughputHistoryPoint } from './models/metrics-types.js';
|
||||
import type { RustProxyBridge } from './rust-proxy-bridge.js';
|
||||
|
||||
/**
|
||||
@@ -90,6 +90,49 @@ export class RustMetricsAdapter implements IMetrics {
|
||||
result.sort((a, b) => b.count - a.count);
|
||||
return result.slice(0, limit);
|
||||
},
|
||||
frontendProtocols: (): IProtocolDistribution => {
|
||||
const fp = this.cache?.frontendProtocols;
|
||||
return {
|
||||
h1Active: fp?.h1Active ?? 0,
|
||||
h1Total: fp?.h1Total ?? 0,
|
||||
h2Active: fp?.h2Active ?? 0,
|
||||
h2Total: fp?.h2Total ?? 0,
|
||||
h3Active: fp?.h3Active ?? 0,
|
||||
h3Total: fp?.h3Total ?? 0,
|
||||
wsActive: fp?.wsActive ?? 0,
|
||||
wsTotal: fp?.wsTotal ?? 0,
|
||||
otherActive: fp?.otherActive ?? 0,
|
||||
otherTotal: fp?.otherTotal ?? 0,
|
||||
};
|
||||
},
|
||||
backendProtocols: (): IProtocolDistribution => {
|
||||
// Merge per-backend h1/h2/h3 data with aggregate ws/other counters
|
||||
const bp = this.cache?.backendProtocols;
|
||||
let h1Active = 0, h1Total = 0;
|
||||
let h2Active = 0, h2Total = 0;
|
||||
let h3Active = 0, h3Total = 0;
|
||||
if (this.cache?.backends) {
|
||||
for (const bm of Object.values(this.cache.backends)) {
|
||||
const m = bm as any;
|
||||
const active = m.activeConnections ?? 0;
|
||||
const total = m.totalConnections ?? 0;
|
||||
switch (m.protocol) {
|
||||
case 'h2': h2Active += active; h2Total += total; break;
|
||||
case 'h3': h3Active += active; h3Total += total; break;
|
||||
default: h1Active += active; h1Total += total; break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return {
|
||||
h1Active, h1Total,
|
||||
h2Active, h2Total,
|
||||
h3Active, h3Total,
|
||||
wsActive: bp?.wsActive ?? 0,
|
||||
wsTotal: bp?.wsTotal ?? 0,
|
||||
otherActive: bp?.otherActive ?? 0,
|
||||
otherTotal: bp?.otherTotal ?? 0,
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
public throughput = {
|
||||
|
||||
Reference in New Issue
Block a user