feat(core,edge,hub,transport): add QUIC tunnel transport support with optional edge transport selection

This commit is contained in:
2026-03-19 10:44:22 +00:00
parent e4807be00b
commit 6abfd2ff2a
11 changed files with 2051 additions and 19 deletions

View File

@@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};
use bytes::Bytes;
use remoteingress_protocol::*;
use crate::transport::TransportMode;
use crate::transport::quic as quic_transport;
type EdgeTlsStream = tokio_rustls::client::TlsStream<TcpStream>;
@@ -47,6 +49,9 @@ pub struct EdgeConfig {
/// Useful for testing on localhost where edge and upstream share the same machine.
#[serde(default)]
pub bind_address: Option<String>,
/// Transport mode for the tunnel connection (defaults to TcpTls).
#[serde(default)]
pub transport_mode: Option<TransportMode>,
}
/// Handshake config received from hub after authentication.
@@ -210,6 +215,8 @@ async fn edge_main_loop(
let mut backoff_ms: u64 = 1000;
let max_backoff_ms: u64 = 30000;
let transport_mode = config.transport_mode.unwrap_or(TransportMode::TcpTls);
// Build TLS config ONCE outside the reconnect loop — preserves session
// cache across reconnections for TLS session resumption (saves 1 RTT).
let tls_config = rustls::ClientConfig::builder()
@@ -218,24 +225,77 @@ async fn edge_main_loop(
.with_no_client_auth();
let connector = TlsConnector::from(Arc::new(tls_config));
// Build QUIC client config ONCE (shares session cache across reconnections).
let quic_client_config = quic_transport::build_quic_client_config();
let quic_endpoint = if matches!(transport_mode, TransportMode::Quic | TransportMode::QuicWithFallback) {
match quinn::Endpoint::client("0.0.0.0:0".parse().unwrap()) {
Ok(mut ep) => {
ep.set_default_client_config(quic_client_config);
Some(ep)
}
Err(e) => {
log::error!("Failed to create QUIC endpoint: {}", e);
None
}
}
} else {
None
};
loop {
// Create a per-connection child token
let connection_token = cancel_token.child_token();
// Try to connect to hub
let result = connect_to_hub_and_run(
&config,
&connected,
&public_ip,
&active_streams,
&next_stream_id,
&event_tx,
&listen_ports,
&mut shutdown_rx,
&connection_token,
&connector,
)
.await;
// Try to connect to hub using the configured transport
let result = match transport_mode {
TransportMode::TcpTls => {
connect_to_hub_and_run(
&config, &connected, &public_ip, &active_streams, &next_stream_id,
&event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector,
).await
}
TransportMode::Quic => {
if let Some(ep) = &quic_endpoint {
connect_to_hub_and_run_quic(
&config, &connected, &public_ip, &active_streams, &next_stream_id,
&event_tx, &listen_ports, &mut shutdown_rx, &connection_token, ep,
).await
} else {
EdgeLoopResult::Reconnect("quic_endpoint_unavailable".to_string())
}
}
TransportMode::QuicWithFallback => {
if let Some(ep) = &quic_endpoint {
// Try QUIC first with a 5s timeout
let quic_result = tokio::time::timeout(
Duration::from_secs(5),
connect_to_hub_quic_handshake(&config, ep, &connection_token),
).await;
match quic_result {
Ok(Ok(quic_conn)) => {
connect_to_hub_and_run_quic_with_connection(
&config, &connected, &public_ip, &active_streams, &next_stream_id,
&event_tx, &listen_ports, &mut shutdown_rx, &connection_token,
quic_conn,
).await
}
_ => {
log::info!("QUIC connect failed or timed out, falling back to TCP+TLS");
connect_to_hub_and_run(
&config, &connected, &public_ip, &active_streams, &next_stream_id,
&event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector,
).await
}
}
} else {
// No QUIC endpoint, fall back to TCP+TLS
connect_to_hub_and_run(
&config, &connected, &public_ip, &active_streams, &next_stream_id,
&event_tx, &listen_ports, &mut shutdown_rx, &connection_token, &connector,
).await
}
}
};
// Cancel connection token to kill all orphaned tasks from this cycle
connection_token.cancel();
@@ -942,6 +1002,437 @@ async fn handle_client_connection(
let _ = edge_id; // used for logging context
}
// ===== QUIC transport functions =====
/// Perform QUIC handshake only (used by QuicWithFallback to test connectivity).
async fn connect_to_hub_quic_handshake(
config: &EdgeConfig,
endpoint: &quinn::Endpoint,
_connection_token: &CancellationToken,
) -> Result<quinn::Connection, Box<dyn std::error::Error + Send + Sync>> {
let addr = format!("{}:{}", config.hub_host, config.hub_port);
let server_addr: std::net::SocketAddr = tokio::net::lookup_host(&addr)
.await?
.next()
.ok_or("DNS resolution failed")?;
// QUIC/TLS SNI requires a hostname, not an IP address.
// If hub_host is an IP, use the same fallback as the TCP+TLS path.
let server_name = match rustls::pki_types::ServerName::try_from(config.hub_host.as_str()) {
Ok(rustls::pki_types::ServerName::DnsName(_)) => config.hub_host.clone(),
_ => "remoteingress-hub".to_string(),
};
let connection = endpoint.connect(server_addr, &server_name)?.await?;
Ok(connection)
}
/// QUIC edge: connect to hub, authenticate, and run the stream multiplexer.
async fn connect_to_hub_and_run_quic(
config: &EdgeConfig,
connected: &Arc<RwLock<bool>>,
public_ip: &Arc<RwLock<Option<String>>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
event_tx: &mpsc::Sender<EdgeEvent>,
listen_ports: &Arc<RwLock<Vec<u16>>>,
shutdown_rx: &mut mpsc::Receiver<()>,
connection_token: &CancellationToken,
endpoint: &quinn::Endpoint,
) -> EdgeLoopResult {
// Establish QUIC connection
let quic_conn = match connect_to_hub_quic_handshake(config, endpoint, connection_token).await {
Ok(c) => c,
Err(e) => {
log::error!("QUIC connect failed: {}", e);
return EdgeLoopResult::Reconnect(format!("quic_connect_failed: {}", e));
}
};
connect_to_hub_and_run_quic_with_connection(
config, connected, public_ip, active_streams, next_stream_id,
event_tx, listen_ports, shutdown_rx, connection_token, quic_conn,
).await
}
/// QUIC edge: run with an already-established QUIC connection.
async fn connect_to_hub_and_run_quic_with_connection(
config: &EdgeConfig,
connected: &Arc<RwLock<bool>>,
public_ip: &Arc<RwLock<Option<String>>>,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
event_tx: &mpsc::Sender<EdgeEvent>,
listen_ports: &Arc<RwLock<Vec<u16>>>,
shutdown_rx: &mut mpsc::Receiver<()>,
connection_token: &CancellationToken,
quic_conn: quinn::Connection,
) -> EdgeLoopResult {
log::info!("QUIC connection established to {}", quic_conn.remote_address());
// Open control stream (first bidirectional stream)
let (mut ctrl_send, mut ctrl_recv) = match quic_conn.open_bi().await {
Ok(s) => s,
Err(e) => {
log::error!("Failed to open QUIC control stream: {}", e);
return EdgeLoopResult::Reconnect(format!("quic_ctrl_open_failed: {}", e));
}
};
// Auth handshake on control stream (same protocol as TCP+TLS)
let auth_line = format!("EDGE {} {}\n", config.edge_id, config.secret);
if let Err(e) = ctrl_send.write_all(auth_line.as_bytes()).await {
return EdgeLoopResult::Reconnect(format!("quic_auth_write_failed: {}", e));
}
// Read handshake response (newline-delimited JSON)
let mut handshake_bytes = Vec::with_capacity(512);
let mut byte = [0u8; 1];
loop {
match ctrl_recv.read_exact(&mut byte).await {
Ok(()) => {
handshake_bytes.push(byte[0]);
if byte[0] == b'\n' { break; }
if handshake_bytes.len() > 8192 {
return EdgeLoopResult::Reconnect("quic_handshake_too_long".to_string());
}
}
Err(e) => {
log::error!("QUIC handshake read failed: {}", e);
return EdgeLoopResult::Reconnect(format!("quic_handshake_read_failed: {}", e));
}
}
}
let handshake_line = String::from_utf8_lossy(&handshake_bytes);
let handshake: HandshakeConfig = match serde_json::from_str(handshake_line.trim()) {
Ok(h) => h,
Err(e) => {
log::error!("Invalid QUIC handshake response: {}", e);
return EdgeLoopResult::Reconnect(format!("quic_handshake_invalid: {}", e));
}
};
log::info!(
"QUIC handshake from hub: ports {:?}, stun_interval {}s",
handshake.listen_ports,
handshake.stun_interval_secs
);
*connected.write().await = true;
let _ = event_tx.try_send(EdgeEvent::TunnelConnected);
log::info!("Connected to hub via QUIC at {}", quic_conn.remote_address());
*listen_ports.write().await = handshake.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsAssigned {
listen_ports: handshake.listen_ports.clone(),
});
// Start STUN discovery
let stun_interval = handshake.stun_interval_secs;
let public_ip_clone = public_ip.clone();
let event_tx_clone = event_tx.clone();
let stun_token = connection_token.clone();
let stun_handle = tokio::spawn(async move {
loop {
tokio::select! {
ip_result = crate::stun::discover_public_ip() => {
if let Some(ip) = ip_result {
let mut pip = public_ip_clone.write().await;
let changed = pip.as_ref() != Some(&ip);
*pip = Some(ip.clone());
if changed {
let _ = event_tx_clone.try_send(EdgeEvent::PublicIpDiscovered { ip });
}
}
}
_ = stun_token.cancelled() => break,
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(stun_interval)) => {}
_ = stun_token.cancelled() => break,
}
}
});
// Start TCP listeners for the assigned ports.
// For QUIC, each client connection opens a new QUIC bidirectional stream.
let mut port_listeners: HashMap<u16, JoinHandle<()>> = HashMap::new();
let bind_address = config.bind_address.as_deref().unwrap_or("0.0.0.0");
apply_port_config_quic(
&handshake.listen_ports,
&mut port_listeners,
&quic_conn,
active_streams,
next_stream_id,
&config.edge_id,
connection_token,
bind_address,
);
// Monitor control stream for config updates, and connection health.
// Also handle shutdown signals.
let result = 'quic_loop: loop {
tokio::select! {
// Read control messages from hub
ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => {
match ctrl_msg {
Ok(Some((msg_type, payload))) => {
match msg_type {
quic_transport::CTRL_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&payload) {
log::info!("QUIC config update from hub: ports {:?}", update.listen_ports);
*listen_ports.write().await = update.listen_ports.clone();
let _ = event_tx.try_send(EdgeEvent::PortsUpdated {
listen_ports: update.listen_ports.clone(),
});
apply_port_config_quic(
&update.listen_ports,
&mut port_listeners,
&quic_conn,
active_streams,
next_stream_id,
&config.edge_id,
connection_token,
bind_address,
);
}
}
quic_transport::CTRL_PING => {
// Respond with PONG on control stream
if let Err(e) = quic_transport::write_ctrl_message(
&mut ctrl_send, quic_transport::CTRL_PONG, &[],
).await {
log::error!("Failed to send QUIC PONG: {}", e);
break 'quic_loop EdgeLoopResult::Reconnect(
format!("quic_pong_failed: {}", e),
);
}
}
_ => {
log::warn!("Unknown QUIC control message type: {}", msg_type);
}
}
}
Ok(None) => {
log::info!("Hub closed QUIC control stream (EOF)");
break 'quic_loop EdgeLoopResult::Reconnect("quic_ctrl_eof".to_string());
}
Err(e) => {
log::error!("QUIC control stream read error: {}", e);
break 'quic_loop EdgeLoopResult::Reconnect(
format!("quic_ctrl_error: {}", e),
);
}
}
}
// QUIC connection closed
reason = quic_conn.closed() => {
log::info!("QUIC connection closed: {}", reason);
break 'quic_loop EdgeLoopResult::Reconnect(format!("quic_closed: {}", reason));
}
// Shutdown signal
_ = connection_token.cancelled() => {
if shutdown_rx.try_recv().is_ok() {
break 'quic_loop EdgeLoopResult::Shutdown;
}
break 'quic_loop EdgeLoopResult::Shutdown;
}
}
};
// Cleanup
connection_token.cancel();
stun_handle.abort();
for (_, h) in port_listeners.drain() {
h.abort();
}
// Graceful QUIC close
quic_conn.close(quinn::VarInt::from_u32(0), b"shutdown");
result
}
/// Apply port config for QUIC transport: spawn TCP listeners that open QUIC streams.
fn apply_port_config_quic(
new_ports: &[u16],
port_listeners: &mut HashMap<u16, JoinHandle<()>>,
quic_conn: &quinn::Connection,
active_streams: &Arc<AtomicU32>,
next_stream_id: &Arc<AtomicU32>,
edge_id: &str,
connection_token: &CancellationToken,
bind_address: &str,
) {
let new_set: std::collections::HashSet<u16> = new_ports.iter().copied().collect();
let old_set: std::collections::HashSet<u16> = port_listeners.keys().copied().collect();
// Remove ports no longer needed
for &port in old_set.difference(&new_set) {
if let Some(handle) = port_listeners.remove(&port) {
log::info!("Stopping QUIC listener on port {}", port);
handle.abort();
}
}
// Add new ports
for &port in new_set.difference(&old_set) {
let quic_conn = quic_conn.clone();
let active_streams = active_streams.clone();
let next_stream_id = next_stream_id.clone();
let _edge_id = edge_id.to_string();
let port_token = connection_token.child_token();
let bind_addr = bind_address.to_string();
let handle = tokio::spawn(async move {
let listener = match TcpListener::bind((bind_addr.as_str(), port)).await {
Ok(l) => l,
Err(e) => {
log::error!("Failed to bind port {} (QUIC): {}", port, e);
return;
}
};
log::info!("Listening on port {} (QUIC transport)", port);
loop {
tokio::select! {
accept_result = listener.accept() => {
match accept_result {
Ok((client_stream, client_addr)) => {
let _ = client_stream.set_nodelay(true);
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(60));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(60));
let _ = socket2::SockRef::from(&client_stream).set_tcp_keepalive(&ka);
let stream_id = next_stream_id.fetch_add(1, Ordering::Relaxed);
let quic_conn = quic_conn.clone();
let active_streams = active_streams.clone();
let client_token = port_token.child_token();
active_streams.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
handle_client_connection_quic(
client_stream,
client_addr,
stream_id,
port,
quic_conn,
client_token,
).await;
// Saturating decrement
loop {
let current = active_streams.load(Ordering::Relaxed);
if current == 0 { break; }
if active_streams.compare_exchange_weak(
current, current - 1,
Ordering::Relaxed, Ordering::Relaxed,
).is_ok() {
break;
}
}
});
}
Err(e) => {
log::error!("Accept error on port {} (QUIC): {}", port, e);
}
}
}
_ = port_token.cancelled() => {
log::info!("Port {} QUIC listener cancelled", port);
break;
}
}
}
});
port_listeners.insert(port, handle);
}
}
/// Handle a single client connection via QUIC transport.
/// Opens a new QUIC bidirectional stream, sends the PROXY header,
/// then bidirectionally copies data between the client TCP socket and the QUIC stream.
async fn handle_client_connection_quic(
client_stream: TcpStream,
client_addr: std::net::SocketAddr,
stream_id: u32,
dest_port: u16,
quic_conn: quinn::Connection,
client_token: CancellationToken,
) {
let client_ip = client_addr.ip().to_string();
let client_port = client_addr.port();
let edge_ip = "0.0.0.0";
// Open a new QUIC bidirectional stream for this client connection
let (mut quic_send, mut quic_recv) = match quic_conn.open_bi().await {
Ok(s) => s,
Err(e) => {
log::error!("Stream {} failed to open QUIC bi stream: {}", stream_id, e);
return;
}
};
// Send PROXY header as first bytes on the stream
let proxy_header = build_proxy_v1_header(&client_ip, edge_ip, client_port, dest_port);
if let Err(e) = quic_transport::write_proxy_header(&mut quic_send, &proxy_header).await {
log::error!("Stream {} failed to write PROXY header: {}", stream_id, e);
return;
}
let (mut client_read, mut client_write) = client_stream.into_split();
// Task: QUIC -> client (download direction)
let dl_token = client_token.clone();
let mut dl_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
read_result = quic_recv.read(&mut buf) => {
match read_result {
Ok(Some(n)) => {
if client_write.write_all(&buf[..n]).await.is_err() {
break;
}
}
Ok(None) => break, // QUIC stream finished
Err(_) => break,
}
}
_ = dl_token.cancelled() => break,
}
}
let _ = client_write.shutdown().await;
});
// Task: client -> QUIC (upload direction)
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
read_result = client_read.read(&mut buf) => {
match read_result {
Ok(0) => break, // client EOF
Ok(n) => {
if quic_send.write_all(&buf[..n]).await.is_err() {
break;
}
}
Err(_) => break,
}
}
_ = client_token.cancelled() => break,
}
}
// Wait for download task to finish before closing the QUIC stream
let _ = tokio::time::timeout(Duration::from_secs(300), &mut dl_task).await;
// Gracefully close the QUIC send stream
let _ = quic_send.finish();
dl_task.abort();
}
#[cfg(test)]
mod tests {
use super::*;
@@ -971,6 +1462,7 @@ mod tests {
edge_id: "e1".to_string(),
secret: "sec".to_string(),
bind_address: None,
transport_mode: None,
};
let json = serde_json::to_string(&config).unwrap();
let back: EdgeConfig = serde_json::from_str(&json).unwrap();
@@ -988,6 +1480,44 @@ mod tests {
assert_eq!(hc.stun_interval_secs, 120);
}
#[test]
fn test_edge_config_transport_mode_deserialize() {
let json = r#"{
"hubHost": "hub.test",
"hubPort": 8443,
"edgeId": "e1",
"secret": "s",
"transportMode": "quic"
}"#;
let config: EdgeConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.transport_mode, Some(TransportMode::Quic));
}
#[test]
fn test_edge_config_transport_mode_default() {
let json = r#"{
"hubHost": "hub.test",
"hubPort": 8443,
"edgeId": "e1",
"secret": "s"
}"#;
let config: EdgeConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.transport_mode, None);
}
#[test]
fn test_edge_config_transport_mode_quic_with_fallback() {
let json = r#"{
"hubHost": "hub.test",
"hubPort": 8443,
"edgeId": "e1",
"secret": "s",
"transportMode": "quicWithFallback"
}"#;
let config: EdgeConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.transport_mode, Some(TransportMode::QuicWithFallback));
}
#[test]
fn test_handshake_config_default_stun_interval() {
let json = r#"{"listenPorts": [443]}"#;
@@ -1088,6 +1618,7 @@ mod tests {
edge_id: "test-edge".to_string(),
secret: "test-secret".to_string(),
bind_address: None,
transport_mode: None,
});
let status = edge.get_status().await;
assert!(!status.running);
@@ -1105,6 +1636,7 @@ mod tests {
edge_id: "e".to_string(),
secret: "s".to_string(),
bind_address: None,
transport_mode: None,
});
let rx1 = edge.take_event_rx().await;
assert!(rx1.is_some());
@@ -1120,6 +1652,7 @@ mod tests {
edge_id: "e".to_string(),
secret: "s".to_string(),
bind_address: None,
transport_mode: None,
});
edge.stop().await; // should not panic
let status = edge.get_status().await;