fix(protocol,edge): optimize tunnel frame handling and zero-copy uploads in edge I/O
This commit is contained in:
@@ -72,6 +72,16 @@ pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
|
||||
buf
|
||||
}
|
||||
|
||||
/// Write a frame header into `buf[0..FRAME_HEADER_SIZE]`.
|
||||
/// The caller must ensure payload is already at `buf[FRAME_HEADER_SIZE..FRAME_HEADER_SIZE + payload_len]`.
|
||||
/// This enables zero-copy encoding: read directly into `buf[FRAME_HEADER_SIZE..]`, then
|
||||
/// prepend the header without copying the payload.
|
||||
pub fn encode_frame_header(buf: &mut [u8], stream_id: u32, frame_type: u8, payload_len: usize) {
|
||||
buf[0..4].copy_from_slice(&stream_id.to_be_bytes());
|
||||
buf[4] = frame_type;
|
||||
buf[5..9].copy_from_slice(&(payload_len as u32).to_be_bytes());
|
||||
}
|
||||
|
||||
/// Build a PROXY protocol v1 header line.
|
||||
/// Format: `PROXY TCP4 <client_ip> <edge_ip> <client_port> <dest_port>\r\n`
|
||||
pub fn build_proxy_v1_header(
|
||||
@@ -173,6 +183,21 @@ pub enum TunnelEvent {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
/// Write state extracted into a sub-struct so the borrow checker can see
|
||||
/// disjoint field access between `self.write` and `self.stream`.
|
||||
struct WriteState {
|
||||
ctrl_queue: VecDeque<Vec<u8>>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first
|
||||
data_queue: VecDeque<Vec<u8>>, // DATA, DATA_BACK — only when ctrl is empty
|
||||
offset: usize, // progress within current frame being written
|
||||
flush_needed: bool,
|
||||
}
|
||||
|
||||
impl WriteState {
|
||||
fn has_work(&self) -> bool {
|
||||
!self.ctrl_queue.is_empty() || !self.data_queue.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Single-owner I/O engine for the tunnel TLS connection.
|
||||
///
|
||||
/// Owns the TLS stream directly — no `tokio::io::split()`, no mutex.
|
||||
@@ -184,11 +209,9 @@ pub struct TunnelIo<S> {
|
||||
// Read state: accumulate bytes, parse frames incrementally
|
||||
read_buf: Vec<u8>,
|
||||
read_pos: usize,
|
||||
// Write state: dual priority queues
|
||||
ctrl_queue: VecDeque<Vec<u8>>, // PONG, WINDOW_UPDATE, CLOSE, OPEN — always first
|
||||
data_queue: VecDeque<Vec<u8>>, // DATA, DATA_BACK — only when ctrl is empty
|
||||
write_offset: usize, // progress within current frame being written
|
||||
flush_needed: bool,
|
||||
parse_pos: usize,
|
||||
// Write state: extracted sub-struct for safe disjoint borrows
|
||||
write: WriteState,
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
@@ -202,42 +225,52 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
stream,
|
||||
read_buf,
|
||||
read_pos,
|
||||
ctrl_queue: VecDeque::new(),
|
||||
data_queue: VecDeque::new(),
|
||||
write_offset: 0,
|
||||
flush_needed: false,
|
||||
parse_pos: 0,
|
||||
write: WriteState {
|
||||
ctrl_queue: VecDeque::new(),
|
||||
data_queue: VecDeque::new(),
|
||||
offset: 0,
|
||||
flush_needed: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Queue a high-priority control frame (PONG, WINDOW_UPDATE, CLOSE, OPEN).
|
||||
pub fn queue_ctrl(&mut self, frame: Vec<u8>) {
|
||||
self.ctrl_queue.push_back(frame);
|
||||
self.write.ctrl_queue.push_back(frame);
|
||||
}
|
||||
|
||||
/// Queue a lower-priority data frame (DATA, DATA_BACK).
|
||||
pub fn queue_data(&mut self, frame: Vec<u8>) {
|
||||
self.data_queue.push_back(frame);
|
||||
self.write.data_queue.push_back(frame);
|
||||
}
|
||||
|
||||
/// Try to parse a complete frame from the read buffer.
|
||||
/// Uses a parse_pos cursor to avoid drain() on every frame.
|
||||
pub fn try_parse_frame(&mut self) -> Option<Result<Frame, std::io::Error>> {
|
||||
if self.read_pos < FRAME_HEADER_SIZE {
|
||||
let available = self.read_pos - self.parse_pos;
|
||||
if available < FRAME_HEADER_SIZE {
|
||||
return None;
|
||||
}
|
||||
|
||||
let base = self.parse_pos;
|
||||
let stream_id = u32::from_be_bytes([
|
||||
self.read_buf[0], self.read_buf[1], self.read_buf[2], self.read_buf[3],
|
||||
self.read_buf[base], self.read_buf[base + 1],
|
||||
self.read_buf[base + 2], self.read_buf[base + 3],
|
||||
]);
|
||||
let frame_type = self.read_buf[4];
|
||||
let frame_type = self.read_buf[base + 4];
|
||||
let length = u32::from_be_bytes([
|
||||
self.read_buf[5], self.read_buf[6], self.read_buf[7], self.read_buf[8],
|
||||
self.read_buf[base + 5], self.read_buf[base + 6],
|
||||
self.read_buf[base + 7], self.read_buf[base + 8],
|
||||
]);
|
||||
|
||||
if length > MAX_PAYLOAD_SIZE {
|
||||
let header = [
|
||||
self.read_buf[0], self.read_buf[1], self.read_buf[2], self.read_buf[3],
|
||||
self.read_buf[4], self.read_buf[5], self.read_buf[6], self.read_buf[7],
|
||||
self.read_buf[8],
|
||||
self.read_buf[base], self.read_buf[base + 1],
|
||||
self.read_buf[base + 2], self.read_buf[base + 3],
|
||||
self.read_buf[base + 4], self.read_buf[base + 5],
|
||||
self.read_buf[base + 6], self.read_buf[base + 7],
|
||||
self.read_buf[base + 8],
|
||||
];
|
||||
log::error!(
|
||||
"CORRUPT FRAME HEADER: raw={:02x?} stream_id={} type=0x{:02x} length={}",
|
||||
@@ -250,21 +283,23 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
}
|
||||
|
||||
let total_frame_size = FRAME_HEADER_SIZE + length as usize;
|
||||
if self.read_pos < total_frame_size {
|
||||
if available < total_frame_size {
|
||||
return None;
|
||||
}
|
||||
|
||||
let payload = self.read_buf[FRAME_HEADER_SIZE..total_frame_size].to_vec();
|
||||
self.read_buf.drain(..total_frame_size);
|
||||
self.read_pos -= total_frame_size;
|
||||
let payload = self.read_buf[base + FRAME_HEADER_SIZE..base + total_frame_size].to_vec();
|
||||
self.parse_pos += total_frame_size;
|
||||
|
||||
// Compact when parse_pos > half the data to reclaim memory
|
||||
if self.parse_pos > self.read_pos / 2 && self.parse_pos > 0 {
|
||||
self.read_buf.drain(..self.parse_pos);
|
||||
self.read_pos -= self.parse_pos;
|
||||
self.parse_pos = 0;
|
||||
}
|
||||
|
||||
Some(Ok(Frame { stream_id, frame_type, payload }))
|
||||
}
|
||||
|
||||
fn has_write_work(&self) -> bool {
|
||||
!self.ctrl_queue.is_empty() || !self.data_queue.is_empty()
|
||||
}
|
||||
|
||||
/// Poll-based I/O step. Returns Ready on events, Pending when idle.
|
||||
///
|
||||
/// Order: write(ctrl→data) → flush → read → channels → timers
|
||||
@@ -279,20 +314,16 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
// 1. WRITE: drain ctrl queue first, then data queue.
|
||||
// TLS poll_write writes plaintext to session buffer (always Ready).
|
||||
// Batch up to 16 frames per poll cycle.
|
||||
// Safe: `self.write` and `self.stream` are disjoint fields.
|
||||
let mut writes = 0;
|
||||
while self.has_write_work() && writes < 16 {
|
||||
// Determine which queue to write from and the frame data.
|
||||
// We access the queues via raw pointers to avoid borrow conflicts with self.stream.
|
||||
let from_ctrl = !self.ctrl_queue.is_empty();
|
||||
let frame_ptr: *const Vec<u8> = if from_ctrl {
|
||||
self.ctrl_queue.front().unwrap()
|
||||
while self.write.has_work() && writes < 16 {
|
||||
let from_ctrl = !self.write.ctrl_queue.is_empty();
|
||||
let frame = if from_ctrl {
|
||||
self.write.ctrl_queue.front().unwrap()
|
||||
} else {
|
||||
self.data_queue.front().unwrap()
|
||||
self.write.data_queue.front().unwrap()
|
||||
};
|
||||
// SAFETY: the frame is not modified while we hold the pointer — poll_write
|
||||
// only writes to self.stream, and advance_write only runs after poll_write returns.
|
||||
let frame = unsafe { &*frame_ptr };
|
||||
let remaining = &frame[self.write_offset..];
|
||||
let remaining = &frame[self.write.offset..];
|
||||
|
||||
match Pin::new(&mut self.stream).poll_write(cx, remaining) {
|
||||
Poll::Ready(Ok(0)) => {
|
||||
@@ -301,12 +332,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
));
|
||||
}
|
||||
Poll::Ready(Ok(n)) => {
|
||||
self.write_offset += n;
|
||||
self.flush_needed = true;
|
||||
if self.write_offset >= frame.len() {
|
||||
if from_ctrl { self.ctrl_queue.pop_front(); }
|
||||
else { self.data_queue.pop_front(); }
|
||||
self.write_offset = 0;
|
||||
self.write.offset += n;
|
||||
self.write.flush_needed = true;
|
||||
if self.write.offset >= frame.len() {
|
||||
if from_ctrl { self.write.ctrl_queue.pop_front(); }
|
||||
else { self.write.data_queue.pop_front(); }
|
||||
self.write.offset = 0;
|
||||
writes += 1;
|
||||
}
|
||||
}
|
||||
@@ -316,9 +347,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
}
|
||||
|
||||
// 2. FLUSH: push encrypted data from TLS session to TCP.
|
||||
if self.flush_needed {
|
||||
if self.write.flush_needed {
|
||||
match Pin::new(&mut self.stream).poll_flush(cx) {
|
||||
Poll::Ready(Ok(())) => self.flush_needed = false,
|
||||
Poll::Ready(Ok(())) => self.write.flush_needed = false,
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(TunnelEvent::WriteError(e)),
|
||||
Poll::Pending => {} // TCP waker will notify us
|
||||
}
|
||||
@@ -329,6 +360,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
// the waker without re-registering it, causing the task to sleep until a
|
||||
// timer or channel wakes it (potentially 15+ seconds of lost reads).
|
||||
loop {
|
||||
// Compact if needed to make room for reads
|
||||
if self.parse_pos > 0 && self.read_buf.len() - self.read_pos < 32768 {
|
||||
self.read_buf.drain(..self.parse_pos);
|
||||
self.read_pos -= self.parse_pos;
|
||||
self.parse_pos = 0;
|
||||
}
|
||||
if self.read_buf.len() < self.read_pos + 32768 {
|
||||
self.read_buf.resize(self.read_pos + 32768, 0);
|
||||
}
|
||||
@@ -358,7 +395,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
let mut got_new = false;
|
||||
loop {
|
||||
match ctrl_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(frame)) => { self.ctrl_queue.push_back(frame); got_new = true; }
|
||||
Poll::Ready(Some(frame)) => { self.write.ctrl_queue.push_back(frame); got_new = true; }
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(TunnelEvent::WriteError(
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "ctrl channel closed"),
|
||||
@@ -369,7 +406,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
}
|
||||
loop {
|
||||
match data_rx.poll_recv(cx) {
|
||||
Poll::Ready(Some(frame)) => { self.data_queue.push_back(frame); got_new = true; }
|
||||
Poll::Ready(Some(frame)) => { self.write.data_queue.push_back(frame); got_new = true; }
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(TunnelEvent::WriteError(
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "data channel closed"),
|
||||
@@ -390,7 +427,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
// 6. SELF-WAKE: only when we have frames AND flush is done.
|
||||
// If flush is pending, the TCP write-readiness waker will notify us.
|
||||
// If we got new channel frames, wake to write them.
|
||||
if got_new || (!self.flush_needed && self.has_write_work()) {
|
||||
if got_new || (!self.write.flush_needed && self.write.has_work()) {
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
|
||||
@@ -406,6 +443,22 @@ impl<S: AsyncRead + AsyncWrite + Unpin> TunnelIo<S> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_encode_frame_header() {
|
||||
let payload = b"hello";
|
||||
let mut buf = vec![0u8; FRAME_HEADER_SIZE + payload.len()];
|
||||
buf[FRAME_HEADER_SIZE..].copy_from_slice(payload);
|
||||
encode_frame_header(&mut buf, 42, FRAME_DATA, payload.len());
|
||||
assert_eq!(buf, encode_frame(42, FRAME_DATA, payload));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_frame_header_empty_payload() {
|
||||
let mut buf = vec![0u8; FRAME_HEADER_SIZE];
|
||||
encode_frame_header(&mut buf, 99, FRAME_CLOSE, 0);
|
||||
assert_eq!(buf, encode_frame(99, FRAME_CLOSE, &[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_frame() {
|
||||
let data = b"hello";
|
||||
|
||||
Reference in New Issue
Block a user