//! Binary data record format for the Bitcask-style storage engine. //! //! # File Version Header (64 bytes, at offset 0 of every .rdb / .hint file) //! //! ```text //! ┌──────────────┬──────────┬──────────┬──────────┬──────────┬───────────────┐ //! │ magic │ version │ file_type│ flags │ created │ reserved │ //! │ 8 bytes │ u16 LE │ u8 │ u32 LE │ u64 LE │ 41 bytes │ //! │ "SMARTDB\0" │ │ │ │ epoch_ms │ (zeros) │ //! └──────────────┴──────────┴──────────┴──────────┴──────────┴───────────────┘ //! ``` //! //! # Data Record (appended after the header) //! //! ```text //! ┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────────────┐ //! │ magic │ timestamp│ key_len │ val_len │ crc32 │ payload │ //! │ u16 LE │ u64 LE │ u32 LE │ u32 LE │ u32 LE │ [key][value] │ //! │ 0xDB01 │ epoch_ms │ │ 0=delete │ │ │ //! └──────────┴──────────┴──────────┴──────────┴──────────┴──────────────────┘ //! ``` use std::io::{self, Read}; use std::time::{SystemTime, UNIX_EPOCH}; use crate::error::{StorageError, StorageResult}; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- /// File-level magic: b"SMARTDB\0" pub const FILE_MAGIC: &[u8; 8] = b"SMARTDB\0"; /// Current storage format version. pub const FORMAT_VERSION: u16 = 1; /// File version header size. pub const FILE_HEADER_SIZE: usize = 64; /// Per-record magic. pub const RECORD_MAGIC: u16 = 0xDB01; /// Per-record header size (before payload). pub const RECORD_HEADER_SIZE: usize = 2 + 8 + 4 + 4 + 4; // 22 bytes // --------------------------------------------------------------------------- // File type tag stored in the version header // --------------------------------------------------------------------------- #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum FileType { Data = 1, Wal = 2, Hint = 3, } impl FileType { pub fn from_u8(v: u8) -> StorageResult { match v { 1 => Ok(FileType::Data), 2 => Ok(FileType::Wal), 3 => Ok(FileType::Hint), _ => Err(StorageError::CorruptRecord(format!( "unknown file type tag: {v}" ))), } } } // --------------------------------------------------------------------------- // File Version Header // --------------------------------------------------------------------------- #[derive(Debug, Clone)] pub struct FileHeader { pub version: u16, pub file_type: FileType, pub flags: u32, pub created_ms: u64, } impl FileHeader { /// Create a new header for the current format version. pub fn new(file_type: FileType) -> Self { Self { version: FORMAT_VERSION, file_type, flags: 0, created_ms: now_ms(), } } /// Encode the header to a 64-byte buffer. pub fn encode(&self) -> [u8; FILE_HEADER_SIZE] { let mut buf = [0u8; FILE_HEADER_SIZE]; buf[0..8].copy_from_slice(FILE_MAGIC); buf[8..10].copy_from_slice(&self.version.to_le_bytes()); buf[10] = self.file_type as u8; buf[11..15].copy_from_slice(&self.flags.to_le_bytes()); buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes()); // bytes 23..64 are reserved (zeros) buf } /// Decode a 64-byte header. Validates magic and version. pub fn decode(buf: &[u8; FILE_HEADER_SIZE]) -> StorageResult { if &buf[0..8] != FILE_MAGIC { return Err(StorageError::CorruptRecord( "invalid file magic — not a SmartDB file".into(), )); } let version = u16::from_le_bytes([buf[8], buf[9]]); if version > FORMAT_VERSION { return Err(StorageError::CorruptRecord(format!( "file format version {version} is newer than supported version {FORMAT_VERSION} — please upgrade" ))); } if version == 0 { return Err(StorageError::CorruptRecord( "file format version 0 is invalid".into(), )); } let file_type = FileType::from_u8(buf[10])?; let flags = u32::from_le_bytes([buf[11], buf[12], buf[13], buf[14]]); let created_ms = u64::from_le_bytes([ buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], ]); Ok(Self { version, file_type, flags, created_ms, }) } } // --------------------------------------------------------------------------- // Data Record // --------------------------------------------------------------------------- /// A single data record (live document or tombstone). #[derive(Debug, Clone)] pub struct DataRecord { pub timestamp: u64, pub key: Vec, /// BSON value bytes. Empty for tombstones. pub value: Vec, } impl DataRecord { /// Whether this record is a tombstone (delete marker). pub fn is_tombstone(&self) -> bool { self.value.is_empty() } /// Total size on disk (header + payload). pub fn disk_size(&self) -> usize { RECORD_HEADER_SIZE + self.key.len() + self.value.len() } /// Encode to bytes. CRC32 covers magic + timestamp + key_len + val_len + payload. pub fn encode(&self) -> Vec { let key_len = self.key.len() as u32; let val_len = self.value.len() as u32; let total = RECORD_HEADER_SIZE + self.key.len() + self.value.len(); let mut buf = Vec::with_capacity(total); // Write fields WITHOUT crc first to compute checksum. buf.extend_from_slice(&RECORD_MAGIC.to_le_bytes()); // 2 buf.extend_from_slice(&self.timestamp.to_le_bytes()); // 8 buf.extend_from_slice(&key_len.to_le_bytes()); // 4 buf.extend_from_slice(&val_len.to_le_bytes()); // 4 // placeholder for crc32 — we'll fill it after computing buf.extend_from_slice(&0u32.to_le_bytes()); // 4 buf.extend_from_slice(&self.key); // key_len buf.extend_from_slice(&self.value); // val_len // CRC covers everything except the crc32 field itself: // bytes [0..18] (magic+ts+key_len+val_len) + bytes [22..] (payload) let mut hasher = crc32fast::Hasher::new(); hasher.update(&buf[0..18]); hasher.update(&buf[22..]); let crc = hasher.finalize(); buf[18..22].copy_from_slice(&crc.to_le_bytes()); buf } /// Decode a record from a reader. Returns the record and its total disk size. /// On EOF at the very start (no bytes to read), returns Ok(None). pub fn decode_from(reader: &mut R) -> StorageResult> { // Read header let mut hdr = [0u8; RECORD_HEADER_SIZE]; match reader.read_exact(&mut hdr) { Ok(()) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), Err(e) => return Err(e.into()), } let magic = u16::from_le_bytes([hdr[0], hdr[1]]); if magic != RECORD_MAGIC { return Err(StorageError::CorruptRecord(format!( "invalid record magic: 0x{magic:04X}, expected 0x{RECORD_MAGIC:04X}" ))); } let timestamp = u64::from_le_bytes(hdr[2..10].try_into().unwrap()); let key_len = u32::from_le_bytes(hdr[10..14].try_into().unwrap()) as usize; let val_len = u32::from_le_bytes(hdr[14..18].try_into().unwrap()) as usize; let stored_crc = u32::from_le_bytes(hdr[18..22].try_into().unwrap()); // Read payload let payload_len = key_len + val_len; let mut payload = vec![0u8; payload_len]; reader.read_exact(&mut payload)?; // Verify CRC: covers header bytes [0..18] + payload let mut hasher = crc32fast::Hasher::new(); hasher.update(&hdr[0..18]); hasher.update(&payload); let computed_crc = hasher.finalize(); if computed_crc != stored_crc { return Err(StorageError::ChecksumMismatch { expected: stored_crc, actual: computed_crc, }); } let key = payload[..key_len].to_vec(); let value = payload[key_len..].to_vec(); let disk_size = RECORD_HEADER_SIZE + payload_len; Ok(Some(( DataRecord { timestamp, key, value, }, disk_size, ))) } } // --------------------------------------------------------------------------- // Record Scanner — iterate records from a byte slice or reader // --------------------------------------------------------------------------- /// Scans records sequentially from a reader, yielding (offset, record) pairs. /// Starts reading from the current reader position. The `base_offset` parameter /// indicates the byte offset in the file where reading begins (typically /// `FILE_HEADER_SIZE` for a data file). pub struct RecordScanner { reader: R, offset: u64, } impl RecordScanner { pub fn new(reader: R, base_offset: u64) -> Self { Self { reader, offset: base_offset, } } } impl Iterator for RecordScanner { /// (file_offset, record) or an error. Iteration stops on EOF or error. type Item = StorageResult<(u64, DataRecord)>; fn next(&mut self) -> Option { match DataRecord::decode_from(&mut self.reader) { Ok(Some((record, disk_size))) => { let offset = self.offset; self.offset += disk_size as u64; Some(Ok((offset, record))) } Ok(None) => None, // clean EOF Err(e) => Some(Err(e)), } } } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- /// Current time in milliseconds since UNIX epoch. pub fn now_ms() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64 } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; #[test] fn file_header_roundtrip() { let hdr = FileHeader::new(FileType::Data); let buf = hdr.encode(); assert_eq!(buf.len(), FILE_HEADER_SIZE); let decoded = FileHeader::decode(&buf).unwrap(); assert_eq!(decoded.version, FORMAT_VERSION); assert_eq!(decoded.file_type, FileType::Data); assert_eq!(decoded.flags, 0); assert_eq!(decoded.created_ms, hdr.created_ms); } #[test] fn file_header_rejects_bad_magic() { let mut buf = [0u8; FILE_HEADER_SIZE]; buf[0..8].copy_from_slice(b"BADMAGIC"); assert!(FileHeader::decode(&buf).is_err()); } #[test] fn file_header_rejects_future_version() { let mut hdr = FileHeader::new(FileType::Data); hdr.version = FORMAT_VERSION + 1; let buf = hdr.encode(); // Manually patch the version in the buffer let mut buf2 = buf; buf2[8..10].copy_from_slice(&(FORMAT_VERSION + 1).to_le_bytes()); assert!(FileHeader::decode(&buf2).is_err()); } #[test] fn record_roundtrip_live() { let rec = DataRecord { timestamp: 1700000000000, key: b"abc123".to_vec(), value: b"\x10\x00\x00\x00\x02hi\x00\x03\x00\x00\x00ok\x00\x00".to_vec(), }; let encoded = rec.encode(); assert_eq!(encoded.len(), rec.disk_size()); let mut cursor = std::io::Cursor::new(&encoded); let (decoded, size) = DataRecord::decode_from(&mut cursor).unwrap().unwrap(); assert_eq!(size, encoded.len()); assert_eq!(decoded.timestamp, rec.timestamp); assert_eq!(decoded.key, rec.key); assert_eq!(decoded.value, rec.value); assert!(!decoded.is_tombstone()); } #[test] fn record_roundtrip_tombstone() { let rec = DataRecord { timestamp: 1700000000000, key: b"def456".to_vec(), value: vec![], }; assert!(rec.is_tombstone()); let encoded = rec.encode(); let mut cursor = std::io::Cursor::new(&encoded); let (decoded, _) = DataRecord::decode_from(&mut cursor).unwrap().unwrap(); assert!(decoded.is_tombstone()); assert_eq!(decoded.key, b"def456"); } #[test] fn record_detects_corruption() { let rec = DataRecord { timestamp: 42, key: b"key".to_vec(), value: b"value".to_vec(), }; let mut encoded = rec.encode(); // Flip a bit in the payload let last = encoded.len() - 1; encoded[last] ^= 0xFF; let mut cursor = std::io::Cursor::new(&encoded); let result = DataRecord::decode_from(&mut cursor); assert!(matches!(result, Err(StorageError::ChecksumMismatch { .. }))); } #[test] fn record_detects_bad_magic() { let rec = DataRecord { timestamp: 42, key: b"key".to_vec(), value: b"value".to_vec(), }; let mut encoded = rec.encode(); encoded[0] = 0xFF; encoded[1] = 0xFF; let mut cursor = std::io::Cursor::new(&encoded); let result = DataRecord::decode_from(&mut cursor); assert!(matches!(result, Err(StorageError::CorruptRecord(_)))); } #[test] fn eof_returns_none() { let empty: &[u8] = &[]; let mut cursor = std::io::Cursor::new(empty); let result = DataRecord::decode_from(&mut cursor).unwrap(); assert!(result.is_none()); } #[test] fn scanner_iterates_multiple_records() { let records = vec![ DataRecord { timestamp: 1, key: b"a".to_vec(), value: b"v1".to_vec(), }, DataRecord { timestamp: 2, key: b"b".to_vec(), value: b"v2".to_vec(), }, DataRecord { timestamp: 3, key: b"c".to_vec(), value: vec![], }, ]; let mut buf = Vec::new(); for r in &records { buf.extend_from_slice(&r.encode()); } let scanner = RecordScanner::new(std::io::Cursor::new(&buf), 0); let results: Vec<_> = scanner.collect::, _>>().unwrap(); assert_eq!(results.len(), 3); assert_eq!(results[0].1.key, b"a"); assert_eq!(results[1].1.key, b"b"); assert!(results[2].1.is_tombstone()); // Verify offsets are correct assert_eq!(results[0].0, 0); assert_eq!(results[1].0, records[0].disk_size() as u64); assert_eq!( results[2].0, (records[0].disk_size() + records[1].disk_size()) as u64 ); } }