fix(tunnel): prevent tunnel backpressure buffering from exhausting memory and cancel stream handlers before TLS shutdown

This commit is contained in:
2026-03-17 13:27:26 +00:00
parent 79af6fd425
commit 81bbb33016
6 changed files with 429 additions and 22 deletions

View File

@@ -587,21 +587,23 @@ async fn connect_to_hub_and_run(
}
};
// Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect
// instead of "peer closed connection without sending TLS close_notify".
let mut tls_stream = tunnel_io.into_inner();
let _ = tokio::time::timeout(
Duration::from_secs(2),
tls_stream.shutdown(),
).await;
// Cleanup
// Cancel stream tokens FIRST so stream handlers exit immediately.
// If we TLS-shutdown first, stream handlers are stuck sending to dead channels
// for up to 2 seconds while the shutdown times out on a dead connection.
connection_token.cancel();
stun_handle.abort();
for (_, h) in port_listeners.drain() {
h.abort();
}
// Graceful TLS shutdown: send close_notify so the hub sees a clean disconnect.
// Stream handlers are already cancelled, so no new data is being produced.
let mut tls_stream = tunnel_io.into_inner();
let _ = tokio::time::timeout(
Duration::from_secs(2),
tls_stream.shutdown(),
).await;
result
}

View File

@@ -844,17 +844,19 @@ async fn handle_edge_connection(
}
}
// Graceful TLS shutdown: send close_notify so the edge sees a clean disconnect
// instead of "peer closed connection without sending TLS close_notify".
// Cancel stream tokens FIRST so stream handlers exit immediately.
// If we TLS-shutdown first, stream handlers are stuck sending to dead channels
// for up to 2 seconds while the shutdown times out on a dead connection.
edge_token.cancel();
config_handle.abort();
// Graceful TLS shutdown: send close_notify so the edge sees a clean disconnect.
// Stream handlers are already cancelled, so no new data is being produced.
let mut tls_stream = tunnel_io.into_inner();
let _ = tokio::time::timeout(
Duration::from_secs(2),
tls_stream.shutdown(),
).await;
// Cleanup: cancel edge token to propagate to all child tasks
edge_token.cancel();
config_handle.abort();
{
let mut edges = connected.lock().await;
edges.remove(&edge_id);

View File

@@ -312,11 +312,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
cancel_token: &tokio_util::sync::CancellationToken,
) -> Poll<TunnelEvent> {
// 1. WRITE: drain ctrl queue first, then data queue.
// TLS poll_write writes plaintext to session buffer (always Ready).
// Batch up to 16 frames per poll cycle.
// Only write when flush is complete — otherwise the TLS session buffer
// grows without bound (poll_write always returns Ready, buffering plaintext
// in the TLS session even when TCP can't keep up).
// Safe: `self.write` and `self.stream` are disjoint fields.
let mut writes = 0;
while self.write.has_work() && writes < 16 {
while self.write.has_work() && writes < 16 && !self.write.flush_needed {
let from_ctrl = !self.write.ctrl_queue.is_empty();
let frame = if from_ctrl {
self.write.ctrl_queue.front().unwrap()
@@ -424,10 +425,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
return Poll::Ready(TunnelEvent::Cancelled);
}
// 6. SELF-WAKE: only when we have frames AND flush is done.
// 6. SELF-WAKE: only when flush is complete AND we have work.
// If flush is pending, the TCP write-readiness waker will notify us.
// If we got new channel frames, wake to write them.
if got_new || (!self.write.flush_needed && self.write.has_work()) {
// CRITICAL: do NOT self-wake when flush_needed — this causes unbounded
// TLS session buffer growth (poll_write always accepts plaintext, but TCP
// can't drain it fast enough → OOM → process killed → ECONNRESET).
if !self.write.flush_needed && (got_new || self.write.has_work()) {
cx.waker().wake_by_ref();
}