fix(remoteingress-core): send PROXY v2 headers for UDP upstream sessions and expire idle UDP sessions

This commit is contained in:
2026-03-19 14:09:32 +00:00
parent bc89e49f39
commit a400945371
5 changed files with 66 additions and 8 deletions

View File

@@ -642,8 +642,23 @@ async fn connect_to_hub_and_run(
let liveness_timeout_dur = Duration::from_secs(45);
let mut last_activity = Instant::now();
let mut liveness_deadline = Box::pin(sleep_until(last_activity + liveness_timeout_dur));
let mut next_udp_expiry = Instant::now() + Duration::from_secs(30);
let result = 'io_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry {
let mut sessions = udp_sessions.lock().await;
let expired = sessions.expire_idle();
for sid in &expired {
let close_frame = encode_frame(*sid, FRAME_UDP_CLOSE, &[]);
let _ = tunnel_data_tx.try_send(close_frame);
}
if !expired.is_empty() {
log::debug!("Expired {} idle UDP sessions", expired.len());
}
next_udp_expiry = Instant::now() + Duration::from_secs(30);
}
// Drain any buffered frames
loop {
let frame = match tunnel_io.try_parse_frame() {
@@ -1346,7 +1361,18 @@ async fn connect_to_hub_and_run_quic_with_connection(
);
// Monitor control stream for config updates, connection health, and QUIC datagrams.
let mut next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
let result = 'quic_loop: loop {
// Expire idle UDP sessions periodically
if Instant::now() >= next_udp_expiry_quic {
let mut sessions = udp_sessions_quic.lock().await;
let expired = sessions.expire_idle();
if !expired.is_empty() {
log::debug!("Expired {} idle QUIC UDP sessions", expired.len());
}
next_udp_expiry_quic = Instant::now() + Duration::from_secs(30);
}
tokio::select! {
// Read control messages from hub
ctrl_msg = quic_transport::read_ctrl_message(&mut ctrl_recv) => {
@@ -1370,6 +1396,16 @@ async fn connect_to_hub_and_run_quic_with_connection(
connection_token,
bind_address,
);
apply_udp_port_config_quic(
&update.listen_ports_udp,
&mut udp_listeners_quic,
&quic_conn,
&udp_sessions_quic,
&udp_sockets_quic,
next_stream_id,
connection_token,
bind_address,
);
}
}
quic_transport::CTRL_PING => {

View File

@@ -706,6 +706,7 @@ async fn handle_hub_frame(
let data_writer_tx = data_tx.clone();
let session_token = edge_token.child_token();
let edge_id_str = edge_id.to_string();
let proxy_v2_header = frame.payload.clone();
// Channel for forwarding datagrams from edge to upstream
let (udp_tx, mut udp_rx) = mpsc::channel::<Bytes>(256);
@@ -728,6 +729,12 @@ async fn handle_hub_frame(
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_header).await {
log::error!("UDP session {} failed to send PROXY v2 header: {}", stream_id, e);
return;
}
// Task: upstream -> edge (return datagrams)
let upstream_recv = Arc::new(upstream);
let upstream_send = upstream_recv.clone();
@@ -1367,6 +1374,7 @@ async fn handle_edge_connection_quic(
let sessions = dgram_sessions.clone();
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
{
let mut s = sessions.lock().await;
@@ -1386,6 +1394,12 @@ async fn handle_edge_connection_quic(
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
return;
}
// Upstream recv → QUIC datagram back to edge
let upstream_recv = upstream.clone();
let recv_conn = conn.clone();