feat(core): add performance profiles, transport observability, and edge stream budget controls
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream, UdpSocket};
|
||||
@@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use bytes::Bytes;
|
||||
use remoteingress_protocol::*;
|
||||
use crate::performance::{EffectivePerformanceConfig, PerformanceConfig};
|
||||
use crate::transport::TransportMode;
|
||||
use crate::transport::quic as quic_transport;
|
||||
|
||||
type HubTlsStream = tokio_rustls::server::TlsStream<TcpStream>;
|
||||
@@ -56,6 +58,8 @@ pub struct HubConfig {
|
||||
pub tls_cert_pem: Option<String>,
|
||||
#[serde(default)]
|
||||
pub tls_key_pem: Option<String>,
|
||||
#[serde(default)]
|
||||
pub performance: Option<PerformanceConfig>,
|
||||
}
|
||||
|
||||
impl Default for HubConfig {
|
||||
@@ -65,6 +69,7 @@ impl Default for HubConfig {
|
||||
target_host: Some("127.0.0.1".to_string()),
|
||||
tls_cert_pem: None,
|
||||
tls_key_pem: None,
|
||||
performance: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,6 +87,8 @@ pub struct AllowedEdge {
|
||||
pub stun_interval_secs: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub firewall_config: Option<serde_json::Value>,
|
||||
#[serde(default)]
|
||||
pub performance: Option<PerformanceConfig>,
|
||||
}
|
||||
|
||||
/// Handshake response sent to edge after authentication.
|
||||
@@ -94,6 +101,7 @@ struct HandshakeResponse {
|
||||
stun_interval_secs: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
firewall_config: Option<serde_json::Value>,
|
||||
performance: EffectivePerformanceConfig,
|
||||
}
|
||||
|
||||
/// Configuration update pushed to a connected edge at runtime.
|
||||
@@ -105,6 +113,44 @@ pub struct EdgeConfigUpdate {
|
||||
pub listen_ports_udp: Vec<u16>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub firewall_config: Option<serde_json::Value>,
|
||||
pub performance: EffectivePerformanceConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct FlowControlStatus {
|
||||
pub applies: bool,
|
||||
pub current_window_bytes: u32,
|
||||
pub min_window_bytes: u32,
|
||||
pub max_window_bytes: u32,
|
||||
pub total_window_budget_bytes: u64,
|
||||
pub estimated_in_flight_bytes: u64,
|
||||
pub stalled_streams: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct QueueStatus {
|
||||
pub ctrl_queue_depth: u64,
|
||||
pub data_queue_depth: u64,
|
||||
pub sustained_queue_depth: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TrafficStatus {
|
||||
pub bytes_in: u64,
|
||||
pub bytes_out: u64,
|
||||
pub streams_opened_total: u64,
|
||||
pub streams_closed_total: u64,
|
||||
pub rejected_streams: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UdpStatus {
|
||||
pub active_sessions: u64,
|
||||
pub dropped_datagrams: u64,
|
||||
}
|
||||
|
||||
/// Runtime status of a connected edge.
|
||||
@@ -115,6 +161,13 @@ pub struct ConnectedEdgeStatus {
|
||||
pub connected_at: u64,
|
||||
pub active_streams: usize,
|
||||
pub peer_addr: String,
|
||||
pub transport_mode: TransportMode,
|
||||
pub fallback_used: bool,
|
||||
pub performance: EffectivePerformanceConfig,
|
||||
pub flow_control: FlowControlStatus,
|
||||
pub queues: QueueStatus,
|
||||
pub traffic: TrafficStatus,
|
||||
pub udp: UdpStatus,
|
||||
}
|
||||
|
||||
/// Events emitted by the hub.
|
||||
@@ -157,11 +210,30 @@ struct ConnectedEdgeInfo {
|
||||
connected_at: u64,
|
||||
peer_addr: String,
|
||||
edge_stream_count: Arc<AtomicU32>,
|
||||
transport_mode: TransportMode,
|
||||
fallback_used: bool,
|
||||
performance: EffectivePerformanceConfig,
|
||||
metrics: Arc<EdgeRuntimeMetrics>,
|
||||
config_tx: mpsc::Sender<EdgeConfigUpdate>,
|
||||
/// Used to cancel the old connection when an edge reconnects.
|
||||
cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
/// Per-edge runtime counters and gauges.
///
/// Every field is an atomic so the connection handler and its spawned
/// stream tasks can update the values through a shared `Arc` without a
/// lock. All counters start at zero via `Default`.
#[derive(Default)]
struct EdgeRuntimeMetrics {
    /// Streams opened over the lifetime of the connection.
    streams_opened_total: AtomicU64,
    /// Streams whose handler task has completed.
    streams_closed_total: AtomicU64,
    /// Stream opens refused because the stream semaphore had no permit.
    rejected_streams: AtomicU64,
    /// Payload bytes received from the edge in data frames.
    bytes_in: AtomicU64,
    /// Payload bytes framed toward the edge on the download path.
    bytes_out: AtomicU64,
    /// Streams that hit the stall timeout while their send window was empty.
    stalled_streams: AtomicU64,
    /// Datagrams that could not be queued to their UDP session.
    dropped_datagrams: AtomicU64,
    /// Gauge of UDP sessions currently tracked for this edge.
    active_udp_sessions: AtomicU64,
    /// Last sampled depth of the control-frame queue.
    ctrl_queue_depth: AtomicU64,
    /// Last sampled depth of the data-frame queue.
    data_queue_depth: AtomicU64,
    /// Last sampled depth of the sustained-flow queue.
    sustained_queue_depth: AtomicU64,
}
|
||||
|
||||
impl TunnelHub {
|
||||
pub fn new(config: HubConfig) -> Self {
|
||||
let (event_tx, event_rx) = mpsc::channel(1024);
|
||||
@@ -185,6 +257,7 @@ impl TunnelHub {
|
||||
/// Update the list of allowed edges.
|
||||
/// For any currently-connected edge whose ports changed, push a config update.
|
||||
pub async fn update_allowed_edges(&self, edges: Vec<AllowedEdge>) {
|
||||
let global_performance = self.config.read().await.performance.clone();
|
||||
let mut map = self.allowed_edges.write().await;
|
||||
|
||||
// Build new map
|
||||
@@ -201,7 +274,8 @@ impl TunnelHub {
|
||||
let config_changed = match map.get(&edge.id) {
|
||||
Some(old) => old.listen_ports != edge.listen_ports
|
||||
|| old.listen_ports_udp != edge.listen_ports_udp
|
||||
|| old.firewall_config != edge.firewall_config,
|
||||
|| old.firewall_config != edge.firewall_config
|
||||
|| old.performance != edge.performance,
|
||||
None => true, // newly allowed edge that's already connected
|
||||
};
|
||||
if config_changed {
|
||||
@@ -209,6 +283,10 @@ impl TunnelHub {
|
||||
listen_ports: edge.listen_ports.clone(),
|
||||
listen_ports_udp: edge.listen_ports_udp.clone(),
|
||||
firewall_config: edge.firewall_config.clone(),
|
||||
performance: PerformanceConfig::merge(
|
||||
global_performance.as_ref(),
|
||||
edge.performance.as_ref(),
|
||||
).effective(),
|
||||
};
|
||||
let _ = info.config_tx.try_send(update);
|
||||
}
|
||||
@@ -226,11 +304,50 @@ impl TunnelHub {
|
||||
|
||||
let mut connected = Vec::new();
|
||||
for (id, info) in edges.iter() {
|
||||
let active_streams = info.edge_stream_count.load(Ordering::Relaxed);
|
||||
let flow_window = if info.transport_mode == TransportMode::TcpTls {
|
||||
compute_window_for_limits(
|
||||
active_streams,
|
||||
info.performance.total_window_budget_bytes,
|
||||
info.performance.min_stream_window_bytes,
|
||||
info.performance.max_stream_window_bytes,
|
||||
)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
connected.push(ConnectedEdgeStatus {
|
||||
edge_id: id.clone(),
|
||||
connected_at: info.connected_at,
|
||||
active_streams: info.edge_stream_count.load(Ordering::Relaxed) as usize,
|
||||
active_streams: active_streams as usize,
|
||||
peer_addr: info.peer_addr.clone(),
|
||||
transport_mode: info.transport_mode,
|
||||
fallback_used: info.fallback_used,
|
||||
performance: info.performance.clone(),
|
||||
flow_control: FlowControlStatus {
|
||||
applies: info.transport_mode == TransportMode::TcpTls,
|
||||
current_window_bytes: flow_window,
|
||||
min_window_bytes: info.performance.min_stream_window_bytes,
|
||||
max_window_bytes: info.performance.max_stream_window_bytes,
|
||||
total_window_budget_bytes: info.performance.total_window_budget_bytes,
|
||||
estimated_in_flight_bytes: flow_window as u64 * active_streams as u64,
|
||||
stalled_streams: info.metrics.stalled_streams.load(Ordering::Relaxed),
|
||||
},
|
||||
queues: QueueStatus {
|
||||
ctrl_queue_depth: info.metrics.ctrl_queue_depth.load(Ordering::Relaxed),
|
||||
data_queue_depth: info.metrics.data_queue_depth.load(Ordering::Relaxed),
|
||||
sustained_queue_depth: info.metrics.sustained_queue_depth.load(Ordering::Relaxed),
|
||||
},
|
||||
traffic: TrafficStatus {
|
||||
bytes_in: info.metrics.bytes_in.load(Ordering::Relaxed),
|
||||
bytes_out: info.metrics.bytes_out.load(Ordering::Relaxed),
|
||||
streams_opened_total: info.metrics.streams_opened_total.load(Ordering::Relaxed),
|
||||
streams_closed_total: info.metrics.streams_closed_total.load(Ordering::Relaxed),
|
||||
rejected_streams: info.metrics.rejected_streams.load(Ordering::Relaxed),
|
||||
},
|
||||
udp: UdpStatus {
|
||||
active_sessions: info.metrics.active_udp_sessions.load(Ordering::Relaxed),
|
||||
dropped_datagrams: info.metrics.dropped_datagrams.load(Ordering::Relaxed),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -249,9 +366,14 @@ impl TunnelHub {
|
||||
|
||||
let listener = TcpListener::bind(("0.0.0.0", config.tunnel_port)).await?;
|
||||
log::info!("Hub listening on TCP port {}", config.tunnel_port);
|
||||
let effective_performance = config.performance.clone().unwrap_or_default().effective();
|
||||
|
||||
// Start QUIC endpoint on the same port (UDP)
|
||||
let quic_endpoint = match quic_transport::build_quic_server_config(tls_config) {
|
||||
let quic_endpoint = match quic_transport::build_quic_server_config_with_limits(
|
||||
tls_config,
|
||||
effective_performance.max_streams_per_edge.min(u32::MAX as usize) as u32,
|
||||
effective_performance.quic_datagram_receive_buffer_bytes,
|
||||
) {
|
||||
Ok(quic_server_config) => {
|
||||
let bind_addr: std::net::SocketAddr = ([0, 0, 0, 0], config.tunnel_port).into();
|
||||
match quinn::Endpoint::server(quic_server_config, bind_addr) {
|
||||
@@ -280,6 +402,7 @@ impl TunnelHub {
|
||||
let event_tx = self.event_tx.clone();
|
||||
let target_host = config.target_host.unwrap_or_else(|| "127.0.0.1".to_string());
|
||||
let hub_token = self.cancel_token.clone();
|
||||
let hub_performance = config.performance.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Spawn QUIC acceptor as a separate task
|
||||
@@ -289,6 +412,7 @@ impl TunnelHub {
|
||||
let event_tx_q = event_tx.clone();
|
||||
let target_q = target_host.clone();
|
||||
let hub_token_q = hub_token.clone();
|
||||
let performance_q = hub_performance.clone();
|
||||
Some(tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -301,6 +425,7 @@ impl TunnelHub {
|
||||
let target = target_q.clone();
|
||||
let edge_token = hub_token_q.child_token();
|
||||
let peer_addr = incoming.remote_address().ip().to_string();
|
||||
let performance = performance_q.clone();
|
||||
tokio::spawn(async move {
|
||||
// Accept the QUIC connection
|
||||
let quic_conn = match incoming.await {
|
||||
@@ -310,8 +435,8 @@ impl TunnelHub {
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = handle_edge_connection_quic(
|
||||
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr,
|
||||
if let Err(e) = handle_edge_connection_quic(
|
||||
quic_conn, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
|
||||
).await {
|
||||
log::error!("QUIC edge connection error: {}", e);
|
||||
}
|
||||
@@ -345,9 +470,10 @@ impl TunnelHub {
|
||||
let target = target_host.clone();
|
||||
let edge_token = hub_token.child_token();
|
||||
let peer_addr = addr.ip().to_string();
|
||||
let performance = hub_performance.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_edge_connection(
|
||||
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr,
|
||||
stream, acceptor, allowed, connected, event_tx, target, edge_token, peer_addr, performance,
|
||||
).await {
|
||||
log::error!("Edge connection error: {}", e);
|
||||
}
|
||||
@@ -390,15 +516,21 @@ impl TunnelHub {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_requested_transport_mode(value: Option<&str>) -> Option<TransportMode> {
|
||||
match value {
|
||||
Some("tcpTls") => Some(TransportMode::TcpTls),
|
||||
Some("quic") => Some(TransportMode::Quic),
|
||||
Some("quicWithFallback") => Some(TransportMode::QuicWithFallback),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TunnelHub {
|
||||
fn drop(&mut self) {
|
||||
self.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum concurrent streams per edge connection.
///
/// NOTE(review): the call sites visible in this change size the stream
/// semaphore from `performance.max_streams_per_edge` instead; confirm
/// whether this constant is still referenced anywhere.
const MAX_STREAMS_PER_EDGE: usize = 1024;
|
||||
|
||||
/// Process a single frame received from the edge side of the tunnel.
|
||||
/// Handles FRAME_OPEN, FRAME_DATA, FRAME_WINDOW_UPDATE, FRAME_CLOSE, and FRAME_PONG.
|
||||
async fn handle_hub_frame(
|
||||
@@ -416,6 +548,8 @@ async fn handle_hub_frame(
|
||||
target_host: &str,
|
||||
edge_token: &CancellationToken,
|
||||
cleanup_tx: &mpsc::Sender<u32>,
|
||||
performance: &EffectivePerformanceConfig,
|
||||
metrics: &Arc<EdgeRuntimeMetrics>,
|
||||
) -> FrameAction {
|
||||
match frame.frame_type {
|
||||
FRAME_OPEN => {
|
||||
@@ -423,8 +557,9 @@ async fn handle_hub_frame(
|
||||
let permit = match stream_semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
|
||||
log::warn!("Edge {} exceeded max streams ({}), rejecting stream {}",
|
||||
edge_id, MAX_STREAMS_PER_EDGE, frame.stream_id);
|
||||
edge_id, performance.max_streams_per_edge, frame.stream_id);
|
||||
let close_frame = encode_frame(frame.stream_id, FRAME_CLOSE_BACK, &[]);
|
||||
tunnel_io.queue_ctrl(close_frame);
|
||||
return FrameAction::Continue;
|
||||
@@ -444,6 +579,8 @@ async fn handle_hub_frame(
|
||||
let sustained_writer_tx = sustained_tx.clone(); // sustained: DATA_BACK from elephant flows
|
||||
let target = target_host.to_string();
|
||||
let stream_token = edge_token.child_token();
|
||||
let active_after_open = edge_stream_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let _ = event_tx.try_send(HubEvent::StreamOpened {
|
||||
edge_id: edge_id.to_string(),
|
||||
@@ -453,9 +590,12 @@ async fn handle_hub_frame(
|
||||
// Create channel for data from edge to this stream
|
||||
let (stream_data_tx, mut stream_data_rx) = mpsc::unbounded_channel::<Bytes>();
|
||||
// Adaptive initial window: scale with current stream count
|
||||
// to keep total in-flight data within the 200MB budget.
|
||||
let initial_window = compute_window_for_stream_count(
|
||||
edge_stream_count.load(Ordering::Relaxed),
|
||||
// to keep total in-flight data within the configured edge budget.
|
||||
let initial_window = compute_window_for_limits(
|
||||
active_after_open,
|
||||
performance.total_window_budget_bytes,
|
||||
performance.min_stream_window_bytes,
|
||||
performance.max_stream_window_bytes,
|
||||
);
|
||||
let send_window = Arc::new(AtomicU32::new(initial_window));
|
||||
let window_notify = Arc::new(Notify::new());
|
||||
@@ -468,9 +608,10 @@ async fn handle_hub_frame(
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
let stream_counter = Arc::clone(edge_stream_count);
|
||||
let stream_metrics = metrics.clone();
|
||||
let stream_performance = performance.clone();
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // hold semaphore permit until stream completes
|
||||
stream_counter.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let result = async {
|
||||
// A2: Connect to SmartProxy with timeout
|
||||
@@ -528,8 +669,11 @@ async fn handle_hub_frame(
|
||||
// Track consumption for adaptive flow control.
|
||||
// Increment capped to adaptive window to limit per-stream in-flight data.
|
||||
consumed_since_update += len;
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_stream_count(
|
||||
let adaptive_window = remoteingress_protocol::compute_window_for_limits(
|
||||
stream_counter_w.load(Ordering::Relaxed),
|
||||
stream_performance.total_window_budget_bytes,
|
||||
stream_performance.min_stream_window_bytes,
|
||||
stream_performance.max_stream_window_bytes,
|
||||
);
|
||||
let threshold = adaptive_window / 2;
|
||||
if consumed_since_update >= threshold {
|
||||
@@ -584,9 +728,10 @@ async fn handle_hub_frame(
|
||||
tokio::select! {
|
||||
_ = notified => continue,
|
||||
_ = stream_token.cancelled() => break,
|
||||
_ = tokio::time::sleep(Duration::from_secs(55)) => {
|
||||
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
|
||||
break;
|
||||
_ = tokio::time::sleep(Duration::from_secs(55)) => {
|
||||
stream_metrics.stalled_streams.fetch_add(1, Ordering::Relaxed);
|
||||
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -613,6 +758,7 @@ async fn handle_hub_frame(
|
||||
let frame = Bytes::copy_from_slice(&buf[..FRAME_HEADER_SIZE + n]);
|
||||
// Sustained classification: >2.5 MB/s for >10 seconds
|
||||
dl_bytes_sent += n as u64;
|
||||
stream_metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
|
||||
if !is_sustained {
|
||||
let elapsed = dl_start.elapsed().as_secs();
|
||||
if elapsed >= remoteingress_protocol::SUSTAINED_MIN_DURATION_SECS
|
||||
@@ -677,6 +823,7 @@ async fn handle_hub_frame(
|
||||
_ = cleanup.send(stream_id) => {}
|
||||
_ = stream_token.cancelled() => {}
|
||||
}
|
||||
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
|
||||
stream_counter.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
@@ -685,6 +832,7 @@ async fn handle_hub_frame(
|
||||
// limits bytes-in-flight, so the channel won't grow unbounded. send() only
|
||||
// fails if the receiver is dropped (stream handler already exited).
|
||||
if let Some(state) = streams.get(&frame.stream_id) {
|
||||
metrics.bytes_in.fetch_add(frame.payload.len() as u64, Ordering::Relaxed);
|
||||
if state.data_tx.send(frame.payload).is_err() {
|
||||
// Receiver dropped — stream handler already exited, clean up
|
||||
streams.remove(&frame.stream_id);
|
||||
@@ -697,8 +845,8 @@ async fn handle_hub_frame(
|
||||
if increment > 0 {
|
||||
if let Some(state) = streams.get(&frame.stream_id) {
|
||||
let prev = state.send_window.fetch_add(increment, Ordering::Release);
|
||||
if prev + increment > MAX_WINDOW_SIZE {
|
||||
state.send_window.store(MAX_WINDOW_SIZE, Ordering::Release);
|
||||
if prev + increment > performance.max_stream_window_bytes {
|
||||
state.send_window.store(performance.max_stream_window_bytes, Ordering::Release);
|
||||
}
|
||||
state.window_notify.notify_one();
|
||||
}
|
||||
@@ -733,6 +881,7 @@ async fn handle_hub_frame(
|
||||
data_tx: udp_tx,
|
||||
cancel_token: session_token.clone(),
|
||||
});
|
||||
metrics.active_udp_sessions.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Spawn upstream UDP forwarder
|
||||
tokio::spawn(async move {
|
||||
@@ -767,6 +916,7 @@ async fn handle_hub_frame(
|
||||
Ok(len) => {
|
||||
let frame = encode_frame(stream_id, FRAME_UDP_DATA_BACK, &buf[..len]);
|
||||
if data_writer_tx.try_send(frame).is_err() {
|
||||
// Return datagrams may be dropped under pressure.
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -800,18 +950,22 @@ async fn handle_hub_frame(
|
||||
}
|
||||
|
||||
recv_handle.abort();
|
||||
// active_udp_sessions is decremented by the FRAME_UDP_CLOSE path or connection cleanup.
|
||||
log::debug!("UDP session {} closed for edge {}", stream_id, edge_id_str);
|
||||
});
|
||||
}
|
||||
FRAME_UDP_DATA => {
|
||||
// Forward datagram to upstream
|
||||
if let Some(state) = udp_sessions.get(&frame.stream_id) {
|
||||
let _ = state.data_tx.try_send(frame.payload);
|
||||
if state.data_tx.try_send(frame.payload).is_err() {
|
||||
metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
FRAME_UDP_CLOSE => {
|
||||
if let Some(state) = udp_sessions.remove(&frame.stream_id) {
|
||||
state.cancel_token.cancel();
|
||||
metrics.active_udp_sessions.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
@@ -831,6 +985,7 @@ async fn handle_edge_connection(
|
||||
target_host: String,
|
||||
edge_token: CancellationToken,
|
||||
peer_addr: String,
|
||||
hub_performance: Option<PerformanceConfig>,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Disable Nagle's algorithm for low-latency control frames (PING/PONG, WINDOW_UPDATE)
|
||||
stream.set_nodelay(true)?;
|
||||
@@ -861,29 +1016,38 @@ async fn handle_edge_connection(
|
||||
.map_err(|_| "auth line not valid UTF-8")?;
|
||||
let auth_line = auth_line.trim();
|
||||
|
||||
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
|
||||
if parts.len() != 3 || parts[0] != "EDGE" {
|
||||
let parts: Vec<&str> = auth_line.split_whitespace().collect();
|
||||
if parts.len() < 3 || parts[0] != "EDGE" {
|
||||
return Err("invalid auth line".into());
|
||||
}
|
||||
|
||||
let edge_id = parts[1].to_string();
|
||||
let secret = parts[2];
|
||||
let requested_transport = parse_requested_transport_mode(parts.get(3).copied());
|
||||
let fallback_used = requested_transport == Some(TransportMode::QuicWithFallback);
|
||||
|
||||
// Verify credentials and extract edge config
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
|
||||
let edges = allowed.read().await;
|
||||
match edges.get(&edge_id) {
|
||||
Some(edge) => {
|
||||
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
|
||||
return Err(format!("invalid secret for edge {}", edge_id).into());
|
||||
}
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
|
||||
(
|
||||
edge.listen_ports.clone(),
|
||||
edge.listen_ports_udp.clone(),
|
||||
edge.stun_interval_secs.unwrap_or(300),
|
||||
edge.firewall_config.clone(),
|
||||
edge.performance.clone(),
|
||||
)
|
||||
}
|
||||
None => {
|
||||
return Err(format!("unknown edge {}", edge_id).into());
|
||||
}
|
||||
}
|
||||
};
|
||||
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
|
||||
|
||||
log::info!("Edge {} authenticated from {}", edge_id, peer_addr);
|
||||
let _ = event_tx.try_send(HubEvent::EdgeConnected {
|
||||
@@ -897,6 +1061,7 @@ async fn handle_edge_connection(
|
||||
listen_ports_udp: listen_ports_udp.clone(),
|
||||
stun_interval_secs,
|
||||
firewall_config,
|
||||
performance: performance.clone(),
|
||||
};
|
||||
let mut handshake_json = serde_json::to_string(&handshake)?;
|
||||
handshake_json.push('\n');
|
||||
@@ -908,6 +1073,7 @@ async fn handle_edge_connection(
|
||||
let mut udp_sessions: HashMap<u32, HubUdpSessionState> = HashMap::new();
|
||||
// Per-edge active stream counter for adaptive flow control
|
||||
let edge_stream_count = Arc::new(AtomicU32::new(0));
|
||||
let metrics = Arc::new(EdgeRuntimeMetrics::default());
|
||||
// Cleanup channel: spawned stream tasks send stream_id here when done
|
||||
let (cleanup_tx, mut cleanup_rx) = mpsc::channel::<u32>(256);
|
||||
let now = std::time::SystemTime::now()
|
||||
@@ -933,6 +1099,10 @@ async fn handle_edge_connection(
|
||||
connected_at: now,
|
||||
peer_addr,
|
||||
edge_stream_count: edge_stream_count.clone(),
|
||||
transport_mode: TransportMode::TcpTls,
|
||||
fallback_used,
|
||||
performance: performance.clone(),
|
||||
metrics: metrics.clone(),
|
||||
config_tx,
|
||||
cancel_token: edge_token.clone(),
|
||||
},
|
||||
@@ -973,7 +1143,7 @@ async fn handle_edge_connection(
|
||||
});
|
||||
|
||||
// A4: Semaphore to limit concurrent streams per edge
|
||||
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
|
||||
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
|
||||
|
||||
// Heartbeat: periodic PING and liveness timeout
|
||||
let ping_interval_dur = Duration::from_secs(15);
|
||||
@@ -1019,7 +1189,7 @@ async fn handle_edge_connection(
|
||||
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
|
||||
&stream_semaphore, &edge_stream_count,
|
||||
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
|
||||
&cleanup_tx,
|
||||
&cleanup_tx, &performance, &metrics,
|
||||
).await {
|
||||
disconnect_reason = reason;
|
||||
break 'hub_loop;
|
||||
@@ -1032,6 +1202,10 @@ async fn handle_edge_connection(
|
||||
if ping_ticker.poll_tick(cx).is_ready() {
|
||||
tunnel_io.queue_ctrl(encode_frame(0, FRAME_PING, &[]));
|
||||
}
|
||||
let depths = tunnel_io.queue_depths();
|
||||
metrics.ctrl_queue_depth.store(depths.ctrl as u64, Ordering::Relaxed);
|
||||
metrics.data_queue_depth.store(depths.data as u64, Ordering::Relaxed);
|
||||
metrics.sustained_queue_depth.store(depths.sustained as u64, Ordering::Relaxed);
|
||||
tunnel_io.poll_step(cx, &mut ctrl_rx, &mut data_rx, &mut sustained_rx, &mut liveness_deadline, &edge_token)
|
||||
}).await;
|
||||
|
||||
@@ -1043,7 +1217,7 @@ async fn handle_edge_connection(
|
||||
frame, &mut tunnel_io, &mut streams, &mut udp_sessions,
|
||||
&stream_semaphore, &edge_stream_count,
|
||||
&edge_id, &event_tx, &ctrl_tx, &data_tx, &sustained_tx, &target_host, &edge_token,
|
||||
&cleanup_tx,
|
||||
&cleanup_tx, &performance, &metrics,
|
||||
).await {
|
||||
disconnect_reason = reason;
|
||||
break;
|
||||
@@ -1201,6 +1375,7 @@ async fn handle_edge_connection_quic(
|
||||
target_host: String,
|
||||
edge_token: CancellationToken,
|
||||
peer_addr: String,
|
||||
hub_performance: Option<PerformanceConfig>,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
log::info!("QUIC edge connection from {}", peer_addr);
|
||||
|
||||
@@ -1229,8 +1404,8 @@ async fn handle_edge_connection_quic(
|
||||
.map_err(|_| "QUIC auth line not valid UTF-8")?;
|
||||
let auth_line = auth_line.trim();
|
||||
|
||||
let parts: Vec<&str> = auth_line.splitn(3, ' ').collect();
|
||||
if parts.len() != 3 || parts[0] != "EDGE" {
|
||||
let parts: Vec<&str> = auth_line.split_whitespace().collect();
|
||||
if parts.len() < 3 || parts[0] != "EDGE" {
|
||||
return Err("invalid QUIC auth line".into());
|
||||
}
|
||||
|
||||
@@ -1238,18 +1413,25 @@ async fn handle_edge_connection_quic(
|
||||
let secret = parts[2];
|
||||
|
||||
// Verify credentials
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config) = {
|
||||
let (listen_ports, listen_ports_udp, stun_interval_secs, firewall_config, edge_performance) = {
|
||||
let edges = allowed.read().await;
|
||||
match edges.get(&edge_id) {
|
||||
Some(edge) => {
|
||||
if !constant_time_eq(secret.as_bytes(), edge.secret.as_bytes()) {
|
||||
return Err(format!("invalid secret for edge {}", edge_id).into());
|
||||
}
|
||||
(edge.listen_ports.clone(), edge.listen_ports_udp.clone(), edge.stun_interval_secs.unwrap_or(300), edge.firewall_config.clone())
|
||||
(
|
||||
edge.listen_ports.clone(),
|
||||
edge.listen_ports_udp.clone(),
|
||||
edge.stun_interval_secs.unwrap_or(300),
|
||||
edge.firewall_config.clone(),
|
||||
edge.performance.clone(),
|
||||
)
|
||||
}
|
||||
None => return Err(format!("unknown edge {}", edge_id).into()),
|
||||
}
|
||||
};
|
||||
let performance = PerformanceConfig::merge(hub_performance.as_ref(), edge_performance.as_ref()).effective();
|
||||
|
||||
log::info!("QUIC edge {} authenticated from {}", edge_id, peer_addr);
|
||||
let _ = event_tx.try_send(HubEvent::EdgeConnected {
|
||||
@@ -1263,6 +1445,7 @@ async fn handle_edge_connection_quic(
|
||||
listen_ports_udp: listen_ports_udp.clone(),
|
||||
stun_interval_secs,
|
||||
firewall_config,
|
||||
performance: performance.clone(),
|
||||
};
|
||||
let mut handshake_json = serde_json::to_string(&handshake)?;
|
||||
handshake_json.push('\n');
|
||||
@@ -1271,6 +1454,7 @@ async fn handle_edge_connection_quic(
|
||||
|
||||
// Track this edge
|
||||
let edge_stream_count = Arc::new(AtomicU32::new(0));
|
||||
let metrics = Arc::new(EdgeRuntimeMetrics::default());
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
@@ -1290,13 +1474,17 @@ async fn handle_edge_connection_quic(
|
||||
connected_at: now,
|
||||
peer_addr,
|
||||
edge_stream_count: edge_stream_count.clone(),
|
||||
transport_mode: TransportMode::Quic,
|
||||
fallback_used: false,
|
||||
performance: performance.clone(),
|
||||
metrics: metrics.clone(),
|
||||
config_tx,
|
||||
cancel_token: edge_token.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let stream_semaphore = Arc::new(Semaphore::new(MAX_STREAMS_PER_EDGE));
|
||||
let stream_semaphore = Arc::new(Semaphore::new(performance.max_streams_per_edge));
|
||||
|
||||
// Spawn task to accept data streams (tunneled client connections)
|
||||
let data_stream_conn = quic_conn.clone();
|
||||
@@ -1305,6 +1493,7 @@ async fn handle_edge_connection_quic(
|
||||
let data_event_tx = event_tx.clone();
|
||||
let data_semaphore = stream_semaphore.clone();
|
||||
let data_stream_count = edge_stream_count.clone();
|
||||
let data_metrics = metrics.clone();
|
||||
let data_token = edge_token.clone();
|
||||
let data_handle = tokio::spawn(async move {
|
||||
let mut stream_id_counter: u32 = 0;
|
||||
@@ -1317,6 +1506,7 @@ async fn handle_edge_connection_quic(
|
||||
let permit = match data_semaphore.clone().try_acquire_owned() {
|
||||
Ok(p) => p,
|
||||
Err(_) => {
|
||||
data_metrics.rejected_streams.fetch_add(1, Ordering::Relaxed);
|
||||
log::warn!("QUIC edge {} exceeded max streams, rejecting", data_edge_id);
|
||||
// Drop the streams to reject
|
||||
drop(quic_send);
|
||||
@@ -1331,21 +1521,24 @@ async fn handle_edge_connection_quic(
|
||||
let edge_id = data_edge_id.clone();
|
||||
let event_tx = data_event_tx.clone();
|
||||
let stream_count = data_stream_count.clone();
|
||||
let stream_metrics = data_metrics.clone();
|
||||
let stream_token = data_token.child_token();
|
||||
|
||||
let _ = event_tx.try_send(HubEvent::StreamOpened {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id,
|
||||
});
|
||||
stream_metrics.streams_opened_total.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
stream_count.fetch_add(1, Ordering::Relaxed);
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit;
|
||||
handle_quic_stream(
|
||||
quic_send, quic_recv, stream_id,
|
||||
&target, &edge_id, stream_token,
|
||||
&target, &edge_id, stream_token, stream_metrics.clone(),
|
||||
).await;
|
||||
stream_count.fetch_sub(1, Ordering::Relaxed);
|
||||
stream_metrics.streams_closed_total.fetch_add(1, Ordering::Relaxed);
|
||||
let _ = event_tx.try_send(HubEvent::StreamClosed {
|
||||
edge_id,
|
||||
stream_id,
|
||||
@@ -1364,7 +1557,7 @@ async fn handle_edge_connection_quic(
|
||||
});
|
||||
|
||||
// UDP sessions for QUIC datagram transport
|
||||
let quic_udp_sessions: Arc<Mutex<HashMap<u32, mpsc::Sender<Bytes>>>> =
|
||||
let quic_udp_sessions: Arc<Mutex<HashMap<u32, (mpsc::Sender<Bytes>, Instant)>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Spawn QUIC datagram receiver task
|
||||
@@ -1373,6 +1566,7 @@ async fn handle_edge_connection_quic(
|
||||
let dgram_target = target_host.clone();
|
||||
let dgram_edge_id = edge_id.clone();
|
||||
let dgram_token = edge_token.clone();
|
||||
let dgram_metrics = metrics.clone();
|
||||
let dgram_handle = tokio::spawn(async move {
|
||||
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(30));
|
||||
cleanup_interval.tick().await; // consume initial tick
|
||||
@@ -1380,8 +1574,12 @@ async fn handle_edge_connection_quic(
|
||||
tokio::select! {
|
||||
// Periodic sweep: prune sessions whose task has exited (receiver dropped)
|
||||
_ = cleanup_interval.tick() => {
|
||||
let now = Instant::now();
|
||||
let mut s = dgram_sessions.lock().await;
|
||||
s.retain(|_id, tx| !tx.is_closed());
|
||||
s.retain(|_id, (tx, last_activity)| {
|
||||
!tx.is_closed() && now.duration_since(*last_activity) < Duration::from_secs(60)
|
||||
});
|
||||
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
}
|
||||
datagram = dgram_conn.read_datagram() => {
|
||||
match datagram {
|
||||
@@ -1408,10 +1606,12 @@ async fn handle_edge_connection_quic(
|
||||
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
|
||||
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
|
||||
let cleanup_sessions = sessions.clone();
|
||||
let session_metrics = dgram_metrics.clone();
|
||||
|
||||
{
|
||||
let mut s = sessions.lock().await;
|
||||
s.insert(session_id, tx);
|
||||
s.insert(session_id, (tx, Instant::now()));
|
||||
dgram_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -1419,20 +1619,26 @@ async fn handle_edge_connection_quic(
|
||||
Ok(s) => Arc::new(s),
|
||||
Err(e) => {
|
||||
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
|
||||
cleanup_sessions.lock().await.remove(&session_id);
|
||||
let mut s = cleanup_sessions.lock().await;
|
||||
s.remove(&session_id);
|
||||
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
|
||||
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
|
||||
cleanup_sessions.lock().await.remove(&session_id);
|
||||
let mut s = cleanup_sessions.lock().await;
|
||||
s.remove(&session_id);
|
||||
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
|
||||
if let Err(e) = upstream.send(&proxy_v2_data).await {
|
||||
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
|
||||
cleanup_sessions.lock().await.remove(&session_id);
|
||||
let mut s = cleanup_sessions.lock().await;
|
||||
s.remove(&session_id);
|
||||
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1476,16 +1682,23 @@ async fn handle_edge_connection_quic(
|
||||
}
|
||||
recv_handle.abort();
|
||||
// Clean up session entry to prevent memory leak
|
||||
cleanup_sessions.lock().await.remove(&session_id);
|
||||
let mut s = cleanup_sessions.lock().await;
|
||||
s.remove(&session_id);
|
||||
session_metrics.active_udp_sessions.store(s.len() as u64, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
// Regular data datagram — forward to upstream
|
||||
let sessions = dgram_sessions.lock().await;
|
||||
if let Some(tx) = sessions.get(&session_id) {
|
||||
let _ = tx.try_send(Bytes::copy_from_slice(payload));
|
||||
let mut sessions = dgram_sessions.lock().await;
|
||||
if let Some((tx, last_activity)) = sessions.get_mut(&session_id) {
|
||||
*last_activity = Instant::now();
|
||||
if tx.try_send(Bytes::copy_from_slice(payload)).is_err() {
|
||||
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
dgram_metrics.dropped_datagrams.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1595,6 +1808,7 @@ async fn handle_quic_stream(
|
||||
target_host: &str,
|
||||
_edge_id: &str,
|
||||
stream_token: CancellationToken,
|
||||
metrics: Arc<EdgeRuntimeMetrics>,
|
||||
) {
|
||||
// Read PROXY header from the beginning of the stream
|
||||
let proxy_header = match quic_transport::read_proxy_header(&mut quic_recv).await {
|
||||
@@ -1640,6 +1854,7 @@ async fn handle_quic_stream(
|
||||
|
||||
// Task: QUIC -> upstream (edge data to SmartProxy)
|
||||
let writer_token = stream_token.clone();
|
||||
let writer_metrics = metrics.clone();
|
||||
let mut writer_task = tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 32768];
|
||||
loop {
|
||||
@@ -1647,6 +1862,7 @@ async fn handle_quic_stream(
|
||||
read_result = quic_recv.read(&mut buf) => {
|
||||
match read_result {
|
||||
Ok(Some(n)) => {
|
||||
writer_metrics.bytes_in.fetch_add(n as u64, Ordering::Relaxed);
|
||||
let write_result = tokio::select! {
|
||||
r = tokio::time::timeout(
|
||||
Duration::from_secs(60),
|
||||
@@ -1678,6 +1894,7 @@ async fn handle_quic_stream(
|
||||
match read_result {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
metrics.bytes_out.fetch_add(n as u64, Ordering::Relaxed);
|
||||
if quic_send.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
@@ -1806,6 +2023,7 @@ mod tests {
|
||||
listen_ports_udp: vec![],
|
||||
stun_interval_secs: 300,
|
||||
firewall_config: None,
|
||||
performance: EffectivePerformanceConfig::default(),
|
||||
};
|
||||
let json = serde_json::to_value(&resp).unwrap();
|
||||
assert_eq!(json["listenPorts"], serde_json::json!([443, 8080]));
|
||||
@@ -1821,6 +2039,7 @@ mod tests {
|
||||
listen_ports: vec![80, 443],
|
||||
listen_ports_udp: vec![53],
|
||||
firewall_config: None,
|
||||
performance: EffectivePerformanceConfig::default(),
|
||||
};
|
||||
let json = serde_json::to_value(&update).unwrap();
|
||||
assert_eq!(json["listenPorts"], serde_json::json!([80, 443]));
|
||||
|
||||
Reference in New Issue
Block a user