fix(hub): cancel per-stream tokens on stream close and avoid duplicate StreamClosed events; bump @types/node devDependency to ^25.3.0
This commit is contained in:
@@ -105,7 +105,7 @@ pub struct TunnelHub {
|
||||
|
||||
struct ConnectedEdgeInfo {
|
||||
connected_at: u64,
|
||||
active_streams: Arc<Mutex<HashMap<u32, mpsc::Sender<Vec<u8>>>>>,
|
||||
active_streams: Arc<Mutex<HashMap<u32, (mpsc::Sender<Vec<u8>>, CancellationToken)>>>,
|
||||
config_tx: mpsc::Sender<EdgeConfigUpdate>,
|
||||
#[allow(dead_code)] // kept alive for Drop — cancels child tokens when edge is removed
|
||||
cancel_token: CancellationToken,
|
||||
@@ -322,7 +322,7 @@ async fn handle_edge_connection(
|
||||
write_half.write_all(handshake_json.as_bytes()).await?;
|
||||
|
||||
// Track this edge
|
||||
let streams: Arc<Mutex<HashMap<u32, mpsc::Sender<Vec<u8>>>>> =
|
||||
let streams: Arc<Mutex<HashMap<u32, (mpsc::Sender<Vec<u8>>, CancellationToken)>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
let now = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
@@ -409,7 +409,7 @@ async fn handle_edge_connection(
|
||||
let (data_tx, mut data_rx) = mpsc::channel::<Vec<u8>>(256);
|
||||
{
|
||||
let mut s = streams.lock().await;
|
||||
s.insert(stream_id, data_tx);
|
||||
s.insert(stream_id, (data_tx, stream_token.clone()));
|
||||
}
|
||||
|
||||
// Spawn task: connect to SmartProxy, send PROXY header, pipe data
|
||||
@@ -487,26 +487,34 @@ async fn handle_edge_connection(
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stream
|
||||
{
|
||||
// Clean up stream (guard against duplicate if FRAME_CLOSE already removed it)
|
||||
let was_present = {
|
||||
let mut s = streams_clone.lock().await;
|
||||
s.remove(&stream_id);
|
||||
s.remove(&stream_id).is_some()
|
||||
};
|
||||
if was_present {
|
||||
let _ = event_tx_clone.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id_clone,
|
||||
stream_id,
|
||||
});
|
||||
}
|
||||
let _ = event_tx_clone.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id_clone,
|
||||
stream_id,
|
||||
});
|
||||
});
|
||||
}
|
||||
FRAME_DATA => {
|
||||
let s = streams.lock().await;
|
||||
if let Some(tx) = s.get(&frame.stream_id) {
|
||||
if let Some((tx, _)) = s.get(&frame.stream_id) {
|
||||
let _ = tx.send(frame.payload).await;
|
||||
}
|
||||
}
|
||||
FRAME_CLOSE => {
|
||||
let mut s = streams.lock().await;
|
||||
s.remove(&frame.stream_id);
|
||||
if let Some((_, token)) = s.remove(&frame.stream_id) {
|
||||
token.cancel();
|
||||
let _ = event_tx.try_send(HubEvent::StreamClosed {
|
||||
edge_id: edge_id.clone(),
|
||||
stream_id: frame.stream_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
log::warn!("Unexpected frame type {} from edge", frame.frame_type);
|
||||
|
||||
Reference in New Issue
Block a user