Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e8db7bc96d | |||
| 2621dea9fa | |||
| bb5b9b3d12 | |||
| d70c2d77ed |
14
changelog.md
14
changelog.md
@@ -1,5 +1,19 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-19 - 25.16.3 - fix(rustproxy)
|
||||
upgrade fallback UDP listeners to QUIC when TLS certificates become available
|
||||
|
||||
- Rebuild and apply QUIC TLS configuration during route and certificate updates instead of only when adding new UDP ports.
|
||||
- Add logic to drain UDP sessions, stop raw fallback listeners, and start QUIC endpoints on existing ports once TLS is available.
|
||||
- Retry QUIC endpoint creation during upgrade and fall back to rebinding raw UDP if the upgrade cannot complete.
|
||||
|
||||
## 2026-03-19 - 25.16.2 - fix(rustproxy-http)
|
||||
cache backend Alt-Svc only from original upstream responses during protocol auto-detection
|
||||
|
||||
- Moves Alt-Svc discovery into streaming response construction so it reads backend headers before response filters inject client-facing Alt-Svc values
|
||||
- Stores the protocol cache key in connection activity during auto-detect mode and clears it after HTTP/3 connection failure to avoid re-caching failed H3 routes
|
||||
- Prevents fallback requests from reintroducing stale or self-injected Alt-Svc entries that could cause repeated H3 retry loops
|
||||
|
||||
## 2026-03-19 - 25.16.1 - fix(http-proxy)
|
||||
avoid repeated HTTP/3 recaching after QUIC fallback and document backend protocol selection
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "25.16.1",
|
||||
"version": "25.16.3",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
@@ -43,6 +43,10 @@ struct ConnActivity {
|
||||
/// increments on creation and decrements on Drop, keeping the watchdog aware that
|
||||
/// a response body is still streaming after the request handler has returned.
|
||||
active_requests: Option<Arc<AtomicU64>>,
|
||||
/// Protocol cache key for Alt-Svc discovery. When set, `build_streaming_response`
|
||||
/// checks the backend's original response headers for Alt-Svc before our
|
||||
/// ResponseFilter injects its own. None when not in auto-detect mode or after H3 failure.
|
||||
alt_svc_cache_key: Option<crate::protocol_cache::ProtocolCacheKey>,
|
||||
}
|
||||
|
||||
/// Default upstream connect timeout (30 seconds).
|
||||
@@ -341,7 +345,7 @@ impl HttpProxyService {
|
||||
let cn = cancel_inner.clone();
|
||||
let la = Arc::clone(&la_inner);
|
||||
let st = start;
|
||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)) };
|
||||
let ca = ConnActivity { last_activity: Arc::clone(&la_inner), start, active_requests: Some(Arc::clone(&ar_inner)), alt_svc_cache_key: None };
|
||||
async move {
|
||||
let result = svc.handle_request(req, peer, port, cn, ca).await;
|
||||
// Mark request end — update activity timestamp before guard drops
|
||||
@@ -418,7 +422,7 @@ impl HttpProxyService {
|
||||
peer_addr: std::net::SocketAddr,
|
||||
port: u16,
|
||||
cancel: CancellationToken,
|
||||
conn_activity: ConnActivity,
|
||||
mut conn_activity: ConnActivity,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let host = req.headers()
|
||||
.get("host")
|
||||
@@ -703,9 +707,11 @@ impl HttpProxyService {
|
||||
ProtocolDecision::AlpnProbe => (false, true),
|
||||
};
|
||||
|
||||
// Track whether H3 connect failed — suppresses Alt-Svc re-caching to prevent
|
||||
// the loop: H3 cached → QUIC timeout → H2/H1 fallback → Alt-Svc re-caches H3 → repeat
|
||||
let mut h3_connect_failed = false;
|
||||
// Set Alt-Svc cache key on conn_activity so build_streaming_response can check
|
||||
// the backend's original Alt-Svc header before ResponseFilter injects our own.
|
||||
if is_auto_detect_mode {
|
||||
conn_activity.alt_svc_cache_key = Some(protocol_cache_key.clone());
|
||||
}
|
||||
|
||||
// --- H3 path: try QUIC connection before TCP ---
|
||||
if let ProtocolDecision::H3 { port: h3_port } = protocol_decision {
|
||||
@@ -742,7 +748,9 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
warn!(backend = %upstream_key, error = %e,
|
||||
"H3 backend connect failed, falling back to H2/H1");
|
||||
h3_connect_failed = true;
|
||||
// Suppress Alt-Svc caching for the fallback to prevent re-caching H3
|
||||
// from our own injected Alt-Svc header or a stale backend Alt-Svc
|
||||
conn_activity.alt_svc_cache_key = None;
|
||||
// Force ALPN probe on TCP fallback so we correctly detect H2 vs H1
|
||||
// (don't cache anything yet — let the ALPN probe decide)
|
||||
if is_auto_detect_mode && upstream.use_tls {
|
||||
@@ -948,22 +956,6 @@ impl HttpProxyService {
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.backend_connection_closed(&upstream_key);
|
||||
|
||||
// --- Alt-Svc discovery: check if backend advertises H3 ---
|
||||
// Suppress Alt-Svc caching when we just failed an H3 attempt to prevent the loop:
|
||||
// H3 cached → QUIC timeout → fallback → Alt-Svc re-caches H3 → repeat.
|
||||
// The ALPN probe already cached H1 or H2; it will expire after 5min TTL,
|
||||
// at which point we'll re-probe and see Alt-Svc again, retrying QUIC then.
|
||||
if is_auto_detect_mode && !h3_connect_failed {
|
||||
if let Ok(ref resp) = result {
|
||||
if let Some(alt_svc) = resp.headers().get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(backend = %upstream_key, h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(protocol_cache_key, h3_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
@@ -1762,6 +1754,19 @@ impl HttpProxyService {
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let (resp_parts, resp_body) = upstream_response.into_parts();
|
||||
|
||||
// Check for Alt-Svc in the backend's ORIGINAL response headers BEFORE
|
||||
// ResponseFilter::apply_headers runs — the filter may inject our own Alt-Svc
|
||||
// for client-facing HTTP/3 advertisement, which must not be confused with
|
||||
// backend-originated Alt-Svc.
|
||||
if let Some(ref cache_key) = conn_activity.alt_svc_cache_key {
|
||||
if let Some(alt_svc) = resp_parts.headers.get("alt-svc").and_then(|v| v.to_str().ok()) {
|
||||
if let Some(h3_port) = parse_alt_svc_h3_port(alt_svc) {
|
||||
debug!(h3_port, "Backend advertises H3 via Alt-Svc");
|
||||
self.protocol_cache.insert_h3(cache_key.clone(), h3_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut response = Response::builder()
|
||||
.status(resp_parts.status);
|
||||
|
||||
|
||||
@@ -222,6 +222,117 @@ impl UdpListenerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Upgrade raw UDP fallback listeners to QUIC endpoints.
|
||||
///
|
||||
/// At startup, if no TLS certs are available, QUIC routes fall back to raw UDP.
|
||||
/// When certs become available later (via loadCertificate IPC or ACME), this method
|
||||
/// stops the raw UDP listener, drains sessions, and creates a proper QUIC endpoint.
|
||||
///
|
||||
/// This is idempotent — ports that already have QUIC endpoints are skipped.
|
||||
pub async fn upgrade_raw_to_quic(&mut self, tls_config: Arc<rustls::ServerConfig>) {
|
||||
// Find ports that are raw UDP fallback (endpoint=None) but have QUIC routes
|
||||
let rm = self.route_manager.load();
|
||||
let upgrade_ports: Vec<u16> = self.listeners.iter()
|
||||
.filter(|(_, (_, endpoint))| endpoint.is_none())
|
||||
.filter(|(port, _)| {
|
||||
rm.routes_for_port(**port).iter().any(|r| {
|
||||
r.action.udp.as_ref()
|
||||
.and_then(|u| u.quic.as_ref())
|
||||
.is_some()
|
||||
})
|
||||
})
|
||||
.map(|(port, _)| *port)
|
||||
.collect();
|
||||
|
||||
for port in upgrade_ports {
|
||||
info!("Upgrading raw UDP listener on port {} to QUIC endpoint", port);
|
||||
|
||||
// Stop the raw UDP listener task and drain sessions to release the socket
|
||||
if let Some((handle, _)) = self.listeners.remove(&port) {
|
||||
handle.abort();
|
||||
}
|
||||
let drained = self.session_table.drain_port(
|
||||
port, &self.metrics, &self.conn_tracker,
|
||||
);
|
||||
if drained > 0 {
|
||||
debug!("Drained {} UDP sessions on port {} for QUIC upgrade", drained, port);
|
||||
}
|
||||
|
||||
// Brief yield to let aborted tasks drop their socket references
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
// Create QUIC endpoint on the now-free port
|
||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
||||
Ok(endpoint) => {
|
||||
let endpoint_for_updates = endpoint.clone();
|
||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||
endpoint,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP)", port);
|
||||
}
|
||||
Err(e) => {
|
||||
// Port may still be held — retry once after a brief delay
|
||||
warn!("QUIC endpoint creation failed on port {}, retrying: {}", port, e);
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
match crate::quic_handler::create_quic_endpoint(port, Arc::clone(&tls_config)) {
|
||||
Ok(endpoint) => {
|
||||
let endpoint_for_updates = endpoint.clone();
|
||||
let handle = tokio::spawn(crate::quic_handler::quic_accept_loop(
|
||||
endpoint,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
self.cancel_token.child_token(),
|
||||
self.h3_service.clone(),
|
||||
));
|
||||
self.listeners.insert(port, (handle, Some(endpoint_for_updates)));
|
||||
info!("QUIC endpoint started on port {} (upgraded from raw UDP, retry)", port);
|
||||
}
|
||||
Err(e2) => {
|
||||
error!("Failed to upgrade port {} to QUIC after retry: {}. \
|
||||
Rebinding as raw UDP.", port, e2);
|
||||
// Fallback: rebind as raw UDP so the port isn't dead
|
||||
if let Ok(()) = self.rebind_raw_udp(port).await {
|
||||
warn!("Port {} rebound as raw UDP (QUIC upgrade failed)", port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rebind a port as a raw UDP listener (fallback when QUIC upgrade fails).
|
||||
async fn rebind_raw_udp(&mut self, port: u16) -> anyhow::Result<()> {
|
||||
let addr: std::net::SocketAddr = ([0, 0, 0, 0], port).into();
|
||||
let socket = UdpSocket::bind(addr).await?;
|
||||
let socket = Arc::new(socket);
|
||||
|
||||
let handle = tokio::spawn(Self::recv_loop(
|
||||
socket,
|
||||
port,
|
||||
Arc::clone(&self.route_manager),
|
||||
Arc::clone(&self.metrics),
|
||||
Arc::clone(&self.conn_tracker),
|
||||
Arc::clone(&self.session_table),
|
||||
Arc::clone(&self.datagram_handler_relay),
|
||||
Arc::clone(&self.relay_writer),
|
||||
self.cancel_token.child_token(),
|
||||
));
|
||||
|
||||
self.listeners.insert(port, (handle, None));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the datagram handler relay socket path and establish connection.
|
||||
pub async fn set_datagram_handler_relay(&mut self, path: String) {
|
||||
// Cancel previous relay reader task if any
|
||||
|
||||
@@ -201,6 +201,36 @@ impl UdpSessionTable {
|
||||
removed
|
||||
}
|
||||
|
||||
/// Drain all sessions on a given listening port, releasing socket references.
|
||||
/// Used when upgrading a raw UDP listener to QUIC — the raw UDP socket's
|
||||
/// Arc refcount must drop to zero so the port can be rebound.
|
||||
pub fn drain_port(
|
||||
&self,
|
||||
port: u16,
|
||||
metrics: &MetricsCollector,
|
||||
conn_tracker: &ConnectionTracker,
|
||||
) -> usize {
|
||||
let keys: Vec<SessionKey> = self.sessions.iter()
|
||||
.filter(|entry| entry.key().1 == port)
|
||||
.map(|entry| *entry.key())
|
||||
.collect();
|
||||
|
||||
let mut removed = 0;
|
||||
for key in keys {
|
||||
if let Some(session) = self.remove(&key) {
|
||||
session.cancel.cancel();
|
||||
conn_tracker.connection_closed(&session.source_ip);
|
||||
metrics.connection_closed(
|
||||
session.route_id.as_deref(),
|
||||
Some(&session.source_ip.to_string()),
|
||||
);
|
||||
metrics.udp_session_closed();
|
||||
removed += 1;
|
||||
}
|
||||
}
|
||||
removed
|
||||
}
|
||||
|
||||
/// Total number of active sessions.
|
||||
pub fn session_count(&self) -> usize {
|
||||
self.sessions.len()
|
||||
|
||||
@@ -783,12 +783,10 @@ impl RustProxy {
|
||||
}
|
||||
}
|
||||
|
||||
// Build TLS config for QUIC before taking mutable borrow on udp_mgr
|
||||
let quic_tls = if new_udp_ports.iter().any(|p| !old_udp_ports.contains(p)) {
|
||||
// Build TLS config for QUIC (needed for new ports and upgrading existing raw UDP)
|
||||
let quic_tls = {
|
||||
let tls_configs = self.current_tls_configs().await;
|
||||
Self::build_quic_tls_config(&tls_configs)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||
@@ -806,6 +804,12 @@ impl RustProxy {
|
||||
udp_mgr.remove_port(*port);
|
||||
}
|
||||
}
|
||||
|
||||
// Upgrade existing raw UDP fallback listeners to QUIC if TLS is now available
|
||||
if let Some(ref quic_config) = quic_tls {
|
||||
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||
}
|
||||
}
|
||||
} else if self.udp_listener_manager.is_some() {
|
||||
// All UDP routes removed — shut down UDP manager
|
||||
@@ -862,12 +866,12 @@ impl RustProxy {
|
||||
.map_err(|e| anyhow::anyhow!("ACME provisioning failed: {}", e))?;
|
||||
|
||||
// Hot-swap into TLS configs
|
||||
if let Some(ref mut listener) = self.listener_manager {
|
||||
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
||||
tls_configs.insert(domain.clone(), TlsCertConfig {
|
||||
cert_pem: bundle.cert_pem.clone(),
|
||||
key_pem: bundle.key_pem.clone(),
|
||||
});
|
||||
let mut tls_configs = Self::extract_tls_configs(&self.options.routes);
|
||||
tls_configs.insert(domain.clone(), TlsCertConfig {
|
||||
cert_pem: bundle.cert_pem.clone(),
|
||||
key_pem: bundle.key_pem.clone(),
|
||||
});
|
||||
{
|
||||
let cm = cm_arc.lock().await;
|
||||
for (d, b) in cm.store().iter() {
|
||||
if !tls_configs.contains_key(d) {
|
||||
@@ -877,9 +881,22 @@ impl RustProxy {
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||
|
||||
if let Some(ref listener) = self.listener_manager {
|
||||
listener.set_tls_configs(tls_configs);
|
||||
}
|
||||
|
||||
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
|
||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(ref quic_config) = quic_tls {
|
||||
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||
}
|
||||
}
|
||||
|
||||
info!("Certificate provisioned and loaded for route '{}'", route_name);
|
||||
Ok(())
|
||||
}
|
||||
@@ -1104,17 +1121,18 @@ impl RustProxy {
|
||||
// Hot-swap TLS config on TCP and QUIC listeners
|
||||
let tls_configs = self.current_tls_configs().await;
|
||||
|
||||
// Build QUIC TLS config before TCP consumes the map
|
||||
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||
|
||||
if let Some(ref listener) = self.listener_manager {
|
||||
// Build QUIC TLS config before TCP consumes the map
|
||||
let quic_tls = Self::build_quic_tls_config(&tls_configs);
|
||||
|
||||
listener.set_tls_configs(tls_configs);
|
||||
}
|
||||
|
||||
// Also update QUIC endpoints with the new certs
|
||||
if let Some(ref udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(quic_config) = quic_tls {
|
||||
udp_mgr.update_quic_tls(quic_config);
|
||||
}
|
||||
// Update existing QUIC endpoints and upgrade raw UDP fallback listeners
|
||||
if let Some(ref mut udp_mgr) = self.udp_listener_manager {
|
||||
if let Some(ref quic_config) = quic_tls {
|
||||
udp_mgr.update_quic_tls(Arc::clone(quic_config));
|
||||
udp_mgr.upgrade_raw_to_quic(Arc::clone(quic_config)).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '25.16.1',
|
||||
version: '25.16.3',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user