Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a55ff20391 | |||
| 3c24bf659b |
@@ -1,5 +1,13 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-27 - 27.1.0 - feat(rustproxy-passthrough)
|
||||||
|
add selective connection recycling for route, security, and certificate updates
|
||||||
|
|
||||||
|
- introduce a shared connection registry to track active TCP and QUIC connections by route, source IP, and domain
|
||||||
|
- recycle only affected connections when route actions or security rules change instead of broadly invalidating traffic
|
||||||
|
- gracefully recycle existing connections when TLS certificates change for a domain
|
||||||
|
- apply route-level IP security checks to QUIC connections and share route cancellation state with UDP listeners
|
||||||
|
|
||||||
## 2026-03-26 - 27.0.0 - BREAKING CHANGE(smart-proxy)
|
## 2026-03-26 - 27.0.0 - BREAKING CHANGE(smart-proxy)
|
||||||
remove route helper APIs and standardize route configuration on plain route objects
|
remove route helper APIs and standardize route configuration on plain route objects
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartproxy",
|
"name": "@push.rocks/smartproxy",
|
||||||
"version": "27.0.0",
|
"version": "27.1.0",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
329
rust/crates/rustproxy-passthrough/src/connection_registry.rs
Normal file
329
rust/crates/rustproxy-passthrough/src/connection_registry.rs
Normal file
@@ -0,0 +1,329 @@
|
|||||||
|
//! Shared connection registry for selective connection recycling.
|
||||||
|
//!
|
||||||
|
//! Tracks active connections across both TCP and QUIC with metadata
|
||||||
|
//! (source IP, SNI domain, route ID, cancel token) so that connections
|
||||||
|
//! can be selectively recycled when certificates, security rules, or
|
||||||
|
//! route targets change.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::net::IpAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use rustproxy_config::RouteSecurity;
|
||||||
|
use rustproxy_http::request_filter::RequestFilter;
|
||||||
|
use rustproxy_routing::matchers::domain_matches;
|
||||||
|
|
||||||
|
/// Metadata about an active connection.
|
||||||
|
pub struct ConnectionEntry {
|
||||||
|
/// Per-connection cancel token (child of per-route token).
|
||||||
|
pub cancel: CancellationToken,
|
||||||
|
/// Client source IP.
|
||||||
|
pub source_ip: IpAddr,
|
||||||
|
/// SNI domain from TLS handshake (None for non-TLS connections).
|
||||||
|
pub domain: Option<String>,
|
||||||
|
/// Route ID this connection was matched to (None if route has no ID).
|
||||||
|
pub route_id: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transport-agnostic registry of active connections.
|
||||||
|
///
|
||||||
|
/// Used by both `TcpListenerManager` and `UdpListenerManager` to track
|
||||||
|
/// connections and enable selective recycling on config changes.
|
||||||
|
pub struct ConnectionRegistry {
|
||||||
|
connections: DashMap<u64, ConnectionEntry>,
|
||||||
|
next_id: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionRegistry {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
connections: DashMap::new(),
|
||||||
|
next_id: AtomicU64::new(1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a connection and return its ID + RAII guard.
|
||||||
|
///
|
||||||
|
/// The guard automatically removes the connection from the registry on drop.
|
||||||
|
pub fn register(self: &Arc<Self>, entry: ConnectionEntry) -> (u64, ConnectionRegistryGuard) {
|
||||||
|
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.connections.insert(id, entry);
|
||||||
|
let guard = ConnectionRegistryGuard {
|
||||||
|
registry: Arc::clone(self),
|
||||||
|
conn_id: id,
|
||||||
|
};
|
||||||
|
(id, guard)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Number of tracked connections (for metrics/debugging).
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.connections.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recycle connections whose SNI domain matches a renewed certificate domain.
|
||||||
|
///
|
||||||
|
/// Uses bidirectional domain matching so that:
|
||||||
|
/// - Cert `*.example.com` recycles connections for `sub.example.com`
|
||||||
|
/// - Cert `sub.example.com` recycles connections on routes with `*.example.com`
|
||||||
|
pub fn recycle_for_cert_change(&self, cert_domain: &str) {
|
||||||
|
let mut recycled = 0u64;
|
||||||
|
self.connections.retain(|_, entry| {
|
||||||
|
let matches = entry.domain.as_deref()
|
||||||
|
.map(|d| domain_matches(cert_domain, d) || domain_matches(d, cert_domain))
|
||||||
|
.unwrap_or(false);
|
||||||
|
if matches {
|
||||||
|
entry.cancel.cancel();
|
||||||
|
recycled += 1;
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if recycled > 0 {
|
||||||
|
info!(
|
||||||
|
"Recycled {} connection(s) for cert change on domain '{}'",
|
||||||
|
recycled, cert_domain
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recycle connections on a route whose security config changed.
|
||||||
|
///
|
||||||
|
/// Re-evaluates each connection's source IP against the new security rules.
|
||||||
|
/// Only connections from now-blocked IPs are terminated; allowed IPs are undisturbed.
|
||||||
|
pub fn recycle_for_security_change(&self, route_id: &str, new_security: &RouteSecurity) {
|
||||||
|
let mut recycled = 0u64;
|
||||||
|
self.connections.retain(|_, entry| {
|
||||||
|
if entry.route_id.as_deref() == Some(route_id) {
|
||||||
|
if !RequestFilter::check_ip_security(new_security, &entry.source_ip) {
|
||||||
|
info!(
|
||||||
|
"Terminating connection from {} — IP now blocked on route '{}'",
|
||||||
|
entry.source_ip, route_id
|
||||||
|
);
|
||||||
|
entry.cancel.cancel();
|
||||||
|
recycled += 1;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
true
|
||||||
|
});
|
||||||
|
if recycled > 0 {
|
||||||
|
info!(
|
||||||
|
"Recycled {} connection(s) for security change on route '{}'",
|
||||||
|
recycled, route_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recycle all connections on a route (e.g., when targets changed).
|
||||||
|
pub fn recycle_for_route_change(&self, route_id: &str) {
|
||||||
|
let mut recycled = 0u64;
|
||||||
|
self.connections.retain(|_, entry| {
|
||||||
|
if entry.route_id.as_deref() == Some(route_id) {
|
||||||
|
entry.cancel.cancel();
|
||||||
|
recycled += 1;
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if recycled > 0 {
|
||||||
|
info!(
|
||||||
|
"Recycled {} connection(s) for config change on route '{}'",
|
||||||
|
recycled, route_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove connections on routes that no longer exist.
|
||||||
|
///
|
||||||
|
/// This complements per-route CancellationToken cancellation —
|
||||||
|
/// the token cascade handles graceful shutdown, this cleans up the registry.
|
||||||
|
pub fn cleanup_removed_routes(&self, active_route_ids: &HashSet<String>) {
|
||||||
|
self.connections.retain(|_, entry| {
|
||||||
|
match &entry.route_id {
|
||||||
|
Some(id) => active_route_ids.contains(id),
|
||||||
|
None => true, // keep connections without a route ID
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RAII guard that removes a connection from the registry on drop.
|
||||||
|
pub struct ConnectionRegistryGuard {
|
||||||
|
registry: Arc<ConnectionRegistry>,
|
||||||
|
conn_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ConnectionRegistryGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.registry.connections.remove(&self.conn_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn make_registry() -> Arc<ConnectionRegistry> {
|
||||||
|
Arc::new(ConnectionRegistry::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_register_and_guard_cleanup() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let token = CancellationToken::new();
|
||||||
|
let entry = ConnectionEntry {
|
||||||
|
cancel: token.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: Some("example.com".to_string()),
|
||||||
|
route_id: Some("route-1".to_string()),
|
||||||
|
};
|
||||||
|
let (id, guard) = reg.register(entry);
|
||||||
|
assert_eq!(reg.len(), 1);
|
||||||
|
assert!(reg.connections.contains_key(&id));
|
||||||
|
|
||||||
|
drop(guard);
|
||||||
|
assert_eq!(reg.len(), 0);
|
||||||
|
assert!(!token.is_cancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_recycle_for_cert_change_exact() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let t1 = CancellationToken::new();
|
||||||
|
let t2 = CancellationToken::new();
|
||||||
|
let (_, _g1) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t1.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: Some("api.example.com".to_string()),
|
||||||
|
route_id: Some("r1".to_string()),
|
||||||
|
});
|
||||||
|
let (_, _g2) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t2.clone(),
|
||||||
|
source_ip: "10.0.0.2".parse().unwrap(),
|
||||||
|
domain: Some("other.com".to_string()),
|
||||||
|
route_id: Some("r2".to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
reg.recycle_for_cert_change("api.example.com");
|
||||||
|
assert!(t1.is_cancelled());
|
||||||
|
assert!(!t2.is_cancelled());
|
||||||
|
// Registry retains unmatched entry (guard still alive keeps it too,
|
||||||
|
// but the retain removed the matched one before guard could)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_recycle_for_cert_change_wildcard() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let t1 = CancellationToken::new();
|
||||||
|
let t2 = CancellationToken::new();
|
||||||
|
let (_, _g1) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t1.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: Some("sub.example.com".to_string()),
|
||||||
|
route_id: Some("r1".to_string()),
|
||||||
|
});
|
||||||
|
let (_, _g2) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t2.clone(),
|
||||||
|
source_ip: "10.0.0.2".parse().unwrap(),
|
||||||
|
domain: Some("other.com".to_string()),
|
||||||
|
route_id: Some("r2".to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wildcard cert should match subdomain
|
||||||
|
reg.recycle_for_cert_change("*.example.com");
|
||||||
|
assert!(t1.is_cancelled());
|
||||||
|
assert!(!t2.is_cancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_recycle_for_security_change() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let t1 = CancellationToken::new();
|
||||||
|
let t2 = CancellationToken::new();
|
||||||
|
let (_, _g1) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t1.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("r1".to_string()),
|
||||||
|
});
|
||||||
|
let (_, _g2) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t2.clone(),
|
||||||
|
source_ip: "10.0.0.2".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("r1".to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Block 10.0.0.1, allow 10.0.0.2
|
||||||
|
let security = RouteSecurity {
|
||||||
|
ip_allow_list: None,
|
||||||
|
ip_block_list: Some(vec!["10.0.0.1".to_string()]),
|
||||||
|
max_connections: None,
|
||||||
|
authentication: None,
|
||||||
|
rate_limit: None,
|
||||||
|
basic_auth: None,
|
||||||
|
jwt_auth: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
reg.recycle_for_security_change("r1", &security);
|
||||||
|
assert!(t1.is_cancelled());
|
||||||
|
assert!(!t2.is_cancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_recycle_for_route_change() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let t1 = CancellationToken::new();
|
||||||
|
let t2 = CancellationToken::new();
|
||||||
|
let (_, _g1) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t1.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("r1".to_string()),
|
||||||
|
});
|
||||||
|
let (_, _g2) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t2.clone(),
|
||||||
|
source_ip: "10.0.0.2".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("r2".to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
reg.recycle_for_route_change("r1");
|
||||||
|
assert!(t1.is_cancelled());
|
||||||
|
assert!(!t2.is_cancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cleanup_removed_routes() {
|
||||||
|
let reg = make_registry();
|
||||||
|
let t1 = CancellationToken::new();
|
||||||
|
let t2 = CancellationToken::new();
|
||||||
|
let (_, _g1) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t1.clone(),
|
||||||
|
source_ip: "10.0.0.1".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("active".to_string()),
|
||||||
|
});
|
||||||
|
let (_, _g2) = reg.register(ConnectionEntry {
|
||||||
|
cancel: t2.clone(),
|
||||||
|
source_ip: "10.0.0.2".parse().unwrap(),
|
||||||
|
domain: None,
|
||||||
|
route_id: Some("removed".to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut active = HashSet::new();
|
||||||
|
active.insert("active".to_string());
|
||||||
|
reg.cleanup_removed_routes(&active);
|
||||||
|
|
||||||
|
// "removed" route entry was cleaned from registry
|
||||||
|
// (but guard is still alive so len may differ — the retain already removed it)
|
||||||
|
assert!(!t1.is_cancelled()); // not cancelled by cleanup, only by token cascade
|
||||||
|
assert!(!t2.is_cancelled()); // cleanup doesn't cancel, just removes from registry
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ pub mod forwarder;
|
|||||||
pub mod proxy_protocol;
|
pub mod proxy_protocol;
|
||||||
pub mod tls_handler;
|
pub mod tls_handler;
|
||||||
pub mod connection_tracker;
|
pub mod connection_tracker;
|
||||||
|
pub mod connection_registry;
|
||||||
pub mod socket_opts;
|
pub mod socket_opts;
|
||||||
pub mod udp_session;
|
pub mod udp_session;
|
||||||
pub mod udp_listener;
|
pub mod udp_listener;
|
||||||
@@ -21,6 +22,7 @@ pub use forwarder::*;
|
|||||||
pub use proxy_protocol::*;
|
pub use proxy_protocol::*;
|
||||||
pub use tls_handler::*;
|
pub use tls_handler::*;
|
||||||
pub use connection_tracker::*;
|
pub use connection_tracker::*;
|
||||||
|
pub use connection_registry::*;
|
||||||
pub use socket_opts::*;
|
pub use socket_opts::*;
|
||||||
pub use udp_session::*;
|
pub use udp_session::*;
|
||||||
pub use udp_listener::*;
|
pub use udp_listener::*;
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ use rustproxy_routing::{MatchContext, RouteManager};
|
|||||||
use rustproxy_http::h3_service::H3ProxyService;
|
use rustproxy_http::h3_service::H3ProxyService;
|
||||||
|
|
||||||
use crate::connection_tracker::ConnectionTracker;
|
use crate::connection_tracker::ConnectionTracker;
|
||||||
|
use crate::connection_registry::{ConnectionEntry, ConnectionRegistry};
|
||||||
|
|
||||||
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
/// Create a QUIC server endpoint on the given port with the provided TLS config.
|
||||||
///
|
///
|
||||||
@@ -350,6 +351,8 @@ pub async fn quic_accept_loop(
|
|||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
h3_service: Option<Arc<H3ProxyService>>,
|
h3_service: Option<Arc<H3ProxyService>>,
|
||||||
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
real_client_map: Option<Arc<DashMap<SocketAddr, SocketAddr>>>,
|
||||||
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
let incoming = tokio::select! {
|
let incoming = tokio::select! {
|
||||||
@@ -406,17 +409,48 @@ pub async fn quic_accept_loop(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Check route-level IP security (previously missing for QUIC)
|
||||||
|
if let Some(ref security) = route.security {
|
||||||
|
if !rustproxy_http::request_filter::RequestFilter::check_ip_security(
|
||||||
|
security, &ip,
|
||||||
|
) {
|
||||||
|
debug!("QUIC connection from {} blocked by route security", real_addr);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
conn_tracker.connection_opened(&ip);
|
conn_tracker.connection_opened(&ip);
|
||||||
let route_id = route.name.clone().or(route.id.clone());
|
let route_id = route.name.clone().or(route.id.clone());
|
||||||
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
|
metrics.connection_opened(route_id.as_deref(), Some(&ip_str));
|
||||||
|
|
||||||
|
// Resolve per-route cancel token (child of global cancel)
|
||||||
|
let route_cancel = match route_id.as_deref() {
|
||||||
|
Some(id) => route_cancels.entry(id.to_string())
|
||||||
|
.or_insert_with(|| cancel.child_token())
|
||||||
|
.clone(),
|
||||||
|
None => cancel.child_token(),
|
||||||
|
};
|
||||||
|
// Per-connection child token for selective recycling
|
||||||
|
let conn_cancel = route_cancel.child_token();
|
||||||
|
|
||||||
|
// Register in connection registry
|
||||||
|
let registry = Arc::clone(&connection_registry);
|
||||||
|
let reg_entry = ConnectionEntry {
|
||||||
|
cancel: conn_cancel.clone(),
|
||||||
|
source_ip: ip,
|
||||||
|
domain: None, // QUIC Initial is encrypted, domain comes later via H3 :authority
|
||||||
|
route_id: route_id.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
let metrics = Arc::clone(&metrics);
|
let metrics = Arc::clone(&metrics);
|
||||||
let conn_tracker = Arc::clone(&conn_tracker);
|
let conn_tracker = Arc::clone(&conn_tracker);
|
||||||
let cancel = cancel.child_token();
|
|
||||||
let h3_svc = h3_service.clone();
|
let h3_svc = h3_service.clone();
|
||||||
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
let real_client_addr = if real_addr != remote_addr { Some(real_addr) } else { None };
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Register in connection registry (RAII guard removes on drop)
|
||||||
|
let (_conn_id, _registry_guard) = registry.register(reg_entry);
|
||||||
|
|
||||||
// RAII guard: ensures metrics/tracker cleanup even on panic
|
// RAII guard: ensures metrics/tracker cleanup even on panic
|
||||||
struct QuicConnGuard {
|
struct QuicConnGuard {
|
||||||
tracker: Arc<ConnectionTracker>,
|
tracker: Arc<ConnectionTracker>,
|
||||||
@@ -439,7 +473,7 @@ pub async fn quic_accept_loop(
|
|||||||
route_id,
|
route_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &cancel, h3_svc, real_client_addr).await {
|
match handle_quic_connection(incoming, route, port, Arc::clone(&metrics), &conn_cancel, h3_svc, real_client_addr).await {
|
||||||
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
Ok(()) => debug!("QUIC connection from {} completed", real_addr),
|
||||||
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
Err(e) => debug!("QUIC connection from {} error: {}", real_addr, e),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use crate::sni_parser;
|
|||||||
use crate::forwarder;
|
use crate::forwarder;
|
||||||
use crate::tls_handler;
|
use crate::tls_handler;
|
||||||
use crate::connection_tracker::ConnectionTracker;
|
use crate::connection_tracker::ConnectionTracker;
|
||||||
|
use crate::connection_registry::{ConnectionEntry, ConnectionRegistry};
|
||||||
use crate::socket_opts;
|
use crate::socket_opts;
|
||||||
|
|
||||||
/// RAII guard that decrements the active connection metric on drop.
|
/// RAII guard that decrements the active connection metric on drop.
|
||||||
@@ -166,6 +167,8 @@ pub struct TcpListenerManager {
|
|||||||
/// Per-route cancellation tokens (child of cancel_token).
|
/// Per-route cancellation tokens (child of cancel_token).
|
||||||
/// When a route is removed, its token is cancelled, terminating all connections on that route.
|
/// When a route is removed, its token is cancelled, terminating all connections on that route.
|
||||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
/// Shared connection registry for selective recycling on config changes.
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TcpListenerManager {
|
impl TcpListenerManager {
|
||||||
@@ -205,6 +208,7 @@ impl TcpListenerManager {
|
|||||||
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
||||||
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
||||||
route_cancels: Arc::new(DashMap::new()),
|
route_cancels: Arc::new(DashMap::new()),
|
||||||
|
connection_registry: Arc::new(ConnectionRegistry::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -244,6 +248,7 @@ impl TcpListenerManager {
|
|||||||
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
socket_handler_relay: Arc::new(std::sync::RwLock::new(None)),
|
||||||
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
conn_semaphore: Arc::new(tokio::sync::Semaphore::new(max_conns)),
|
||||||
route_cancels: Arc::new(DashMap::new()),
|
route_cancels: Arc::new(DashMap::new()),
|
||||||
|
connection_registry: Arc::new(ConnectionRegistry::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -328,12 +333,13 @@ impl TcpListenerManager {
|
|||||||
let relay = Arc::clone(&self.socket_handler_relay);
|
let relay = Arc::clone(&self.socket_handler_relay);
|
||||||
let semaphore = Arc::clone(&self.conn_semaphore);
|
let semaphore = Arc::clone(&self.conn_semaphore);
|
||||||
let route_cancels = Arc::clone(&self.route_cancels);
|
let route_cancels = Arc::clone(&self.route_cancels);
|
||||||
|
let connection_registry = Arc::clone(&self.connection_registry);
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
Self::accept_loop(
|
Self::accept_loop(
|
||||||
listener, port, route_manager_swap, metrics, tls_configs,
|
listener, port, route_manager_swap, metrics, tls_configs,
|
||||||
shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay,
|
shared_tls_acceptor, http_proxy, conn_config, conn_tracker, cancel, relay,
|
||||||
semaphore, route_cancels,
|
semaphore, route_cancels, connection_registry,
|
||||||
).await;
|
).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -446,6 +452,16 @@ impl TcpListenerManager {
|
|||||||
&self.metrics
|
&self.metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the shared connection registry.
|
||||||
|
pub fn connection_registry(&self) -> &Arc<ConnectionRegistry> {
|
||||||
|
&self.connection_registry
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the per-route cancellation tokens.
|
||||||
|
pub fn route_cancels(&self) -> &Arc<DashMap<String, CancellationToken>> {
|
||||||
|
&self.route_cancels
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept loop for a single port.
|
/// Accept loop for a single port.
|
||||||
async fn accept_loop(
|
async fn accept_loop(
|
||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
@@ -461,6 +477,7 @@ impl TcpListenerManager {
|
|||||||
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
||||||
conn_semaphore: Arc<tokio::sync::Semaphore>,
|
conn_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
) {
|
) {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -514,6 +531,7 @@ impl TcpListenerManager {
|
|||||||
let cn = cancel.clone();
|
let cn = cancel.clone();
|
||||||
let sr = Arc::clone(&socket_handler_relay);
|
let sr = Arc::clone(&socket_handler_relay);
|
||||||
let rc = Arc::clone(&route_cancels);
|
let rc = Arc::clone(&route_cancels);
|
||||||
|
let cr = Arc::clone(&connection_registry);
|
||||||
debug!("Accepted connection from {} on port {}", peer_addr, port);
|
debug!("Accepted connection from {} on port {}", peer_addr, port);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -522,7 +540,7 @@ impl TcpListenerManager {
|
|||||||
// RAII guard ensures connection_closed is called on all paths
|
// RAII guard ensures connection_closed is called on all paths
|
||||||
let _ct_guard = ConnectionTrackerGuard::new(ct, ip);
|
let _ct_guard = ConnectionTrackerGuard::new(ct, ip);
|
||||||
let result = Self::handle_connection(
|
let result = Self::handle_connection(
|
||||||
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc,
|
stream, port, peer_addr, rm, m, tc, sa, hp, cc, cn, sr, rc, cr,
|
||||||
).await;
|
).await;
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
warn!("Connection error from {}: {}", peer_addr, e);
|
warn!("Connection error from {}: {}", peer_addr, e);
|
||||||
@@ -553,6 +571,7 @@ impl TcpListenerManager {
|
|||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
socket_handler_relay: Arc<std::sync::RwLock<Option<String>>>,
|
||||||
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
@@ -672,12 +691,24 @@ impl TcpListenerManager {
|
|||||||
let route_id = quick_match.route.id.as_deref();
|
let route_id = quick_match.route.id.as_deref();
|
||||||
|
|
||||||
// Resolve per-route cancel token (child of global cancel)
|
// Resolve per-route cancel token (child of global cancel)
|
||||||
let conn_cancel = match route_id {
|
let route_cancel = match route_id {
|
||||||
Some(id) => route_cancels.entry(id.to_string())
|
Some(id) => route_cancels.entry(id.to_string())
|
||||||
.or_insert_with(|| cancel.child_token())
|
.or_insert_with(|| cancel.child_token())
|
||||||
.clone(),
|
.clone(),
|
||||||
None => cancel.clone(),
|
None => cancel.clone(),
|
||||||
};
|
};
|
||||||
|
// Per-connection child token for selective recycling
|
||||||
|
let conn_cancel = route_cancel.child_token();
|
||||||
|
|
||||||
|
// Register in connection registry for selective recycling
|
||||||
|
let (_conn_id, _registry_guard) = connection_registry.register(
|
||||||
|
ConnectionEntry {
|
||||||
|
cancel: conn_cancel.clone(),
|
||||||
|
source_ip: peer_addr.ip(),
|
||||||
|
domain: None, // fast path has no domain
|
||||||
|
route_id: route_id.map(|s| s.to_string()),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
// Check route-level IP security
|
// Check route-level IP security
|
||||||
if let Some(ref security) = quick_match.route.security {
|
if let Some(ref security) = quick_match.route.security {
|
||||||
@@ -852,12 +883,24 @@ impl TcpListenerManager {
|
|||||||
// Resolve per-route cancel token (child of global cancel).
|
// Resolve per-route cancel token (child of global cancel).
|
||||||
// When this route is removed via updateRoutes, the token is cancelled,
|
// When this route is removed via updateRoutes, the token is cancelled,
|
||||||
// terminating all connections on this route.
|
// terminating all connections on this route.
|
||||||
let cancel = match route_id {
|
let route_cancel = match route_id {
|
||||||
Some(id) => route_cancels.entry(id.to_string())
|
Some(id) => route_cancels.entry(id.to_string())
|
||||||
.or_insert_with(|| cancel.child_token())
|
.or_insert_with(|| cancel.child_token())
|
||||||
.clone(),
|
.clone(),
|
||||||
None => cancel,
|
None => cancel,
|
||||||
};
|
};
|
||||||
|
// Per-connection child token for selective recycling
|
||||||
|
let cancel = route_cancel.child_token();
|
||||||
|
|
||||||
|
// Register in connection registry for selective recycling
|
||||||
|
let (_conn_id, _registry_guard) = connection_registry.register(
|
||||||
|
ConnectionEntry {
|
||||||
|
cancel: cancel.clone(),
|
||||||
|
source_ip: peer_addr.ip(),
|
||||||
|
domain: domain.clone(),
|
||||||
|
route_id: route_id.map(|s| s.to_string()),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
// Check route-level IP security for passthrough connections
|
// Check route-level IP security for passthrough connections
|
||||||
if let Some(ref security) = route_match.route.security {
|
if let Some(ref security) = route_match.route.security {
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ use rustproxy_routing::{MatchContext, RouteManager};
|
|||||||
|
|
||||||
use rustproxy_http::h3_service::H3ProxyService;
|
use rustproxy_http::h3_service::H3ProxyService;
|
||||||
|
|
||||||
|
use crate::connection_registry::ConnectionRegistry;
|
||||||
|
|
||||||
use crate::connection_tracker::ConnectionTracker;
|
use crate::connection_tracker::ConnectionTracker;
|
||||||
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
use crate::udp_session::{SessionKey, UdpSession, UdpSessionConfig, UdpSessionTable};
|
||||||
|
|
||||||
@@ -56,6 +58,10 @@ pub struct UdpListenerManager {
|
|||||||
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
|
/// Trusted proxy IPs that may send PROXY protocol v2 headers.
|
||||||
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
|
/// When non-empty, PROXY v2 detection is enabled on both raw UDP and QUIC paths.
|
||||||
proxy_ips: Arc<Vec<IpAddr>>,
|
proxy_ips: Arc<Vec<IpAddr>>,
|
||||||
|
/// Per-route cancellation tokens (shared with TcpListenerManager).
|
||||||
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
/// Shared connection registry for selective recycling.
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for UdpListenerManager {
|
impl Drop for UdpListenerManager {
|
||||||
@@ -76,6 +82,8 @@ impl UdpListenerManager {
|
|||||||
metrics: Arc<MetricsCollector>,
|
metrics: Arc<MetricsCollector>,
|
||||||
conn_tracker: Arc<ConnectionTracker>,
|
conn_tracker: Arc<ConnectionTracker>,
|
||||||
cancel_token: CancellationToken,
|
cancel_token: CancellationToken,
|
||||||
|
route_cancels: Arc<DashMap<String, CancellationToken>>,
|
||||||
|
connection_registry: Arc<ConnectionRegistry>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
listeners: HashMap::new(),
|
listeners: HashMap::new(),
|
||||||
@@ -89,6 +97,8 @@ impl UdpListenerManager {
|
|||||||
relay_reader_cancel: None,
|
relay_reader_cancel: None,
|
||||||
h3_service: None,
|
h3_service: None,
|
||||||
proxy_ips: Arc::new(Vec::new()),
|
proxy_ips: Arc::new(Vec::new()),
|
||||||
|
route_cancels,
|
||||||
|
connection_registry,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,6 +162,8 @@ impl UdpListenerManager {
|
|||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
self.h3_service.clone(),
|
self.h3_service.clone(),
|
||||||
None,
|
None,
|
||||||
|
Arc::clone(&self.route_cancels),
|
||||||
|
Arc::clone(&self.connection_registry),
|
||||||
));
|
));
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
info!("QUIC endpoint started on port {}", port);
|
info!("QUIC endpoint started on port {}", port);
|
||||||
@@ -173,6 +185,8 @@ impl UdpListenerManager {
|
|||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
self.h3_service.clone(),
|
self.h3_service.clone(),
|
||||||
Some(relay.real_client_map),
|
Some(relay.real_client_map),
|
||||||
|
Arc::clone(&self.route_cancels),
|
||||||
|
Arc::clone(&self.connection_registry),
|
||||||
));
|
));
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
info!("QUIC endpoint with PROXY relay started on port {}", port);
|
info!("QUIC endpoint with PROXY relay started on port {}", port);
|
||||||
@@ -356,6 +370,8 @@ impl UdpListenerManager {
|
|||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
self.h3_service.clone(),
|
self.h3_service.clone(),
|
||||||
None,
|
None,
|
||||||
|
Arc::clone(&self.route_cancels),
|
||||||
|
Arc::clone(&self.connection_registry),
|
||||||
));
|
));
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -379,6 +395,8 @@ impl UdpListenerManager {
|
|||||||
self.cancel_token.child_token(),
|
self.cancel_token.child_token(),
|
||||||
self.h3_service.clone(),
|
self.h3_service.clone(),
|
||||||
Some(relay.real_client_map),
|
Some(relay.real_client_map),
|
||||||
|
Arc::clone(&self.route_cancels),
|
||||||
|
Arc::clone(&self.connection_registry),
|
||||||
));
|
));
|
||||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -281,6 +281,11 @@ impl RouteManager {
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get all enabled routes.
|
||||||
|
pub fn routes(&self) -> &[RouteConfig] {
|
||||||
|
&self.routes
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the total number of enabled routes.
|
/// Get the total number of enabled routes.
|
||||||
pub fn route_count(&self) -> usize {
|
pub fn route_count(&self) -> usize {
|
||||||
self.routes.len()
|
self.routes.len()
|
||||||
|
|||||||
@@ -356,12 +356,17 @@ impl RustProxy {
|
|||||||
|
|
||||||
// Bind UDP ports (if any)
|
// Bind UDP ports (if any)
|
||||||
if !udp_ports.is_empty() {
|
if !udp_ports.is_empty() {
|
||||||
let conn_tracker = self.listener_manager.as_ref().unwrap().conn_tracker().clone();
|
let tcp_mgr = self.listener_manager.as_ref().unwrap();
|
||||||
|
let conn_tracker = tcp_mgr.conn_tracker().clone();
|
||||||
|
let route_cancels = tcp_mgr.route_cancels().clone();
|
||||||
|
let connection_registry = tcp_mgr.connection_registry().clone();
|
||||||
let mut udp_mgr = UdpListenerManager::new(
|
let mut udp_mgr = UdpListenerManager::new(
|
||||||
Arc::clone(&*self.route_table.load()),
|
Arc::clone(&*self.route_table.load()),
|
||||||
Arc::clone(&self.metrics),
|
Arc::clone(&self.metrics),
|
||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
|
route_cancels,
|
||||||
|
connection_registry,
|
||||||
);
|
);
|
||||||
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
|
udp_mgr.set_proxy_ips(udp_proxy_ips.clone());
|
||||||
|
|
||||||
@@ -707,6 +712,9 @@ impl RustProxy {
|
|||||||
.collect();
|
.collect();
|
||||||
self.metrics.retain_backends(&active_backends);
|
self.metrics.retain_backends(&active_backends);
|
||||||
|
|
||||||
|
// Capture old route manager for diff-based connection recycling
|
||||||
|
let old_manager = self.route_table.load_full();
|
||||||
|
|
||||||
// Atomically swap the route table
|
// Atomically swap the route table
|
||||||
let new_manager = Arc::new(new_manager);
|
let new_manager = Arc::new(new_manager);
|
||||||
self.route_table.store(Arc::clone(&new_manager));
|
self.route_table.store(Arc::clone(&new_manager));
|
||||||
@@ -742,9 +750,47 @@ impl RustProxy {
|
|||||||
listener.update_route_manager(Arc::clone(&new_manager));
|
listener.update_route_manager(Arc::clone(&new_manager));
|
||||||
// Cancel connections on routes that were removed or disabled
|
// Cancel connections on routes that were removed or disabled
|
||||||
listener.invalidate_removed_routes(&active_route_ids);
|
listener.invalidate_removed_routes(&active_route_ids);
|
||||||
|
// Clean up registry entries for removed routes
|
||||||
|
listener.connection_registry().cleanup_removed_routes(&active_route_ids);
|
||||||
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
|
// Prune HTTP proxy caches (rate limiters, regex cache, round-robin counters)
|
||||||
listener.prune_http_proxy_caches(&active_route_ids);
|
listener.prune_http_proxy_caches(&active_route_ids);
|
||||||
|
|
||||||
|
// Diff-based connection recycling for changed routes
|
||||||
|
{
|
||||||
|
let registry = listener.connection_registry();
|
||||||
|
for new_route in &routes {
|
||||||
|
let new_id = match &new_route.id {
|
||||||
|
Some(id) => id.as_str(),
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
// Find corresponding old route
|
||||||
|
let old_route = old_manager.routes().iter().find(|r| {
|
||||||
|
r.id.as_deref() == Some(new_id)
|
||||||
|
});
|
||||||
|
let old_route = match old_route {
|
||||||
|
Some(r) => r,
|
||||||
|
None => continue, // new route, no existing connections to recycle
|
||||||
|
};
|
||||||
|
|
||||||
|
// Security diff: re-evaluate existing connections' IPs
|
||||||
|
let old_sec = serde_json::to_string(&old_route.security).ok();
|
||||||
|
let new_sec = serde_json::to_string(&new_route.security).ok();
|
||||||
|
if old_sec != new_sec {
|
||||||
|
if let Some(ref security) = new_route.security {
|
||||||
|
registry.recycle_for_security_change(new_id, security);
|
||||||
|
}
|
||||||
|
// If security removed entirely (became more permissive), no recycling needed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Action diff (targets, TLS mode, etc.): recycle all connections on route
|
||||||
|
let old_action = serde_json::to_string(&old_route.action).ok();
|
||||||
|
let new_action = serde_json::to_string(&new_route.action).ok();
|
||||||
|
if old_action != new_action {
|
||||||
|
registry.recycle_for_route_change(new_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add new ports
|
// Add new ports
|
||||||
for port in &new_ports {
|
for port in &new_ports {
|
||||||
if !old_ports.contains(port) {
|
if !old_ports.contains(port) {
|
||||||
@@ -787,12 +833,16 @@ impl RustProxy {
|
|||||||
if self.udp_listener_manager.is_none() {
|
if self.udp_listener_manager.is_none() {
|
||||||
if let Some(ref listener) = self.listener_manager {
|
if let Some(ref listener) = self.listener_manager {
|
||||||
let conn_tracker = listener.conn_tracker().clone();
|
let conn_tracker = listener.conn_tracker().clone();
|
||||||
|
let route_cancels = listener.route_cancels().clone();
|
||||||
|
let connection_registry = listener.connection_registry().clone();
|
||||||
let conn_config = Self::build_connection_config(&self.options);
|
let conn_config = Self::build_connection_config(&self.options);
|
||||||
let mut udp_mgr = UdpListenerManager::new(
|
let mut udp_mgr = UdpListenerManager::new(
|
||||||
Arc::clone(&new_manager),
|
Arc::clone(&new_manager),
|
||||||
Arc::clone(&self.metrics),
|
Arc::clone(&self.metrics),
|
||||||
conn_tracker,
|
conn_tracker,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
|
route_cancels,
|
||||||
|
connection_registry,
|
||||||
);
|
);
|
||||||
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
|
udp_mgr.set_proxy_ips(conn_config.proxy_ips);
|
||||||
self.udp_listener_manager = Some(udp_mgr);
|
self.udp_listener_manager = Some(udp_mgr);
|
||||||
@@ -1096,6 +1146,10 @@ impl RustProxy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Load a certificate for a domain and hot-swap the TLS configuration.
|
/// Load a certificate for a domain and hot-swap the TLS configuration.
|
||||||
|
///
|
||||||
|
/// If the cert PEM differs from the currently loaded cert for this domain,
|
||||||
|
/// existing connections for the domain are gracefully recycled (GOAWAY for
|
||||||
|
/// HTTP/2, Connection: close for HTTP/1.1, graceful FIN for TCP).
|
||||||
pub async fn load_certificate(
|
pub async fn load_certificate(
|
||||||
&mut self,
|
&mut self,
|
||||||
domain: &str,
|
domain: &str,
|
||||||
@@ -1105,6 +1159,12 @@ impl RustProxy {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!("Loading certificate for domain: {}", domain);
|
info!("Loading certificate for domain: {}", domain);
|
||||||
|
|
||||||
|
// Check if the cert actually changed (for selective connection recycling)
|
||||||
|
let cert_changed = self.loaded_certs
|
||||||
|
.get(domain)
|
||||||
|
.map(|existing| existing.cert_pem != cert_pem)
|
||||||
|
.unwrap_or(false); // new domain = no existing connections to recycle
|
||||||
|
|
||||||
// Store in cert manager if available
|
// Store in cert manager if available
|
||||||
if let Some(ref cm_arc) = self.cert_manager {
|
if let Some(ref cm_arc) = self.cert_manager {
|
||||||
let now = std::time::SystemTime::now()
|
let now = std::time::SystemTime::now()
|
||||||
@@ -1153,6 +1213,13 @@ impl RustProxy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Recycle existing connections if cert actually changed
|
||||||
|
if cert_changed {
|
||||||
|
if let Some(ref listener) = self.listener_manager {
|
||||||
|
listener.connection_registry().recycle_for_cert_change(domain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Certificate loaded and TLS config updated for {}", domain);
|
info!("Certificate loaded and TLS config updated for {}", domain);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartproxy',
|
name: '@push.rocks/smartproxy',
|
||||||
version: '27.0.0',
|
version: '27.1.0',
|
||||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user