|
|
|
|
@@ -502,7 +502,7 @@ impl HttpProxyService {
|
|
|
|
|
// Check for static file serving
|
|
|
|
|
if let Some(ref advanced) = route_match.route.action.advanced {
|
|
|
|
|
if let Some(ref static_files) = advanced.static_files {
|
|
|
|
|
return Ok(Self::serve_static_file(&path, static_files));
|
|
|
|
|
return Ok(Self::serve_static_file(&path, static_files).await);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -615,11 +615,10 @@ impl HttpProxyService {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// X-Forwarded-For: append client IP to existing chain
|
|
|
|
|
let client_ip = peer_addr.ip().to_string();
|
|
|
|
|
let xff_value = if let Some(existing) = upstream_headers.get("x-forwarded-for") {
|
|
|
|
|
format!("{}, {}", existing.to_str().unwrap_or(""), client_ip)
|
|
|
|
|
format!("{}, {}", existing.to_str().unwrap_or(""), ip_str)
|
|
|
|
|
} else {
|
|
|
|
|
client_ip
|
|
|
|
|
ip_str.clone()
|
|
|
|
|
};
|
|
|
|
|
if let Ok(val) = hyper::header::HeaderValue::from_str(&xff_value) {
|
|
|
|
|
upstream_headers.insert(
|
|
|
|
|
@@ -691,7 +690,7 @@ impl HttpProxyService {
|
|
|
|
|
self.metrics.set_backend_protocol(&upstream_key, "h2");
|
|
|
|
|
let result = self.forward_h2_pooled(
|
|
|
|
|
sender, parts, body, upstream_headers, &upstream_path,
|
|
|
|
|
route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity,
|
|
|
|
|
route_match.route, route_id, &ip_str, &pool_key, domain_str, &conn_activity, &upstream_key,
|
|
|
|
|
).await;
|
|
|
|
|
self.upstream_selector.connection_ended(&upstream_key);
|
|
|
|
|
return result;
|
|
|
|
|
@@ -844,19 +843,19 @@ impl HttpProxyService {
|
|
|
|
|
self.forward_h2_with_fallback(
|
|
|
|
|
io, parts, body, upstream_headers, &upstream_path,
|
|
|
|
|
&upstream, route_match.route, route_id, &ip_str, &final_pool_key,
|
|
|
|
|
host.clone(), domain_str, &conn_activity,
|
|
|
|
|
host.clone(), domain_str, &conn_activity, &upstream_key,
|
|
|
|
|
).await
|
|
|
|
|
} else {
|
|
|
|
|
// Explicit H2 mode: hard-fail on handshake error (preserved behavior)
|
|
|
|
|
self.forward_h2(
|
|
|
|
|
io, parts, body, upstream_headers, &upstream_path,
|
|
|
|
|
&upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity,
|
|
|
|
|
&upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, &upstream_key,
|
|
|
|
|
).await
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
self.forward_h1(
|
|
|
|
|
io, parts, body, upstream_headers, &upstream_path,
|
|
|
|
|
&upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity,
|
|
|
|
|
&upstream, route_match.route, route_id, &ip_str, &final_pool_key, domain_str, &conn_activity, &upstream_key,
|
|
|
|
|
).await
|
|
|
|
|
};
|
|
|
|
|
self.upstream_selector.connection_ended(&upstream_key);
|
|
|
|
|
@@ -880,15 +879,14 @@ impl HttpProxyService {
|
|
|
|
|
pool_key: &crate::connection_pool::PoolKey,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let backend_key = format!("{}:{}", pool_key.host, pool_key.port);
|
|
|
|
|
|
|
|
|
|
// Try pooled H1 connection first — avoids TCP+TLS handshake
|
|
|
|
|
if let Some(pooled_sender) = self.connection_pool.checkout_h1(pool_key) {
|
|
|
|
|
self.metrics.backend_pool_hit(&backend_key);
|
|
|
|
|
self.metrics.backend_pool_hit(backend_key);
|
|
|
|
|
return self.forward_h1_with_sender(
|
|
|
|
|
pooled_sender, parts, body, upstream_headers, upstream_path,
|
|
|
|
|
route, route_id, source_ip, pool_key, domain, conn_activity,
|
|
|
|
|
route, route_id, source_ip, domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -900,7 +898,7 @@ impl HttpProxyService {
|
|
|
|
|
Ok(h) => h,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "Backend H1 handshake failed");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend handshake failed"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -911,7 +909,7 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, pool_key, domain, conn_activity).await
|
|
|
|
|
self.forward_h1_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, domain, conn_activity, backend_key).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Common H1 forwarding logic used by both fresh and pooled paths.
|
|
|
|
|
@@ -925,9 +923,9 @@ impl HttpProxyService {
|
|
|
|
|
route: &rustproxy_config::RouteConfig,
|
|
|
|
|
route_id: Option<&str>,
|
|
|
|
|
source_ip: &str,
|
|
|
|
|
pool_key: &crate::connection_pool::PoolKey,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
// Always use HTTP/1.1 for h1 backend connections (h2 incoming requests have version HTTP/2.0)
|
|
|
|
|
let mut upstream_req = Request::builder()
|
|
|
|
|
@@ -939,12 +937,16 @@ impl HttpProxyService {
|
|
|
|
|
*headers = upstream_headers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compute Arc<str> once for both request and response CountingBody
|
|
|
|
|
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
|
|
|
|
let sip: Arc<str> = Arc::from(source_ip);
|
|
|
|
|
|
|
|
|
|
// Wrap the request body in CountingBody then box it for the uniform pool type
|
|
|
|
|
let counting_req_body = CountingBody::new(
|
|
|
|
|
body,
|
|
|
|
|
Arc::clone(&self.metrics),
|
|
|
|
|
route_id.map(|s| s.to_string()),
|
|
|
|
|
Some(source_ip.to_string()),
|
|
|
|
|
rid.clone(),
|
|
|
|
|
Some(Arc::clone(&sip)),
|
|
|
|
|
Direction::In,
|
|
|
|
|
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
|
|
|
|
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
|
|
|
|
|
@@ -954,9 +956,8 @@ impl HttpProxyService {
|
|
|
|
|
let upstream_response = match sender.send_request(upstream_req).await {
|
|
|
|
|
Ok(resp) => resp,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let bk = format!("{}:{}", pool_key.host, pool_key.port);
|
|
|
|
|
error!(backend = %bk, domain = %domain, error = %e, "Backend H1 request failed");
|
|
|
|
|
self.metrics.backend_request_error(&bk);
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "Backend H1 request failed");
|
|
|
|
|
self.metrics.backend_request_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend request failed"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -971,7 +972,7 @@ impl HttpProxyService {
|
|
|
|
|
// of large streaming responses (e.g. 352MB Docker layers) takes priority.
|
|
|
|
|
drop(sender);
|
|
|
|
|
|
|
|
|
|
self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await
|
|
|
|
|
self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Forward request to backend via HTTP/2 with body streaming (fresh connection).
|
|
|
|
|
@@ -990,8 +991,8 @@ impl HttpProxyService {
|
|
|
|
|
pool_key: &crate::connection_pool::PoolKey,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let backend_key = format!("{}:{}", pool_key.host, pool_key.port);
|
|
|
|
|
let exec = hyper_util::rt::TokioExecutor::new();
|
|
|
|
|
let mut h2_builder = hyper::client::conn::http2::Builder::new(exec);
|
|
|
|
|
h2_builder
|
|
|
|
|
@@ -1007,12 +1008,12 @@ impl HttpProxyService {
|
|
|
|
|
Ok(Ok(h)) => h,
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "Backend H2 handshake failed");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 handshake failed"));
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "Backend H2 handshake timeout");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 handshake timeout"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -1039,7 +1040,7 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let sender_for_pool = sender.clone();
|
|
|
|
|
let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity).await;
|
|
|
|
|
let result = self.forward_h2_with_sender(sender, parts, body, upstream_headers, upstream_path, route, route_id, source_ip, Some(pool_key), domain, conn_activity, backend_key).await;
|
|
|
|
|
if matches!(&result, Ok(ref resp) if resp.status() != StatusCode::BAD_GATEWAY) {
|
|
|
|
|
let g = self.connection_pool.register_h2(pool_key.clone(), sender_for_pool);
|
|
|
|
|
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
|
@@ -1063,6 +1064,7 @@ impl HttpProxyService {
|
|
|
|
|
pool_key: &crate::connection_pool::PoolKey,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
// Save retry state for bodyless requests (cheap: Method is an enum, HeaderMap clones Arc-backed Bytes)
|
|
|
|
|
let retry_state = if body.is_end_stream() {
|
|
|
|
|
@@ -1073,18 +1075,18 @@ impl HttpProxyService {
|
|
|
|
|
|
|
|
|
|
let result = self.forward_h2_with_sender(
|
|
|
|
|
sender, parts, body, upstream_headers, upstream_path,
|
|
|
|
|
route, route_id, source_ip, Some(pool_key), domain, conn_activity,
|
|
|
|
|
route, route_id, source_ip, Some(pool_key), domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
|
|
|
|
|
// If the request failed (502) and we can retry with an empty body, do so
|
|
|
|
|
let is_502 = matches!(&result, Ok(resp) if resp.status() == StatusCode::BAD_GATEWAY);
|
|
|
|
|
if is_502 {
|
|
|
|
|
if let Some((method, headers)) = retry_state {
|
|
|
|
|
warn!(backend = %format!("{}:{}", pool_key.host, pool_key.port), domain = %domain,
|
|
|
|
|
warn!(backend = %backend_key, domain = %domain,
|
|
|
|
|
"Stale pooled H2 sender, retrying with fresh connection");
|
|
|
|
|
return self.retry_h2_with_fresh_connection(
|
|
|
|
|
method, headers, upstream_path,
|
|
|
|
|
pool_key, route, route_id, source_ip, domain, conn_activity,
|
|
|
|
|
pool_key, route, route_id, source_ip, domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1104,8 +1106,8 @@ impl HttpProxyService {
|
|
|
|
|
source_ip: &str,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let backend_key = format!("{}:{}", pool_key.host, pool_key.port);
|
|
|
|
|
|
|
|
|
|
// Establish fresh backend connection
|
|
|
|
|
let retry_connect_start = std::time::Instant::now();
|
|
|
|
|
@@ -1117,12 +1119,12 @@ impl HttpProxyService {
|
|
|
|
|
Ok(Ok(tls)) => BackendStream::Tls(tls),
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: TLS connect failed");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry"));
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "H2 retry: TLS connect timeout");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry"));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1137,17 +1139,17 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: TCP connect failed");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend unavailable on H2 retry"));
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "H2 retry: TCP connect timeout");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend timeout on H2 retry"));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
self.metrics.backend_connection_opened(&backend_key, retry_connect_start.elapsed());
|
|
|
|
|
self.metrics.backend_connection_opened(backend_key, retry_connect_start.elapsed());
|
|
|
|
|
|
|
|
|
|
let io = TokioIo::new(backend);
|
|
|
|
|
let exec = hyper_util::rt::TokioExecutor::new();
|
|
|
|
|
@@ -1165,14 +1167,14 @@ impl HttpProxyService {
|
|
|
|
|
Ok(Ok(h)) => h,
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "H2 retry: handshake failed");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 retry handshake failed"));
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "H2 retry: handshake timeout");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::GATEWAY_TIMEOUT, "Backend H2 retry handshake timeout"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -1220,16 +1222,16 @@ impl HttpProxyService {
|
|
|
|
|
// Register in pool only after request succeeds
|
|
|
|
|
let g = self.connection_pool.register_h2(pool_key.clone(), sender);
|
|
|
|
|
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
|
let result = self.build_streaming_response(resp, route, route_id, source_ip, conn_activity).await;
|
|
|
|
|
let result = self.build_streaming_response(resp, route, route_id.map(Arc::from), Arc::from(source_ip), conn_activity).await;
|
|
|
|
|
// Close the fresh backend connection (opened above)
|
|
|
|
|
self.metrics.backend_connection_closed(&backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H2 retry: request failed");
|
|
|
|
|
self.metrics.backend_request_error(&backend_key);
|
|
|
|
|
self.metrics.backend_request_error(backend_key);
|
|
|
|
|
// Close the fresh backend connection (opened above)
|
|
|
|
|
self.metrics.backend_connection_closed(&backend_key);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H2 request failed on retry"))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1257,6 +1259,7 @@ impl HttpProxyService {
|
|
|
|
|
requested_host: Option<String>,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let exec = hyper_util::rt::TokioExecutor::new();
|
|
|
|
|
let mut h2_builder = hyper::client::conn::http2::Builder::new(exec);
|
|
|
|
|
@@ -1274,14 +1277,13 @@ impl HttpProxyService {
|
|
|
|
|
match handshake_result {
|
|
|
|
|
Err(_) => {
|
|
|
|
|
// H2 handshake timed out — fall back to H1
|
|
|
|
|
let bk = format!("{}:{}", upstream.host, upstream.port);
|
|
|
|
|
warn!(
|
|
|
|
|
backend = %bk,
|
|
|
|
|
backend = %backend_key,
|
|
|
|
|
domain = %domain,
|
|
|
|
|
"H2 handshake timeout, falling back to H1"
|
|
|
|
|
);
|
|
|
|
|
self.metrics.backend_h2_failure(&bk);
|
|
|
|
|
self.metrics.backend_handshake_error(&bk);
|
|
|
|
|
self.metrics.backend_h2_failure(backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
|
|
|
|
|
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
|
|
|
|
host: upstream.host.clone(),
|
|
|
|
|
@@ -1290,7 +1292,7 @@ impl HttpProxyService {
|
|
|
|
|
};
|
|
|
|
|
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
|
|
|
|
|
|
|
|
|
match self.reconnect_backend(upstream, domain).await {
|
|
|
|
|
match self.reconnect_backend(upstream, domain, backend_key).await {
|
|
|
|
|
Some(fallback_backend) => {
|
|
|
|
|
let h1_pool_key = crate::connection_pool::PoolKey {
|
|
|
|
|
host: upstream.host.clone(),
|
|
|
|
|
@@ -1301,9 +1303,9 @@ impl HttpProxyService {
|
|
|
|
|
let fallback_io = TokioIo::new(fallback_backend);
|
|
|
|
|
let result = self.forward_h1(
|
|
|
|
|
fallback_io, parts, body, upstream_headers, upstream_path,
|
|
|
|
|
upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity,
|
|
|
|
|
upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
self.metrics.backend_connection_closed(&bk);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
@@ -1354,11 +1356,13 @@ impl HttpProxyService {
|
|
|
|
|
*headers = upstream_headers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
|
|
|
|
let sip: Arc<str> = Arc::from(source_ip);
|
|
|
|
|
let counting_req_body = CountingBody::new(
|
|
|
|
|
body,
|
|
|
|
|
Arc::clone(&self.metrics),
|
|
|
|
|
route_id.map(|s| s.to_string()),
|
|
|
|
|
Some(source_ip.to_string()),
|
|
|
|
|
rid.clone(),
|
|
|
|
|
Some(Arc::clone(&sip)),
|
|
|
|
|
Direction::In,
|
|
|
|
|
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
|
|
|
|
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
|
|
|
|
|
@@ -1368,40 +1372,33 @@ impl HttpProxyService {
|
|
|
|
|
Ok(upstream_response) => {
|
|
|
|
|
let g = self.connection_pool.register_h2(pool_key.clone(), sender);
|
|
|
|
|
gen_holder.store(g, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
|
self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await
|
|
|
|
|
self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// H2 request failed on a stream level (e.g. RST_STREAM PROTOCOL_ERROR).
|
|
|
|
|
// The H2 handshake succeeded, so the backend genuinely speaks H2 — don't
|
|
|
|
|
// poison the protocol cache. Only handshake-level failures (below) should
|
|
|
|
|
// downgrade the cache to H1.
|
|
|
|
|
let bk = format!("{}:{}", upstream.host, upstream.port);
|
|
|
|
|
debug!(
|
|
|
|
|
backend = %bk,
|
|
|
|
|
backend = %backend_key,
|
|
|
|
|
domain = %domain,
|
|
|
|
|
error = %e,
|
|
|
|
|
error_debug = ?e,
|
|
|
|
|
"H2 stream error, retrying this request as H1"
|
|
|
|
|
);
|
|
|
|
|
self.metrics.backend_h2_failure(&bk);
|
|
|
|
|
self.metrics.backend_h2_failure(backend_key);
|
|
|
|
|
|
|
|
|
|
// Retry as H1 for bodyless requests; return 502 for requests with bodies
|
|
|
|
|
if let Some((method, headers)) = retry_state {
|
|
|
|
|
match self.reconnect_backend(upstream, domain).await {
|
|
|
|
|
match self.reconnect_backend(upstream, domain, backend_key).await {
|
|
|
|
|
Some(fallback_backend) => {
|
|
|
|
|
let h1_pool_key = crate::connection_pool::PoolKey {
|
|
|
|
|
host: upstream.host.clone(),
|
|
|
|
|
port: upstream.port,
|
|
|
|
|
use_tls: upstream.use_tls,
|
|
|
|
|
h2: false,
|
|
|
|
|
};
|
|
|
|
|
let fallback_io = TokioIo::new(fallback_backend);
|
|
|
|
|
let result = self.forward_h1_empty_body(
|
|
|
|
|
fallback_io, method, headers, upstream_path,
|
|
|
|
|
route, route_id, source_ip, &h1_pool_key, domain, conn_activity,
|
|
|
|
|
route, route_id, source_ip, domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
// Close the reconnected backend connection (opened in reconnect_backend)
|
|
|
|
|
self.metrics.backend_connection_closed(&bk);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
@@ -1417,15 +1414,14 @@ impl HttpProxyService {
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
// H2 handshake truly failed — fall back to H1
|
|
|
|
|
// Body is NOT consumed yet, so we can retry the full request.
|
|
|
|
|
let bk = format!("{}:{}", upstream.host, upstream.port);
|
|
|
|
|
warn!(
|
|
|
|
|
backend = %bk,
|
|
|
|
|
backend = %backend_key,
|
|
|
|
|
domain = %domain,
|
|
|
|
|
error = %e,
|
|
|
|
|
"H2 handshake failed, falling back to H1"
|
|
|
|
|
);
|
|
|
|
|
self.metrics.backend_h2_failure(&bk);
|
|
|
|
|
self.metrics.backend_handshake_error(&bk);
|
|
|
|
|
self.metrics.backend_h2_failure(backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
|
|
|
|
|
// Update cache to H1 so subsequent requests skip H2
|
|
|
|
|
let cache_key = crate::protocol_cache::ProtocolCacheKey {
|
|
|
|
|
@@ -1436,7 +1432,7 @@ impl HttpProxyService {
|
|
|
|
|
self.protocol_cache.insert(cache_key, crate::protocol_cache::DetectedProtocol::H1);
|
|
|
|
|
|
|
|
|
|
// Reconnect for H1 (the original io was consumed by the failed h2 handshake)
|
|
|
|
|
match self.reconnect_backend(upstream, domain).await {
|
|
|
|
|
match self.reconnect_backend(upstream, domain, backend_key).await {
|
|
|
|
|
Some(fallback_backend) => {
|
|
|
|
|
let h1_pool_key = crate::connection_pool::PoolKey {
|
|
|
|
|
host: upstream.host.clone(),
|
|
|
|
|
@@ -1447,10 +1443,10 @@ impl HttpProxyService {
|
|
|
|
|
let fallback_io = TokioIo::new(fallback_backend);
|
|
|
|
|
let result = self.forward_h1(
|
|
|
|
|
fallback_io, parts, body, upstream_headers, upstream_path,
|
|
|
|
|
upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity,
|
|
|
|
|
upstream, route, route_id, source_ip, &h1_pool_key, domain, conn_activity, backend_key,
|
|
|
|
|
).await;
|
|
|
|
|
// Close the reconnected backend connection (opened in reconnect_backend)
|
|
|
|
|
self.metrics.backend_connection_closed(&bk);
|
|
|
|
|
self.metrics.backend_connection_closed(backend_key);
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
@@ -1472,11 +1468,10 @@ impl HttpProxyService {
|
|
|
|
|
route: &rustproxy_config::RouteConfig,
|
|
|
|
|
route_id: Option<&str>,
|
|
|
|
|
source_ip: &str,
|
|
|
|
|
pool_key: &crate::connection_pool::PoolKey,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let backend_key = format!("{}:{}", pool_key.host, pool_key.port);
|
|
|
|
|
let (mut sender, conn): (
|
|
|
|
|
hyper::client::conn::http1::SendRequest<BoxBody<Bytes, hyper::Error>>,
|
|
|
|
|
hyper::client::conn::http1::Connection<TokioIo<BackendStream>, BoxBody<Bytes, hyper::Error>>,
|
|
|
|
|
@@ -1484,7 +1479,7 @@ impl HttpProxyService {
|
|
|
|
|
Ok(h) => h,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: handshake failed");
|
|
|
|
|
self.metrics.backend_handshake_error(&backend_key);
|
|
|
|
|
self.metrics.backend_handshake_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback handshake failed"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -1513,7 +1508,7 @@ impl HttpProxyService {
|
|
|
|
|
Ok(resp) => resp,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: request failed");
|
|
|
|
|
self.metrics.backend_request_error(&backend_key);
|
|
|
|
|
self.metrics.backend_request_error(backend_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend H1 fallback request failed"));
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
@@ -1521,7 +1516,7 @@ impl HttpProxyService {
|
|
|
|
|
// Don't pool the sender while response body is still streaming (same safety as forward_h1_with_sender)
|
|
|
|
|
drop(sender);
|
|
|
|
|
|
|
|
|
|
self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await
|
|
|
|
|
self.build_streaming_response(upstream_response, route, route_id.map(Arc::from), Arc::from(source_ip), conn_activity).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reconnect to a backend (used for H2→H1 fallback).
|
|
|
|
|
@@ -1529,8 +1524,8 @@ impl HttpProxyService {
|
|
|
|
|
&self,
|
|
|
|
|
upstream: &crate::upstream_selector::UpstreamSelection,
|
|
|
|
|
domain: &str,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Option<BackendStream> {
|
|
|
|
|
let backend_key = format!("{}:{}", upstream.host, upstream.port);
|
|
|
|
|
let reconnect_start = std::time::Instant::now();
|
|
|
|
|
if upstream.use_tls {
|
|
|
|
|
match tokio::time::timeout(
|
|
|
|
|
@@ -1538,17 +1533,17 @@ impl HttpProxyService {
|
|
|
|
|
connect_tls_backend(&self.backend_tls_config, &upstream.host, upstream.port),
|
|
|
|
|
).await {
|
|
|
|
|
Ok(Ok(tls)) => {
|
|
|
|
|
self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed());
|
|
|
|
|
self.metrics.backend_connection_opened(backend_key, reconnect_start.elapsed());
|
|
|
|
|
Some(BackendStream::Tls(tls))
|
|
|
|
|
}
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: TLS reconnect failed");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "H1 fallback: TLS reconnect timeout");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1562,17 +1557,17 @@ impl HttpProxyService {
|
|
|
|
|
let _ = socket2::SockRef::from(&s).set_tcp_keepalive(
|
|
|
|
|
&socket2::TcpKeepalive::new().with_time(std::time::Duration::from_secs(60))
|
|
|
|
|
);
|
|
|
|
|
self.metrics.backend_connection_opened(&backend_key, reconnect_start.elapsed());
|
|
|
|
|
self.metrics.backend_connection_opened(backend_key, reconnect_start.elapsed());
|
|
|
|
|
Some(BackendStream::Plain(s))
|
|
|
|
|
}
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, "H1 fallback: TCP reconnect failed");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, "H1 fallback: TCP reconnect timeout");
|
|
|
|
|
self.metrics.backend_connect_error(&backend_key);
|
|
|
|
|
self.metrics.backend_connect_error(backend_key);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1593,6 +1588,7 @@ impl HttpProxyService {
|
|
|
|
|
pool_key: Option<&crate::connection_pool::PoolKey>,
|
|
|
|
|
domain: &str,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
backend_key: &str,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
// Build absolute URI for H2 pseudo-headers (:scheme, :authority)
|
|
|
|
|
// Use the requested domain as authority (not backend address) so :authority matches Host header
|
|
|
|
|
@@ -1614,12 +1610,16 @@ impl HttpProxyService {
|
|
|
|
|
*headers = upstream_headers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compute Arc<str> once for both request and response CountingBody
|
|
|
|
|
let rid: Option<Arc<str>> = route_id.map(Arc::from);
|
|
|
|
|
let sip: Arc<str> = Arc::from(source_ip);
|
|
|
|
|
|
|
|
|
|
// Wrap the request body in CountingBody then box it for the uniform pool type
|
|
|
|
|
let counting_req_body = CountingBody::new(
|
|
|
|
|
body,
|
|
|
|
|
Arc::clone(&self.metrics),
|
|
|
|
|
route_id.map(|s| s.to_string()),
|
|
|
|
|
Some(source_ip.to_string()),
|
|
|
|
|
rid.clone(),
|
|
|
|
|
Some(Arc::clone(&sip)),
|
|
|
|
|
Direction::In,
|
|
|
|
|
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
|
|
|
|
let boxed_body: BoxBody<Bytes, hyper::Error> = BoxBody::new(counting_req_body);
|
|
|
|
|
@@ -1631,9 +1631,8 @@ impl HttpProxyService {
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// Evict the dead sender so subsequent requests get fresh connections
|
|
|
|
|
if let Some(key) = pool_key {
|
|
|
|
|
let bk = format!("{}:{}", key.host, key.port);
|
|
|
|
|
error!(backend = %bk, domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed");
|
|
|
|
|
self.metrics.backend_request_error(&bk);
|
|
|
|
|
error!(backend = %backend_key, domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed");
|
|
|
|
|
self.metrics.backend_request_error(backend_key);
|
|
|
|
|
self.connection_pool.remove_h2(key);
|
|
|
|
|
} else {
|
|
|
|
|
error!(domain = %domain, error = %e, error_debug = ?e, "Backend H2 request failed");
|
|
|
|
|
@@ -1642,7 +1641,7 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.build_streaming_response(upstream_response, route, route_id, source_ip, conn_activity).await
|
|
|
|
|
self.build_streaming_response(upstream_response, route, rid, sip, conn_activity).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build the client-facing response from an upstream response, streaming the body.
|
|
|
|
|
@@ -1653,8 +1652,8 @@ impl HttpProxyService {
|
|
|
|
|
&self,
|
|
|
|
|
upstream_response: Response<Incoming>,
|
|
|
|
|
route: &rustproxy_config::RouteConfig,
|
|
|
|
|
route_id: Option<&str>,
|
|
|
|
|
source_ip: &str,
|
|
|
|
|
route_id: Option<Arc<str>>,
|
|
|
|
|
source_ip: Arc<str>,
|
|
|
|
|
conn_activity: &ConnActivity,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let (resp_parts, resp_body) = upstream_response.into_parts();
|
|
|
|
|
@@ -1686,8 +1685,8 @@ impl HttpProxyService {
|
|
|
|
|
let counting_body = CountingBody::new(
|
|
|
|
|
resp_body,
|
|
|
|
|
Arc::clone(&self.metrics),
|
|
|
|
|
route_id.map(|s| s.to_string()),
|
|
|
|
|
Some(source_ip.to_string()),
|
|
|
|
|
route_id,
|
|
|
|
|
Some(source_ip),
|
|
|
|
|
Direction::Out,
|
|
|
|
|
).with_connection_activity(Arc::clone(&conn_activity.last_activity), conn_activity.start);
|
|
|
|
|
|
|
|
|
|
@@ -1906,21 +1905,26 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut response_buf = Vec::with_capacity(4096);
|
|
|
|
|
let mut temp = [0u8; 1];
|
|
|
|
|
let mut read_buf = [0u8; 4096];
|
|
|
|
|
let extra_bytes: Vec<u8>;
|
|
|
|
|
loop {
|
|
|
|
|
match upstream_stream.read(&mut temp).await {
|
|
|
|
|
match upstream_stream.read(&mut read_buf).await {
|
|
|
|
|
Ok(0) => {
|
|
|
|
|
error!("WebSocket: upstream closed before completing handshake");
|
|
|
|
|
self.upstream_selector.connection_ended(upstream_key);
|
|
|
|
|
return Ok(error_response(StatusCode::BAD_GATEWAY, "Backend closed"));
|
|
|
|
|
}
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
response_buf.push(temp[0]);
|
|
|
|
|
if response_buf.len() >= 4 {
|
|
|
|
|
let len = response_buf.len();
|
|
|
|
|
if response_buf[len-4..] == *b"\r\n\r\n" {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
Ok(n) => {
|
|
|
|
|
let prev_len = response_buf.len();
|
|
|
|
|
response_buf.extend_from_slice(&read_buf[..n]);
|
|
|
|
|
// Scan for \r\n\r\n, backing up 3 bytes to handle split across reads
|
|
|
|
|
let search_start = prev_len.saturating_sub(3);
|
|
|
|
|
if let Some(pos) = response_buf[search_start..].windows(4)
|
|
|
|
|
.position(|w| w == b"\r\n\r\n")
|
|
|
|
|
{
|
|
|
|
|
let header_end = search_start + pos + 4;
|
|
|
|
|
extra_bytes = response_buf.split_off(header_end);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if response_buf.len() > 8192 {
|
|
|
|
|
error!("WebSocket: upstream response headers too large");
|
|
|
|
|
@@ -1995,8 +1999,8 @@ impl HttpProxyService {
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let metrics = Arc::clone(&self.metrics);
|
|
|
|
|
let route_id_owned = route_id.map(|s| s.to_string());
|
|
|
|
|
let source_ip_owned = source_ip.to_string();
|
|
|
|
|
let route_id_owned: Option<Arc<str>> = route_id.map(Arc::from);
|
|
|
|
|
let source_ip_owned: Arc<str> = Arc::from(source_ip);
|
|
|
|
|
let upstream_selector = self.upstream_selector.clone();
|
|
|
|
|
let upstream_key_owned = upstream_key.to_string();
|
|
|
|
|
let ws_inactivity_timeout = self.ws_inactivity_timeout;
|
|
|
|
|
@@ -2050,7 +2054,7 @@ impl HttpProxyService {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
total += n as u64;
|
|
|
|
|
metrics_c2u.record_bytes(n as u64, 0, route_c2u.as_deref(), Some(&ip_c2u));
|
|
|
|
|
metrics_c2u.record_bytes(n as u64, 0, route_c2u.as_deref(), Some(&*ip_c2u));
|
|
|
|
|
la1.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
if let Some((ref ca, ca_start)) = conn_act_c2u {
|
|
|
|
|
ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
@@ -2072,6 +2076,23 @@ impl HttpProxyService {
|
|
|
|
|
let u2c = tokio::spawn(async move {
|
|
|
|
|
let mut buf = vec![0u8; 65536];
|
|
|
|
|
let mut total = 0u64;
|
|
|
|
|
// Forward any bytes buffered past the HTTP header terminator during handshake
|
|
|
|
|
if !extra_bytes.is_empty() {
|
|
|
|
|
let n = extra_bytes.len();
|
|
|
|
|
if cw.write_all(&extra_bytes).await.is_err() {
|
|
|
|
|
let _ = tokio::time::timeout(
|
|
|
|
|
std::time::Duration::from_secs(2),
|
|
|
|
|
cw.shutdown(),
|
|
|
|
|
).await;
|
|
|
|
|
return 0u64;
|
|
|
|
|
}
|
|
|
|
|
total += n as u64;
|
|
|
|
|
metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&*ip_u2c));
|
|
|
|
|
la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
if let Some((ref ca, ca_start)) = conn_act_u2c {
|
|
|
|
|
ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
loop {
|
|
|
|
|
let n = tokio::select! {
|
|
|
|
|
result = ur.read(&mut buf) => match result {
|
|
|
|
|
@@ -2084,7 +2105,7 @@ impl HttpProxyService {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
total += n as u64;
|
|
|
|
|
metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&ip_u2c));
|
|
|
|
|
metrics_u2c.record_bytes(0, n as u64, route_u2c.as_deref(), Some(&*ip_u2c));
|
|
|
|
|
la2.store(start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
if let Some((ref ca, ca_start)) = conn_act_u2c {
|
|
|
|
|
ca.store(ca_start.elapsed().as_millis() as u64, Ordering::Relaxed);
|
|
|
|
|
@@ -2224,13 +2245,13 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Serve a static file from the configured directory.
|
|
|
|
|
fn serve_static_file(
|
|
|
|
|
async fn serve_static_file(
|
|
|
|
|
path: &str,
|
|
|
|
|
config: &rustproxy_config::RouteStaticFiles,
|
|
|
|
|
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
|
|
|
|
use std::path::Path;
|
|
|
|
|
use std::path::PathBuf;
|
|
|
|
|
|
|
|
|
|
let root = Path::new(&config.root);
|
|
|
|
|
let root = PathBuf::from(&config.root);
|
|
|
|
|
|
|
|
|
|
// Sanitize path to prevent directory traversal
|
|
|
|
|
let clean_path = path.trim_start_matches('/');
|
|
|
|
|
@@ -2239,7 +2260,12 @@ impl HttpProxyService {
|
|
|
|
|
let mut file_path = root.join(&clean_path);
|
|
|
|
|
|
|
|
|
|
// If path points to a directory, try index files
|
|
|
|
|
if file_path.is_dir() || clean_path.is_empty() {
|
|
|
|
|
let is_dir = if clean_path.is_empty() {
|
|
|
|
|
true
|
|
|
|
|
} else {
|
|
|
|
|
tokio::fs::metadata(&file_path).await.map(|m| m.is_dir()).unwrap_or(false)
|
|
|
|
|
};
|
|
|
|
|
if is_dir {
|
|
|
|
|
let index_files = config.index_files.as_deref()
|
|
|
|
|
.or(config.index.as_deref())
|
|
|
|
|
.unwrap_or(&[]);
|
|
|
|
|
@@ -2253,7 +2279,7 @@ impl HttpProxyService {
|
|
|
|
|
} else {
|
|
|
|
|
file_path.join(index)
|
|
|
|
|
};
|
|
|
|
|
if candidate.is_file() {
|
|
|
|
|
if tokio::fs::metadata(&candidate).await.map(|m| m.is_file()).unwrap_or(false) {
|
|
|
|
|
file_path = candidate;
|
|
|
|
|
found = true;
|
|
|
|
|
break;
|
|
|
|
|
@@ -2265,11 +2291,11 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Ensure the resolved path is within the root (prevent traversal)
|
|
|
|
|
let canonical_root = match root.canonicalize() {
|
|
|
|
|
let canonical_root = match tokio::fs::canonicalize(&root).await {
|
|
|
|
|
Ok(p) => p,
|
|
|
|
|
Err(_) => return error_response(StatusCode::NOT_FOUND, "Not found"),
|
|
|
|
|
};
|
|
|
|
|
let canonical_file = match file_path.canonicalize() {
|
|
|
|
|
let canonical_file = match tokio::fs::canonicalize(&file_path).await {
|
|
|
|
|
Ok(p) => p,
|
|
|
|
|
Err(_) => return error_response(StatusCode::NOT_FOUND, "Not found"),
|
|
|
|
|
};
|
|
|
|
|
@@ -2283,7 +2309,7 @@ impl HttpProxyService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read the file
|
|
|
|
|
match std::fs::read(&file_path) {
|
|
|
|
|
match tokio::fs::read(&file_path).await {
|
|
|
|
|
Ok(content) => {
|
|
|
|
|
let content_type = guess_content_type(&file_path);
|
|
|
|
|
let mut response = Response::builder()
|
|
|
|
|
|