fix(hub-core): improve stream shutdown handling and connection cleanup in hub and edge

This commit is contained in:
2026-03-26 07:10:15 +00:00
parent 5e93710c42
commit e5a91f298c
11 changed files with 1642 additions and 1197 deletions

View File

@@ -36,6 +36,9 @@ struct EdgeStreamState {
send_window: Arc<AtomicU32>,
/// Notifier to wake the client reader when the window opens.
window_notify: Arc<Notify>,
/// Per-stream cancellation token — cancelled on FRAME_CLOSE_BACK to promptly
/// terminate the upload loop instead of waiting for the window stall timeout.
cancel_token: CancellationToken,
}
/// Edge configuration (hub-host + credentials only; ports come from hub).
@@ -399,7 +402,11 @@ async fn handle_edge_frame(
}
FRAME_CLOSE_BACK => {
let mut writers = client_writers.lock().await;
writers.remove(&frame.stream_id);
if let Some(state) = writers.remove(&frame.stream_id) {
// Cancel the stream's token so the upload loop exits promptly
// instead of waiting for the window stall timeout.
state.cancel_token.cancel();
}
}
FRAME_CONFIG => {
if let Ok(update) = serde_json::from_slice::<ConfigUpdate>(&frame.payload) {
@@ -1012,6 +1019,7 @@ async fn handle_client_connection(
back_tx,
send_window: Arc::clone(&send_window),
window_notify: Arc::clone(&window_notify),
cancel_token: client_token.clone(),
});
}
@@ -1093,8 +1101,8 @@ async fn handle_client_connection(
tokio::select! {
_ = notified => continue,
_ = client_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} upload stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} upload stalled (window empty for 55s)", stream_id);
break;
}
}

View File

@@ -475,6 +475,12 @@ async fn handle_hub_frame(
})??;
upstream.set_nodelay(true)?;
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
upstream.write_all(proxy_header.as_bytes()).await?;
let (mut up_read, mut up_write) =
@@ -485,7 +491,7 @@ async fn handle_hub_frame(
let writer_token = stream_token.clone();
let wub_tx = writer_tx.clone();
let stream_counter_w = Arc::clone(&stream_counter);
let writer_for_edge_data = tokio::spawn(async move {
let mut writer_for_edge_data = tokio::spawn(async move {
let mut consumed_since_update: u32 = 0;
loop {
tokio::select! {
@@ -569,8 +575,8 @@ async fn handle_hub_frame(
tokio::select! {
_ = notified => continue,
_ = stream_token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(120)) => {
log::warn!("Stream {} download stalled (window empty for 120s)", stream_id);
_ = tokio::time::sleep(Duration::from_secs(55)) => {
log::warn!("Stream {} download stalled (window empty for 55s)", stream_id);
break;
}
}
@@ -633,7 +639,11 @@ async fn handle_hub_frame(
}
}
writer_for_edge_data.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_for_edge_data).await.is_err() {
writer_for_edge_data.abort();
}
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
}
.await;
@@ -1379,6 +1389,7 @@ async fn handle_edge_connection_quic(
let session_token = dgram_token.child_token();
let (tx, mut rx) = mpsc::channel::<Bytes>(256);
let proxy_v2_data: Vec<u8> = proxy_data.to_vec();
let cleanup_sessions = sessions.clone();
{
let mut s = sessions.lock().await;
@@ -1390,17 +1401,20 @@ async fn handle_edge_connection_quic(
Ok(s) => Arc::new(s),
Err(e) => {
log::error!("QUIC UDP session {} bind failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
};
if let Err(e) = upstream.connect((target.as_str(), dest_port)).await {
log::error!("QUIC UDP session {} connect failed: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
// Send PROXY v2 header as first datagram so SmartProxy knows the original client
if let Err(e) = upstream.send(&proxy_v2_data).await {
log::error!("QUIC UDP session {} failed to send PROXY v2 header: {}", session_id, e);
cleanup_sessions.lock().await.remove(&session_id);
return;
}
@@ -1443,6 +1457,8 @@ async fn handle_edge_connection_quic(
}
}
recv_handle.abort();
// Clean up session entry to prevent memory leak
cleanup_sessions.lock().await.remove(&session_id);
});
continue;
@@ -1590,6 +1606,12 @@ async fn handle_quic_stream(
};
let _ = upstream.set_nodelay(true);
// TCP keepalive detects silent failures on the hub→SmartProxy connection
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30));
#[cfg(target_os = "linux")]
let ka = ka.with_interval(Duration::from_secs(10));
let _ = socket2::SockRef::from(&upstream).set_tcp_keepalive(&ka);
// Send PROXY header to SmartProxy
if let Err(e) = upstream.write_all(proxy_header.as_bytes()).await {
log::error!("QUIC stream {} failed to write PROXY header to upstream: {}", stream_id, e);
@@ -1600,7 +1622,7 @@ async fn handle_quic_stream(
// Task: QUIC -> upstream (edge data to SmartProxy)
let writer_token = stream_token.clone();
let writer_task = tokio::spawn(async move {
let mut writer_task = tokio::spawn(async move {
let mut buf = vec![0u8; 32768];
loop {
tokio::select! {
@@ -1651,7 +1673,11 @@ async fn handle_quic_stream(
// Gracefully close the QUIC send stream
let _ = quic_send.finish();
writer_task.abort();
// Give the writer task 2s to shut down gracefully (sends TCP FIN
// via up_write.shutdown()) before force-aborting (which causes RST).
if tokio::time::timeout(Duration::from_secs(2), &mut writer_task).await.is_err() {
writer_task.abort();
}
}
#[cfg(test)]