BREAKING CHANGE(remoteingress): migrate core to Rust, add RemoteIngressHub/RemoteIngressEdge JS bridge, and bump package to v2.0.0
This commit is contained in:
7
rust/crates/remoteingress-protocol/Cargo.toml
Normal file
7
rust/crates/remoteingress-protocol/Cargo.toml
Normal file
@@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "remoteingress-protocol"
|
||||
version = "2.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["io-util"] }
|
||||
172
rust/crates/remoteingress-protocol/src/lib.rs
Normal file
172
rust/crates/remoteingress-protocol/src/lib.rs
Normal file
@@ -0,0 +1,172 @@
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
|
||||
// Frame type constants
|
||||
pub const FRAME_OPEN: u8 = 0x01;
|
||||
pub const FRAME_DATA: u8 = 0x02;
|
||||
pub const FRAME_CLOSE: u8 = 0x03;
|
||||
pub const FRAME_DATA_BACK: u8 = 0x04;
|
||||
pub const FRAME_CLOSE_BACK: u8 = 0x05;
|
||||
|
||||
// Frame header size: 4 (stream_id) + 1 (type) + 4 (length) = 9 bytes
|
||||
pub const FRAME_HEADER_SIZE: usize = 9;
|
||||
|
||||
// Maximum payload size (16 MB)
|
||||
pub const MAX_PAYLOAD_SIZE: u32 = 16 * 1024 * 1024;
|
||||
|
||||
/// A single multiplexed frame.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Frame {
|
||||
pub stream_id: u32,
|
||||
pub frame_type: u8,
|
||||
pub payload: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Encode a frame into bytes: [stream_id:4][type:1][length:4][payload]
|
||||
pub fn encode_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
|
||||
let len = payload.len() as u32;
|
||||
let mut buf = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
|
||||
buf.extend_from_slice(&stream_id.to_be_bytes());
|
||||
buf.push(frame_type);
|
||||
buf.extend_from_slice(&len.to_be_bytes());
|
||||
buf.extend_from_slice(payload);
|
||||
buf
|
||||
}
|
||||
|
||||
/// Build a PROXY protocol v1 header line.
|
||||
/// Format: `PROXY TCP4 <client_ip> <edge_ip> <client_port> <dest_port>\r\n`
|
||||
pub fn build_proxy_v1_header(
|
||||
client_ip: &str,
|
||||
edge_ip: &str,
|
||||
client_port: u16,
|
||||
dest_port: u16,
|
||||
) -> String {
|
||||
format!(
|
||||
"PROXY TCP4 {} {} {} {}\r\n",
|
||||
client_ip, edge_ip, client_port, dest_port
|
||||
)
|
||||
}
|
||||
|
||||
/// Stateful async frame reader that yields `Frame` values from an `AsyncRead`.
|
||||
pub struct FrameReader<R> {
|
||||
reader: R,
|
||||
header_buf: [u8; FRAME_HEADER_SIZE],
|
||||
}
|
||||
|
||||
impl<R: AsyncRead + Unpin> FrameReader<R> {
|
||||
pub fn new(reader: R) -> Self {
|
||||
Self {
|
||||
reader,
|
||||
header_buf: [0u8; FRAME_HEADER_SIZE],
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the next frame. Returns `None` on EOF, `Err` on protocol violation.
|
||||
pub async fn next_frame(&mut self) -> Result<Option<Frame>, std::io::Error> {
|
||||
// Read header
|
||||
match self.reader.read_exact(&mut self.header_buf).await {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
let stream_id = u32::from_be_bytes([
|
||||
self.header_buf[0],
|
||||
self.header_buf[1],
|
||||
self.header_buf[2],
|
||||
self.header_buf[3],
|
||||
]);
|
||||
let frame_type = self.header_buf[4];
|
||||
let length = u32::from_be_bytes([
|
||||
self.header_buf[5],
|
||||
self.header_buf[6],
|
||||
self.header_buf[7],
|
||||
self.header_buf[8],
|
||||
]);
|
||||
|
||||
if length > MAX_PAYLOAD_SIZE {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("frame payload too large: {} bytes", length),
|
||||
));
|
||||
}
|
||||
|
||||
let mut payload = vec![0u8; length as usize];
|
||||
if length > 0 {
|
||||
self.reader.read_exact(&mut payload).await?;
|
||||
}
|
||||
|
||||
Ok(Some(Frame {
|
||||
stream_id,
|
||||
frame_type,
|
||||
payload,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Consume the reader and return the inner stream.
|
||||
pub fn into_inner(self) -> R {
|
||||
self.reader
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_encode_frame() {
|
||||
let data = b"hello";
|
||||
let encoded = encode_frame(42, FRAME_DATA, data);
|
||||
assert_eq!(encoded.len(), FRAME_HEADER_SIZE + data.len());
|
||||
// stream_id = 42 in BE
|
||||
assert_eq!(&encoded[0..4], &42u32.to_be_bytes());
|
||||
// frame type
|
||||
assert_eq!(encoded[4], FRAME_DATA);
|
||||
// length
|
||||
assert_eq!(&encoded[5..9], &5u32.to_be_bytes());
|
||||
// payload
|
||||
assert_eq!(&encoded[9..], b"hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_encode_empty_frame() {
|
||||
let encoded = encode_frame(1, FRAME_CLOSE, &[]);
|
||||
assert_eq!(encoded.len(), FRAME_HEADER_SIZE);
|
||||
assert_eq!(&encoded[5..9], &0u32.to_be_bytes());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_proxy_v1_header() {
|
||||
let header = build_proxy_v1_header("1.2.3.4", "5.6.7.8", 12345, 443);
|
||||
assert_eq!(header, "PROXY TCP4 1.2.3.4 5.6.7.8 12345 443\r\n");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_frame_reader() {
|
||||
let frame1 = encode_frame(1, FRAME_OPEN, b"PROXY TCP4 1.2.3.4 5.6.7.8 1234 443\r\n");
|
||||
let frame2 = encode_frame(1, FRAME_DATA, b"GET / HTTP/1.1\r\n");
|
||||
let frame3 = encode_frame(1, FRAME_CLOSE, &[]);
|
||||
|
||||
let mut data = Vec::new();
|
||||
data.extend_from_slice(&frame1);
|
||||
data.extend_from_slice(&frame2);
|
||||
data.extend_from_slice(&frame3);
|
||||
|
||||
let cursor = std::io::Cursor::new(data);
|
||||
let mut reader = FrameReader::new(cursor);
|
||||
|
||||
let f1 = reader.next_frame().await.unwrap().unwrap();
|
||||
assert_eq!(f1.stream_id, 1);
|
||||
assert_eq!(f1.frame_type, FRAME_OPEN);
|
||||
assert!(f1.payload.starts_with(b"PROXY"));
|
||||
|
||||
let f2 = reader.next_frame().await.unwrap().unwrap();
|
||||
assert_eq!(f2.frame_type, FRAME_DATA);
|
||||
|
||||
let f3 = reader.next_frame().await.unwrap().unwrap();
|
||||
assert_eq!(f3.frame_type, FRAME_CLOSE);
|
||||
assert!(f3.payload.is_empty());
|
||||
|
||||
// EOF
|
||||
assert!(reader.next_frame().await.unwrap().is_none());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user