fix(rust-protocol): switch tunnel frame buffers from Vec<u8> to Bytes to reduce copying and memory overhead

This commit is contained in:
2026-03-17 23:29:02 +00:00
parent 4cfc518301
commit 93578d7034
8 changed files with 63 additions and 47 deletions

View File

@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut, BufMut};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
// Frame type constants
@@ -31,7 +32,7 @@ pub const WINDOW_UPDATE_THRESHOLD: u32 = INITIAL_STREAM_WINDOW / 2;
pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024;
/// Encode a WINDOW_UPDATE frame for a specific stream.
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Vec<u8> {
pub fn encode_window_update(stream_id: u32, frame_type: u8, increment: u32) -> Bytes {
encode_frame(stream_id, frame_type, &increment.to_be_bytes())
}
@@ -56,18 +57,18 @@ pub fn decode_window_update(payload: &[u8]) -> Option<u32> {
pub struct Frame {
pub stream_id: u32,
pub frame_type: u8,
pub payload: Vec<u8>,
pub payload: Bytes,
}
/// Encode a frame into bytes: [stream_id:4][type:1][length:4][payload]
pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Bytes {
let len = payload.len() as u32;
let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
buf.extend_from_slice(&stream_id.to_be_bytes());
buf.push(frame_type);
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
buf
let mut buf = BytesMut::with_capacity(FRAME_HEADER_SIZE + payload.len());
buf.put_slice(&stream_id.to_be_bytes());
buf.put_u8(frame_type);
buf.put_slice(&len.to_be_bytes());
buf.put_slice(payload);
buf.freeze()
}
/// Write a frame header into `buf[0..FRAME_HEADER_SIZE]`.
@@ -142,7 +143,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
));
}
let mut payload = vec![0u8; length as usize];
let mut payload = BytesMut::zeroed(length as usize);
if length > 0 {
self.reader.read_exact(&mut payload).await?;
}
@@ -150,7 +151,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
Ok(Some(Frame {
stream_id,
frame_type,
payload,
payload: payload.freeze(),
}))
}
@@ -184,8 +185,8 @@ pub enum TunnelEvent {
/// Write state extracted into a sub-struct so the borrow checker can see
/// disjoint field access between `self.write` and `self.stream`.
struct WriteState {
ctrl_queue: VecDeque<Vec<u8>>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first
data_queue: VecDeque<Vec<u8>>, // DATA, DATA_BACK — only when ctrl is empty
ctrl_queue: VecDeque<Bytes>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first
data_queue: VecDeque<Bytes>, // DATA, DATA_BACK — only when ctrl is empty
offset: usize, // progress within current frame being written
flush_needed: bool,
}
@@ -234,12 +235,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
}
/// Queue a high-priority control frame (PONG, WINDOW_UPDATE, CLOSE, OPEN).
pub fn queue_ctrl(&mut self, frame: Vec<u8>) {
pub fn queue_ctrl(&mut self, frame: Bytes) {
self.write.ctrl_queue.push_back(frame);
}
/// Queue a lower-priority data frame (DATA, DATA_BACK).
pub fn queue_data(&mut self, frame: Vec<u8>) {
pub fn queue_data(&mut self, frame: Bytes) {
self.write.data_queue.push_back(frame);
}
@@ -285,7 +286,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
return None;
}
let payload = self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size].to_vec();
let payload = Bytes::copy_from_slice(
&self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size],
);
self.parse_pos += total_frame_size;
// Compact when parse_pos > half the data to reclaim memory
@@ -300,12 +303,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
/// Poll-based I/O step. Returns Ready on events, Pending when idle.
///
/// Order: write(ctrldata) flush read channels timers
/// Order: write(ctrl->data) -> flush -> read -> channels -> timers
pub fn poll_step(
&mut self,
cx: &mut Context<'_>,
ctrl_rx: &mut tokio::sync::mpsc::Receiver<Vec<u8>>,
data_rx: &mut tokio::sync::mpsc::Receiver<Vec<u8>>,
ctrl_rx: &mut tokio::sync::mpsc::Receiver<Bytes>,
data_rx: &mut tokio::sync::mpsc::Receiver<Bytes>,
liveness_deadline: &mut Pin<Box<tokio::time::Sleep>>,
cancel_token: &tokio_util::sync::CancellationToken,
) -> Poll<TunnelEvent> {
@@ -407,7 +410,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
// Ctrl frames must never be delayed — always drain fully.
// Data frames are gated: keep data in the bounded channel for proper
// backpressure when TLS writes are slow. Without this gate, the internal
// data_queue (unbounded VecDeque) grows to hundreds of MB under throttle OOM.
// data_queue (unbounded VecDeque) grows to hundreds of MB under throttle -> OOM.
let mut got_new = false;
loop {
match ctrl_rx.poll_recv(cx) {
@@ -469,14 +472,14 @@ mod tests {
let mut buf = vec![0u8; FRAME_HEADER_SIZE + payload.len()];
buf[FRAME_HEADER_SIZE..].copy_from_slice(payload);
encode_frame_header(&mut buf, 42, FRAME_DATA, payload.len());
assert_eq!(buf, encode_frame(42, FRAME_DATA, payload));
assert_eq!(buf, &encode_frame(42, FRAME_DATA, payload)[..]);
}
#[test]
fn test_encode_frame_header_empty_payload() {
let mut buf = vec![0u8; FRAME_HEADER_SIZE];
encode_frame_header(&mut buf, 99, FRAME_CLOSE, 0);
assert_eq!(buf, encode_frame(99, FRAME_CLOSE, &[]));
assert_eq!(buf, &encode_frame(99, FRAME_CLOSE, &[])[..]);
}
#[test]
@@ -644,7 +647,7 @@ mod tests {
let frame = reader.next_frame().await.unwrap().unwrap();
assert_eq!(frame.stream_id, i as u32);
assert_eq!(frame.frame_type, ft);
assert_eq!(frame.payload, format!("payload_{}", i).as_bytes());
assert_eq!(&frame.payload[..], format!("payload_{}", i).as_bytes());
}
assert!(reader.next_frame().await.unwrap().is_none());
@@ -653,7 +656,7 @@ mod tests {
#[tokio::test]
async fn test_frame_reader_zero_length_payload() {
let data = encode_frame(42, FRAME_CLOSE, &[]);
let cursor = std::io::Cursor::new(data);
let cursor = std::io::Cursor::new(data.to_vec());
let mut reader = FrameReader::new(cursor);
let frame = reader.next_frame().await.unwrap().unwrap();
@@ -681,7 +684,7 @@ mod tests {
#[test]
fn test_adaptive_window_zero_streams() {
// 0 streams treated as 1: 800MB/1 clamped to 16MB max
// 0 streams treated as 1: 800MB/1 -> clamped to 16MB max
assert_eq!(compute_window_for_stream_count(0), INITIAL_STREAM_WINDOW);
}
@@ -718,13 +721,13 @@ mod tests {
#[test]
fn test_adaptive_window_500_streams_clamped() {
// 800MB/500 = 1.6MB clamped up to 4MB floor
// 800MB/500 = 1.6MB -> clamped up to 4MB floor
assert_eq!(compute_window_for_stream_count(500), 4 * 1024 * 1024);
}
#[test]
fn test_adaptive_window_max_u32() {
// Extreme: u32::MAX streams tiny value clamped to 4MB
// Extreme: u32::MAX streams -> tiny value -> clamped to 4MB
assert_eq!(compute_window_for_stream_count(u32::MAX), 4 * 1024 * 1024);
}
@@ -740,7 +743,7 @@ mod tests {
#[test]
fn test_adaptive_window_total_budget_bounded() {
// active × per_stream_window should never exceed 800MB (+ clamp overhead for high N)
// active x per_stream_window should never exceed 800MB (+ clamp overhead for high N)
for n in [1, 10, 50, 100, 200] {
let w = compute_window_for_stream_count(n);
let total = w as u64 * n as u64;