Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b4b8bd925d | |||
| 5ac44b898b | |||
| 9b4393b5ac | |||
| 02b4ed8018 | |||
| e4e4b4f1ec | |||
| d361a21543 |
24
changelog.md
24
changelog.md
@@ -1,5 +1,29 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-19 - 25.7.4 - fix(smart-proxy)
|
||||
include proxy IPs in smart proxy configuration
|
||||
|
||||
- Add proxyIps: this.settings.proxyIPs to proxy options in ts/proxies/smart-proxy/smart-proxy.ts
|
||||
- Ensures proxy IPs from settings are passed into the proxy implementation (enables proxy IP filtering/whitelisting)
|
||||
|
||||
## 2026-02-16 - 25.7.3 - fix(metrics)
|
||||
centralize connection-closed reporting via ConnectionGuard and remove duplicate explicit metrics.connection_closed calls
|
||||
|
||||
- Removed numerous explicit metrics.connection_closed calls from rust/crates/rustproxy-http/src/proxy_service.rs so connection teardown and byte counting are handled by the connection guard / counting body instead of ad-hoc calls.
|
||||
- Simplified ConnectionGuard in rust/crates/rustproxy-passthrough/src/tcp_listener.rs: removed the disarm flag and disarm() method so Drop always reports connection_closed.
|
||||
- Stopped disarming the TCP-level guard when handing connections off to HTTP proxy paths (HTTP/WebSocket/streaming flows) to avoid missing or double-reporting metrics.
|
||||
- Fixes incorrect/duplicate connection-closed metric emission and ensures consistent byte/connection accounting during streaming and WebSocket upgrades.
|
||||
|
||||
## 2026-02-16 - 25.7.2 - fix(rustproxy-http)
|
||||
preserve original Host header when proxying and add X-Forwarded-* headers; add TLS WebSocket echo backend helper and integration test for terminate-and-reencrypt websocket
|
||||
|
||||
- Preserve the client's original Host header instead of replacing it with backend host:port when proxying requests.
|
||||
- Add standard reverse-proxy headers: X-Forwarded-For (appends client IP), X-Forwarded-Host, and X-Forwarded-Proto for upstream requests.
|
||||
- Ensure raw TCP/HTTP upstream requests copy original headers and skip X-Forwarded-* (which are added explicitly).
|
||||
- Add start_tls_ws_echo_backend test helper to start a TLS WebSocket echo backend for tests.
|
||||
- Add integration test test_terminate_and_reencrypt_websocket to verify WS upgrade through terminate-and-reencrypt TLS path.
|
||||
- Rename unused parameter upstream to _upstream in proxy_service functions to avoid warnings.
|
||||
|
||||
## 2026-02-16 - 25.7.1 - fix(proxy)
|
||||
use TLS to backends for terminate-and-reencrypt routes
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "25.7.1",
|
||||
"version": "25.7.4",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
@@ -309,12 +309,10 @@ impl HttpProxyService {
|
||||
let route_id = route_match.route.id.as_deref();
|
||||
let ip_str = peer_addr.ip().to_string();
|
||||
self.metrics.record_http_request();
|
||||
self.metrics.connection_opened(route_id, Some(&ip_str));
|
||||
|
||||
// Apply request filters (IP check, rate limiting, auth)
|
||||
if let Some(ref security) = route_match.route.security {
|
||||
if let Some(response) = RequestFilter::apply(security, &req, &peer_addr) {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
@@ -322,7 +320,6 @@ impl HttpProxyService {
|
||||
// Check for test response (returns immediately, no upstream needed)
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref test_response) = advanced.test_response {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::build_test_response(test_response));
|
||||
}
|
||||
}
|
||||
@@ -330,7 +327,6 @@ impl HttpProxyService {
|
||||
// Check for static file serving
|
||||
if let Some(ref advanced) = route_match.route.action.advanced {
|
||||
if let Some(ref static_files) = advanced.static_files {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(Self::serve_static_file(&path, static_files));
|
||||
}
|
||||
}
|
||||
@@ -339,7 +335,6 @@ impl HttpProxyService {
|
||||
let target = match route_match.target {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "No target available"));
|
||||
}
|
||||
};
|
||||
@@ -404,6 +399,51 @@ impl HttpProxyService {
|
||||
}
|
||||
}
|
||||
|
||||
// Add standard reverse-proxy headers (X-Forwarded-*)
|
||||
{
|
||||
let original_host = parts.headers.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let forwarded_proto = if route_match.route.action.tls.as_ref()
|
||||
.map(|t| matches!(t.mode,
|
||||
rustproxy_config::TlsMode::Terminate
|
||||
| rustproxy_config::TlsMode::TerminateAndReencrypt))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
"https"
|
||||
} else {
|
||||
"http"
|
||||
};
|
||||
|
||||
// X-Forwarded-For: append client IP to existing chain
|
||||
let client_ip = peer_addr.ip().to_string();
|
||||
let xff_value = if let Some(existing) = upstream_headers.get("x-forwarded-for") {
|
||||
format!("{}, {}", existing.to_str().unwrap_or(""), client_ip)
|
||||
} else {
|
||||
client_ip
|
||||
};
|
||||
if let Ok(val) = hyper::header::HeaderValue::from_str(&xff_value) {
|
||||
upstream_headers.insert(
|
||||
hyper::header::HeaderName::from_static("x-forwarded-for"),
|
||||
val,
|
||||
);
|
||||
}
|
||||
// X-Forwarded-Host: original Host header
|
||||
if let Ok(val) = hyper::header::HeaderValue::from_str(original_host) {
|
||||
upstream_headers.insert(
|
||||
hyper::header::HeaderName::from_static("x-forwarded-host"),
|
||||
val,
|
||||
);
|
||||
}
|
||||
// X-Forwarded-Proto: original client protocol
|
||||
if let Ok(val) = hyper::header::HeaderValue::from_str(forwarded_proto) {
|
||||
upstream_headers.insert(
|
||||
hyper::header::HeaderName::from_static("x-forwarded-proto"),
|
||||
val,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Connect to upstream with timeout (TLS if upstream.use_tls is set)
|
||||
let backend = if upstream.use_tls {
|
||||
match tokio::time::timeout(
|
||||
@@ -414,13 +454,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed TLS connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Upstream TLS connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -436,13 +474,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to connect to upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(&upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(&ip_str));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -469,7 +505,7 @@ impl HttpProxyService {
|
||||
body: Incoming,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
upstream_path: &str,
|
||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
_upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
@@ -478,7 +514,6 @@ impl HttpProxyService {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("Upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -496,11 +531,6 @@ impl HttpProxyService {
|
||||
|
||||
if let Some(headers) = upstream_req.headers_mut() {
|
||||
*headers = upstream_headers;
|
||||
if let Ok(host_val) = hyper::header::HeaderValue::from_str(
|
||||
&format!("{}:{}", upstream.host, upstream.port)
|
||||
) {
|
||||
headers.insert(hyper::header::HOST, host_val);
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the request body in CountingBody to track bytes_in
|
||||
@@ -519,7 +549,6 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("Upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed"));
|
||||
}
|
||||
};
|
||||
@@ -535,7 +564,7 @@ impl HttpProxyService {
|
||||
body: Incoming,
|
||||
upstream_headers: hyper::HeaderMap,
|
||||
upstream_path: &str,
|
||||
upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
_upstream: &crate::upstream_selector::UpstreamSelection,
|
||||
route: &rustproxy_config::RouteConfig,
|
||||
route_id: Option<&str>,
|
||||
source_ip: &str,
|
||||
@@ -545,7 +574,6 @@ impl HttpProxyService {
|
||||
Ok(h) => h,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream handshake failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed"));
|
||||
}
|
||||
};
|
||||
@@ -562,11 +590,6 @@ impl HttpProxyService {
|
||||
|
||||
if let Some(headers) = upstream_req.headers_mut() {
|
||||
*headers = upstream_headers;
|
||||
if let Ok(host_val) = hyper::header::HeaderValue::from_str(
|
||||
&format!("{}:{}", upstream.host, upstream.port)
|
||||
) {
|
||||
headers.insert(hyper::header::HOST, host_val);
|
||||
}
|
||||
}
|
||||
|
||||
// Wrap the request body in CountingBody to track bytes_in
|
||||
@@ -585,7 +608,6 @@ impl HttpProxyService {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
error!("HTTP/2 upstream request failed: {}", e);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed"));
|
||||
}
|
||||
};
|
||||
@@ -596,8 +618,7 @@ impl HttpProxyService {
|
||||
/// Build the client-facing response from an upstream response, streaming the body.
|
||||
///
|
||||
/// The response body is wrapped in a `CountingBody` that counts bytes as they
|
||||
/// stream from upstream to client. When the body is fully consumed (or dropped),
|
||||
/// it reports byte counts to the metrics collector and calls `connection_closed`.
|
||||
/// stream from upstream to client.
|
||||
async fn build_streaming_response(
|
||||
&self,
|
||||
upstream_response: Response<Incoming>,
|
||||
@@ -626,11 +647,6 @@ impl HttpProxyService {
|
||||
Direction::Out,
|
||||
);
|
||||
|
||||
// Close the connection metric now — the HTTP request/response cycle is done
|
||||
// from the proxy's perspective once we hand the streaming body to hyper.
|
||||
// Bytes will still be counted as they flow.
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
|
||||
let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_body);
|
||||
|
||||
Ok(response.body(body).unwrap())
|
||||
@@ -662,7 +678,6 @@ impl HttpProxyService {
|
||||
.unwrap_or("");
|
||||
if !allowed_origins.is_empty() && !allowed_origins.iter().any(|o| o == "*" || o == origin) {
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::FORBIDDEN, "Origin not allowed"));
|
||||
}
|
||||
}
|
||||
@@ -680,13 +695,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("WebSocket: failed TLS connect upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend TLS unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("WebSocket: upstream TLS connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend TLS connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -702,13 +715,11 @@ impl HttpProxyService {
|
||||
Ok(Err(e)) => {
|
||||
error!("WebSocket: failed to connect upstream {}:{}: {}", upstream.host, upstream.port, e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable"));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("WebSocket: upstream connect timeout for {}:{}", upstream.host, upstream.port);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend connect timeout"));
|
||||
}
|
||||
}
|
||||
@@ -739,13 +750,44 @@ impl HttpProxyService {
|
||||
parts.method, upstream_path
|
||||
);
|
||||
|
||||
let upstream_host = format!("{}:{}", upstream.host, upstream.port);
|
||||
// Copy all original headers (preserving the client's Host header).
|
||||
// Skip X-Forwarded-* since we set them ourselves below.
|
||||
for (name, value) in parts.headers.iter() {
|
||||
if name == hyper::header::HOST {
|
||||
raw_request.push_str(&format!("host: {}\r\n", upstream_host));
|
||||
} else {
|
||||
raw_request.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap_or("")));
|
||||
let name_str = name.as_str();
|
||||
if name_str == "x-forwarded-for"
|
||||
|| name_str == "x-forwarded-host"
|
||||
|| name_str == "x-forwarded-proto"
|
||||
{
|
||||
continue;
|
||||
}
|
||||
raw_request.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap_or("")));
|
||||
}
|
||||
|
||||
// Add standard reverse-proxy headers (X-Forwarded-*)
|
||||
{
|
||||
let original_host = parts.headers.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let forwarded_proto = if route.action.tls.as_ref()
|
||||
.map(|t| matches!(t.mode,
|
||||
rustproxy_config::TlsMode::Terminate
|
||||
| rustproxy_config::TlsMode::TerminateAndReencrypt))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
"https"
|
||||
} else {
|
||||
"http"
|
||||
};
|
||||
|
||||
let client_ip = peer_addr.ip().to_string();
|
||||
let xff_value = if let Some(existing) = parts.headers.get("x-forwarded-for") {
|
||||
format!("{}, {}", existing.to_str().unwrap_or(""), client_ip)
|
||||
} else {
|
||||
client_ip
|
||||
};
|
||||
raw_request.push_str(&format!("x-forwarded-for: {}\r\n", xff_value));
|
||||
raw_request.push_str(&format!("x-forwarded-host: {}\r\n", original_host));
|
||||
raw_request.push_str(&format!("x-forwarded-proto: {}\r\n", forwarded_proto));
|
||||
}
|
||||
|
||||
if let Some(ref route_headers) = route.headers {
|
||||
@@ -770,7 +812,6 @@ impl HttpProxyService {
|
||||
if let Err(e) = upstream_stream.write_all(raw_request.as_bytes()).await {
|
||||
error!("WebSocket: failed to send upgrade request to upstream: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend write failed"));
|
||||
}
|
||||
|
||||
@@ -781,7 +822,6 @@ impl HttpProxyService {
|
||||
Ok(0) => {
|
||||
error!("WebSocket: upstream closed before completing handshake");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed"));
|
||||
}
|
||||
Ok(_) => {
|
||||
@@ -795,14 +835,12 @@ impl HttpProxyService {
|
||||
if response_buf.len() > 8192 {
|
||||
error!("WebSocket: upstream response headers too large");
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend response too large"));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("WebSocket: failed to read upstream response: {}", e);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend read failed"));
|
||||
}
|
||||
}
|
||||
@@ -820,7 +858,6 @@ impl HttpProxyService {
|
||||
if status_code != 101 {
|
||||
debug!("WebSocket: upstream rejected upgrade with status {}", status_code);
|
||||
self.upstream_selector.connection_ended(upstream_key);
|
||||
self.metrics.connection_closed(route_id, Some(source_ip));
|
||||
return Ok(error_response(
|
||||
StatusCode::from_u16(status_code).unwrap_or(StatusCode::BAD_GATEWAY),
|
||||
"WebSocket upgrade rejected by backend",
|
||||
@@ -864,9 +901,6 @@ impl HttpProxyService {
|
||||
Err(e) => {
|
||||
debug!("WebSocket: client upgrade failed: {}", e);
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -971,7 +1005,6 @@ impl HttpProxyService {
|
||||
upstream_selector.connection_ended(&upstream_key_owned);
|
||||
if let Some(ref rid) = route_id_owned {
|
||||
metrics.record_bytes(bytes_in, bytes_out, Some(rid.as_str()), Some(&source_ip_owned));
|
||||
metrics.connection_closed(Some(rid.as_str()), Some(&source_ip_owned));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ struct ConnectionGuard {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
route_id: Option<String>,
|
||||
source_ip: Option<String>,
|
||||
disarmed: bool,
|
||||
}
|
||||
|
||||
impl ConnectionGuard {
|
||||
@@ -31,22 +30,13 @@ impl ConnectionGuard {
|
||||
metrics,
|
||||
route_id: route_id.map(|s| s.to_string()),
|
||||
source_ip: source_ip.map(|s| s.to_string()),
|
||||
disarmed: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Disarm the guard — prevents the Drop from running.
|
||||
/// Use when handing off to a path that manages its own cleanup (e.g., HTTP proxy).
|
||||
fn disarm(mut self) {
|
||||
self.disarmed = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ConnectionGuard {
|
||||
fn drop(&mut self) {
|
||||
if !self.disarmed {
|
||||
self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref());
|
||||
}
|
||||
self.metrics.connection_closed(self.route_id.as_deref(), self.source_ip.as_deref());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -844,8 +834,6 @@ impl TcpListenerManager {
|
||||
"TLS Terminate + HTTP: {} -> {}:{} (domain: {:?})",
|
||||
peer_addr, target_host, target_port, domain
|
||||
);
|
||||
// HTTP proxy manages its own per-request metrics — disarm TCP-level guard
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
|
||||
} else {
|
||||
debug!(
|
||||
@@ -917,7 +905,6 @@ impl TcpListenerManager {
|
||||
"TLS Terminate+Reencrypt + HTTP: {} (domain: {:?})",
|
||||
peer_addr, domain
|
||||
);
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_io(buf_stream, peer_addr, port, cancel.clone()).await;
|
||||
} else {
|
||||
// Non-HTTP: TLS-to-TLS tunnel (existing behavior for raw TCP protocols)
|
||||
@@ -937,8 +924,6 @@ impl TcpListenerManager {
|
||||
if is_http {
|
||||
// Plain HTTP - use HTTP proxy for request-level routing
|
||||
debug!("HTTP proxy: {} on port {}", peer_addr, port);
|
||||
// HTTP proxy manages its own per-request metrics — disarm TCP-level guard
|
||||
_conn_guard.disarm();
|
||||
http_proxy.handle_connection(stream, peer_addr, port, cancel.clone()).await;
|
||||
Ok(())
|
||||
} else {
|
||||
|
||||
@@ -452,6 +452,86 @@ pub fn make_tls_terminate_route(
|
||||
route
|
||||
}
|
||||
|
||||
/// Start a TLS WebSocket echo backend: accepts TLS, performs WS handshake, then echoes data.
|
||||
/// Combines TLS acceptance (like `start_tls_http_backend`) with WebSocket echo (like `start_ws_echo_backend`).
|
||||
pub async fn start_tls_ws_echo_backend(
|
||||
port: u16,
|
||||
cert_pem: &str,
|
||||
key_pem: &str,
|
||||
) -> JoinHandle<()> {
|
||||
use std::sync::Arc;
|
||||
|
||||
let acceptor = rustproxy_passthrough::build_tls_acceptor(cert_pem, key_pem)
|
||||
.expect("Failed to build TLS acceptor");
|
||||
let acceptor = Arc::new(acceptor);
|
||||
|
||||
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("Failed to bind TLS WS echo backend on port {}", port));
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (stream, _) = match listener.accept().await {
|
||||
Ok(conn) => conn,
|
||||
Err(_) => break,
|
||||
};
|
||||
let acc = acceptor.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut tls_stream = match acc.accept(stream).await {
|
||||
Ok(s) => s,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
// Read the HTTP upgrade request
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let n = match tls_stream.read(&mut buf).await {
|
||||
Ok(0) | Err(_) => return,
|
||||
Ok(n) => n,
|
||||
};
|
||||
|
||||
let req_str = String::from_utf8_lossy(&buf[..n]);
|
||||
|
||||
// Extract Sec-WebSocket-Key for handshake
|
||||
let ws_key = req_str
|
||||
.lines()
|
||||
.find(|l| l.to_lowercase().starts_with("sec-websocket-key:"))
|
||||
.map(|l| l.split(':').nth(1).unwrap_or("").trim().to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
// Send 101 Switching Protocols
|
||||
let accept_response = format!(
|
||||
"HTTP/1.1 101 Switching Protocols\r\n\
|
||||
Upgrade: websocket\r\n\
|
||||
Connection: Upgrade\r\n\
|
||||
Sec-WebSocket-Accept: {}\r\n\
|
||||
\r\n",
|
||||
ws_key
|
||||
);
|
||||
|
||||
if tls_stream
|
||||
.write_all(accept_response.as_bytes())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Echo all data back (raw TCP after upgrade)
|
||||
let mut echo_buf = vec![0u8; 65536];
|
||||
loop {
|
||||
let n = match tls_stream.read(&mut echo_buf).await {
|
||||
Ok(0) | Err(_) => break,
|
||||
Ok(n) => n,
|
||||
};
|
||||
if tls_stream.write_all(&echo_buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper to create a TLS passthrough route for testing.
|
||||
pub fn make_tls_passthrough_route(
|
||||
port: u16,
|
||||
|
||||
@@ -490,6 +490,12 @@ async fn test_terminate_and_reencrypt_http_routing() {
|
||||
"Expected /api/data path, got: {}",
|
||||
alpha_body
|
||||
);
|
||||
// Verify original Host header is preserved (not replaced with backend IP:port)
|
||||
assert!(
|
||||
alpha_body.contains(r#""host":"alpha.example.com"#),
|
||||
"Expected original Host header alpha.example.com, got: {}",
|
||||
alpha_body
|
||||
);
|
||||
|
||||
// Test beta domain - different host goes to different backend
|
||||
let beta_result = with_timeout(async {
|
||||
@@ -527,10 +533,136 @@ async fn test_terminate_and_reencrypt_http_routing() {
|
||||
"Expected /other path, got: {}",
|
||||
beta_body
|
||||
);
|
||||
// Verify original Host header is preserved for beta too
|
||||
assert!(
|
||||
beta_body.contains(r#""host":"beta.example.com"#),
|
||||
"Expected original Host header beta.example.com, got: {}",
|
||||
beta_body
|
||||
);
|
||||
|
||||
proxy.stop().await.unwrap();
|
||||
}
|
||||
|
||||
/// Test that WebSocket upgrade works through terminate-and-reencrypt mode.
|
||||
///
|
||||
/// Verifies the full chain: client→TLS→proxy terminates→re-encrypts→TLS→backend WebSocket.
|
||||
/// The proxy's `handle_websocket_upgrade` checks `upstream.use_tls` and calls
|
||||
/// `connect_tls_backend()` when true. This test covers that path.
|
||||
#[tokio::test]
|
||||
async fn test_terminate_and_reencrypt_websocket() {
|
||||
let backend_port = next_port();
|
||||
let proxy_port = next_port();
|
||||
let domain = "ws.example.com";
|
||||
|
||||
// Frontend cert (client→proxy TLS)
|
||||
let (frontend_cert, frontend_key) = generate_self_signed_cert(domain);
|
||||
// Backend cert (proxy→backend TLS)
|
||||
let (backend_cert, backend_key) = generate_self_signed_cert("localhost");
|
||||
|
||||
// Start TLS WebSocket echo backend
|
||||
let _backend = start_tls_ws_echo_backend(backend_port, &backend_cert, &backend_key).await;
|
||||
|
||||
// Create terminate-and-reencrypt route
|
||||
let mut route = make_tls_terminate_route(
|
||||
proxy_port,
|
||||
domain,
|
||||
"127.0.0.1",
|
||||
backend_port,
|
||||
&frontend_cert,
|
||||
&frontend_key,
|
||||
);
|
||||
route.action.tls.as_mut().unwrap().mode = rustproxy_config::TlsMode::TerminateAndReencrypt;
|
||||
|
||||
let options = RustProxyOptions {
|
||||
routes: vec![route],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut proxy = RustProxy::new(options).unwrap();
|
||||
proxy.start().await.unwrap();
|
||||
assert!(wait_for_port(proxy_port, 2000).await);
|
||||
|
||||
let result = with_timeout(
|
||||
async {
|
||||
let _ = rustls::crypto::ring::default_provider().install_default();
|
||||
let tls_config = rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(std::sync::Arc::new(InsecureVerifier))
|
||||
.with_no_client_auth();
|
||||
let connector =
|
||||
tokio_rustls::TlsConnector::from(std::sync::Arc::new(tls_config));
|
||||
|
||||
let stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", proxy_port))
|
||||
.await
|
||||
.unwrap();
|
||||
let server_name =
|
||||
rustls::pki_types::ServerName::try_from(domain.to_string()).unwrap();
|
||||
let mut tls_stream = connector.connect(server_name, stream).await.unwrap();
|
||||
|
||||
// Send WebSocket upgrade request through TLS
|
||||
let request = format!(
|
||||
"GET /ws HTTP/1.1\r\n\
|
||||
Host: {}\r\n\
|
||||
Upgrade: websocket\r\n\
|
||||
Connection: Upgrade\r\n\
|
||||
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n\
|
||||
Sec-WebSocket-Version: 13\r\n\
|
||||
\r\n",
|
||||
domain
|
||||
);
|
||||
tls_stream.write_all(request.as_bytes()).await.unwrap();
|
||||
|
||||
// Read the 101 response (byte-by-byte until \r\n\r\n)
|
||||
let mut response_buf = Vec::with_capacity(4096);
|
||||
let mut temp = [0u8; 1];
|
||||
loop {
|
||||
let n = tls_stream.read(&mut temp).await.unwrap();
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
response_buf.push(temp[0]);
|
||||
if response_buf.len() >= 4 {
|
||||
let len = response_buf.len();
|
||||
if response_buf[len - 4..] == *b"\r\n\r\n" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let response_str = String::from_utf8_lossy(&response_buf).to_string();
|
||||
assert!(
|
||||
response_str.contains("101"),
|
||||
"Expected 101 Switching Protocols, got: {}",
|
||||
response_str
|
||||
);
|
||||
assert!(
|
||||
response_str.to_lowercase().contains("upgrade: websocket"),
|
||||
"Expected Upgrade header, got: {}",
|
||||
response_str
|
||||
);
|
||||
|
||||
// After upgrade, send data and verify echo
|
||||
let test_data = b"Hello TLS WebSocket!";
|
||||
tls_stream.write_all(test_data).await.unwrap();
|
||||
|
||||
// Read echoed data
|
||||
let mut echo_buf = vec![0u8; 256];
|
||||
let n = tls_stream.read(&mut echo_buf).await.unwrap();
|
||||
let echoed = &echo_buf[..n];
|
||||
|
||||
assert_eq!(echoed, test_data, "Expected echo of sent data");
|
||||
|
||||
"ok".to_string()
|
||||
},
|
||||
10,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result, "ok");
|
||||
proxy.stop().await.unwrap();
|
||||
}
|
||||
|
||||
/// Test that the protocol field on route config is accepted and processed.
|
||||
#[tokio::test]
|
||||
async fn test_protocol_field_in_route_config() {
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '25.7.1',
|
||||
version: '25.7.4',
|
||||
description: 'A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.'
|
||||
}
|
||||
|
||||
@@ -409,6 +409,7 @@ export class SmartProxy extends plugins.EventEmitter {
|
||||
keepAliveTreatment: this.settings.keepAliveTreatment,
|
||||
keepAliveInactivityMultiplier: this.settings.keepAliveInactivityMultiplier,
|
||||
extendedKeepAliveLifetime: this.settings.extendedKeepAliveLifetime,
|
||||
proxyIps: this.settings.proxyIPs,
|
||||
acceptProxyProtocol: this.settings.acceptProxyProtocol,
|
||||
sendProxyProtocol: this.settings.sendProxyProtocol,
|
||||
metrics: this.settings.metrics,
|
||||
|
||||
Reference in New Issue
Block a user