feat(storage): add Bitcask storage migration, binary WAL, and data compaction support

This commit is contained in:
2026-04-04 19:49:47 +00:00
parent 9e7ce25b45
commit d8a8259c73
22 changed files with 2807 additions and 412 deletions

View File

@@ -0,0 +1,452 @@
//! Binary data record format for the Bitcask-style storage engine.
//!
//! # File Version Header (64 bytes, at offset 0 of every .rdb / .hint file)
//!
//! ```text
//! ┌──────────────┬──────────┬──────────┬──────────┬──────────┬───────────────┐
//! │ magic │ version │ file_type│ flags │ created │ reserved │
//! │ 8 bytes │ u16 LE │ u8 │ u32 LE │ u64 LE │ 41 bytes │
//! │ "SMARTDB\0" │ │ │ │ epoch_ms │ (zeros) │
//! └──────────────┴──────────┴──────────┴──────────┴──────────┴───────────────┘
//! ```
//!
//! # Data Record (appended after the header)
//!
//! ```text
//! ┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────────────┐
//! │ magic │ timestamp│ key_len │ val_len │ crc32 │ payload │
//! │ u16 LE │ u64 LE │ u32 LE │ u32 LE │ u32 LE │ [key][value] │
//! │ 0xDB01 │ epoch_ms │ │ 0=delete │ │ │
//! └──────────┴──────────┴──────────┴──────────┴──────────┴──────────────────┘
//! ```
use std::io::{self, Read};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::error::{StorageError, StorageResult};
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/// File-level magic: b"SMARTDB\0"
pub const FILE_MAGIC: &[u8; 8] = b"SMARTDB\0";
/// Current storage format version.
///
/// Bump this whenever the on-disk layout changes; `FileHeader::decode`
/// rejects files written with a newer version than this.
pub const FORMAT_VERSION: u16 = 1;
/// File version header size.
/// 23 bytes of fields (magic + version + file_type + flags + created)
/// followed by 41 reserved zero bytes — see the module docs.
pub const FILE_HEADER_SIZE: usize = 64;
/// Per-record magic.
pub const RECORD_MAGIC: u16 = 0xDB01;
/// Per-record header size (before payload).
/// magic(2) + timestamp(8) + key_len(4) + val_len(4) + crc32(4).
pub const RECORD_HEADER_SIZE: usize = 2 + 8 + 4 + 4 + 4; // 22 bytes
// ---------------------------------------------------------------------------
// File type tag stored in the version header
// ---------------------------------------------------------------------------
/// Kind tag stored in byte 10 of the file version header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum FileType {
    Data = 1,
    Wal = 2,
    Hint = 3,
}

impl FileType {
    /// Parse the on-disk tag byte back into a `FileType`.
    ///
    /// Returns `CorruptRecord` for any byte outside the known tags 1..=3.
    pub fn from_u8(v: u8) -> StorageResult<Self> {
        let parsed = match v {
            1 => Some(FileType::Data),
            2 => Some(FileType::Wal),
            3 => Some(FileType::Hint),
            _ => None,
        };
        parsed.ok_or_else(|| {
            StorageError::CorruptRecord(format!("unknown file type tag: {v}"))
        })
    }
}
// ---------------------------------------------------------------------------
// File Version Header
// ---------------------------------------------------------------------------
/// Decoded form of the 64-byte version header at the start of every file.
#[derive(Debug, Clone)]
pub struct FileHeader {
    /// Storage format version the file was written with.
    pub version: u16,
    /// What kind of file this is (data / WAL / hint).
    pub file_type: FileType,
    /// Feature flags; `new()` currently always writes 0.
    pub flags: u32,
    /// Creation time, milliseconds since UNIX epoch.
    pub created_ms: u64,
}
impl FileHeader {
/// Create a new header for the current format version.
pub fn new(file_type: FileType) -> Self {
Self {
version: FORMAT_VERSION,
file_type,
flags: 0,
created_ms: now_ms(),
}
}
/// Encode the header to a 64-byte buffer.
pub fn encode(&self) -> [u8; FILE_HEADER_SIZE] {
let mut buf = [0u8; FILE_HEADER_SIZE];
buf[0..8].copy_from_slice(FILE_MAGIC);
buf[8..10].copy_from_slice(&self.version.to_le_bytes());
buf[10] = self.file_type as u8;
buf[11..15].copy_from_slice(&self.flags.to_le_bytes());
buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes());
// bytes 23..64 are reserved (zeros)
buf
}
/// Decode a 64-byte header. Validates magic and version.
pub fn decode(buf: &[u8; FILE_HEADER_SIZE]) -> StorageResult<Self> {
if &buf[0..8] != FILE_MAGIC {
return Err(StorageError::CorruptRecord(
"invalid file magic — not a SmartDB file".into(),
));
}
let version = u16::from_le_bytes([buf[8], buf[9]]);
if version > FORMAT_VERSION {
return Err(StorageError::CorruptRecord(format!(
"file format version {version} is newer than supported version {FORMAT_VERSION} — please upgrade"
)));
}
if version == 0 {
return Err(StorageError::CorruptRecord(
"file format version 0 is invalid".into(),
));
}
let file_type = FileType::from_u8(buf[10])?;
let flags = u32::from_le_bytes([buf[11], buf[12], buf[13], buf[14]]);
let created_ms = u64::from_le_bytes([
buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22],
]);
Ok(Self {
version,
file_type,
flags,
created_ms,
})
}
}
// ---------------------------------------------------------------------------
// Data Record
// ---------------------------------------------------------------------------
/// A single data record (live document or tombstone).
#[derive(Debug, Clone)]
pub struct DataRecord {
    /// Write time, milliseconds since UNIX epoch.
    pub timestamp: u64,
    /// Raw key bytes.
    pub key: Vec<u8>,
    /// BSON value bytes. Empty for tombstones.
    pub value: Vec<u8>,
}
impl DataRecord {
    /// Whether this record is a tombstone (delete marker).
    ///
    /// Tombstones are written with `val_len == 0`, so an empty value is the
    /// canonical delete marker.
    pub fn is_tombstone(&self) -> bool {
        self.value.is_empty()
    }

    /// Total size on disk (header + payload).
    pub fn disk_size(&self) -> usize {
        RECORD_HEADER_SIZE + self.key.len() + self.value.len()
    }

    /// Encode to bytes. CRC32 covers magic + timestamp + key_len + val_len + payload
    /// — everything except the crc32 field itself.
    pub fn encode(&self) -> Vec<u8> {
        let key_len = self.key.len() as u32;
        let val_len = self.value.len() as u32;
        let mut buf = Vec::with_capacity(self.disk_size());
        // Header fields, little-endian, with a zero placeholder where the
        // crc32 will go (bytes 18..22).
        buf.extend_from_slice(&RECORD_MAGIC.to_le_bytes()); // 2
        buf.extend_from_slice(&self.timestamp.to_le_bytes()); // 8
        buf.extend_from_slice(&key_len.to_le_bytes()); // 4
        buf.extend_from_slice(&val_len.to_le_bytes()); // 4
        buf.extend_from_slice(&0u32.to_le_bytes()); // 4 (crc placeholder)
        buf.extend_from_slice(&self.key); // key_len
        buf.extend_from_slice(&self.value); // val_len
        // Checksum everything except the crc field: bytes [0..18] + [22..].
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&buf[0..18]);
        hasher.update(&buf[22..]);
        buf[18..22].copy_from_slice(&hasher.finalize().to_le_bytes());
        buf
    }

    /// Decode a record from a reader. Returns the record and its total disk size.
    ///
    /// Returns `Ok(None)` only on a clean EOF — the reader is positioned
    /// exactly at end-of-stream with zero record bytes remaining.
    ///
    /// # Errors
    /// - `StorageError::CorruptRecord` if the record magic is wrong, the
    ///   length fields overflow, or the header/payload is truncated
    ///   (e.g. a torn write at the file tail).
    /// - `StorageError::ChecksumMismatch` if the CRC32 does not verify.
    /// - An I/O error for any other read failure.
    pub fn decode_from<R: Read>(reader: &mut R) -> StorageResult<Option<(Self, usize)>> {
        // Read the first header byte separately so that a clean EOF (zero
        // bytes available) is distinguishable from a truncated header.
        // Previously a single read_exact reported a partially-present header
        // as a clean EOF, silently masking torn writes.
        let mut hdr = [0u8; RECORD_HEADER_SIZE];
        match reader.read_exact(&mut hdr[..1]) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }
        match reader.read_exact(&mut hdr[1..]) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                return Err(StorageError::CorruptRecord(
                    "truncated record header (torn write?)".into(),
                ));
            }
            Err(e) => return Err(e.into()),
        }
        let magic = u16::from_le_bytes([hdr[0], hdr[1]]);
        if magic != RECORD_MAGIC {
            return Err(StorageError::CorruptRecord(format!(
                "invalid record magic: 0x{magic:04X}, expected 0x{RECORD_MAGIC:04X}"
            )));
        }
        let timestamp = u64::from_le_bytes(hdr[2..10].try_into().unwrap());
        let key_len = u32::from_le_bytes(hdr[10..14].try_into().unwrap()) as usize;
        let val_len = u32::from_le_bytes(hdr[14..18].try_into().unwrap()) as usize;
        let stored_crc = u32::from_le_bytes(hdr[18..22].try_into().unwrap());
        // checked_add guards against usize overflow on 32-bit targets when
        // the length fields are corrupt.
        let payload_len = key_len.checked_add(val_len).ok_or_else(|| {
            StorageError::CorruptRecord("record payload length overflows usize".into())
        })?;
        // NOTE(review): key_len/val_len come straight from disk, so a corrupt
        // header can still request a very large allocation before the CRC is
        // checked; consider a configurable sanity cap on payload_len.
        let mut payload = vec![0u8; payload_len];
        reader.read_exact(&mut payload).map_err(|e| {
            // A short payload is corruption (torn write), not a generic I/O
            // failure — report it consistently with the truncated-header case.
            if e.kind() == io::ErrorKind::UnexpectedEof {
                StorageError::CorruptRecord("truncated record payload (torn write?)".into())
            } else {
                StorageError::from(e)
            }
        })?;
        // Verify CRC: covers header bytes [0..18] + payload.
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&hdr[0..18]);
        hasher.update(&payload);
        let computed_crc = hasher.finalize();
        if computed_crc != stored_crc {
            return Err(StorageError::ChecksumMismatch {
                expected: stored_crc,
                actual: computed_crc,
            });
        }
        let key = payload[..key_len].to_vec();
        let value = payload[key_len..].to_vec();
        Ok(Some((
            DataRecord {
                timestamp,
                key,
                value,
            },
            RECORD_HEADER_SIZE + payload_len,
        )))
    }
}
// ---------------------------------------------------------------------------
// Record Scanner — iterate records from a byte slice or reader
// ---------------------------------------------------------------------------
/// Scans records sequentially from a reader, yielding (offset, record) pairs.
///
/// Reading starts at the reader's current position; `base_offset` is the
/// absolute byte offset in the file that position corresponds to (typically
/// `FILE_HEADER_SIZE` for a data file), so yielded offsets are file-absolute.
pub struct RecordScanner<R> {
    reader: R,
    offset: u64,
}

impl<R: Read> RecordScanner<R> {
    /// Build a scanner over `reader`, whose current position corresponds to
    /// absolute file offset `base_offset`.
    pub fn new(reader: R, base_offset: u64) -> Self {
        RecordScanner {
            reader,
            offset: base_offset,
        }
    }
}
impl<R: Read> Iterator for RecordScanner<R> {
    /// Yields `(file_offset, record)` on success. Iteration ends at clean
    /// EOF or after the first error is returned.
    type Item = StorageResult<(u64, DataRecord)>;

    fn next(&mut self) -> Option<Self::Item> {
        let (record, disk_size) = match DataRecord::decode_from(&mut self.reader) {
            Ok(Some(pair)) => pair,
            // Clean end of stream: stop iterating.
            Ok(None) => return None,
            Err(e) => return Some(Err(e)),
        };
        let start = self.offset;
        self.offset += disk_size as u64;
        Some(Ok((start, record)))
    }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/// Current time in milliseconds since UNIX epoch.
///
/// Panics only if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;

    // Header survives an encode/decode round trip unchanged.
    #[test]
    fn file_header_roundtrip() {
        let hdr = FileHeader::new(FileType::Data);
        let buf = hdr.encode();
        assert_eq!(buf.len(), FILE_HEADER_SIZE);
        let decoded = FileHeader::decode(&buf).unwrap();
        assert_eq!(decoded.version, FORMAT_VERSION);
        assert_eq!(decoded.file_type, FileType::Data);
        assert_eq!(decoded.flags, 0);
        assert_eq!(decoded.created_ms, hdr.created_ms);
    }

    // A buffer whose first 8 bytes are not b"SMARTDB\0" must be rejected.
    #[test]
    fn file_header_rejects_bad_magic() {
        let mut buf = [0u8; FILE_HEADER_SIZE];
        buf[0..8].copy_from_slice(b"BADMAGIC");
        assert!(FileHeader::decode(&buf).is_err());
    }

    // Files written with a version newer than FORMAT_VERSION must be rejected.
    #[test]
    fn file_header_rejects_future_version() {
        let mut hdr = FileHeader::new(FileType::Data);
        hdr.version = FORMAT_VERSION + 1;
        let buf = hdr.encode();
        // Manually patch the version in the buffer
        let mut buf2 = buf;
        buf2[8..10].copy_from_slice(&(FORMAT_VERSION + 1).to_le_bytes());
        assert!(FileHeader::decode(&buf2).is_err());
    }

    // Live record (non-empty value) survives an encode/decode round trip.
    #[test]
    fn record_roundtrip_live() {
        let rec = DataRecord {
            timestamp: 1700000000000,
            key: b"abc123".to_vec(),
            // Raw BSON document bytes used as an opaque value payload.
            value: b"\x10\x00\x00\x00\x02hi\x00\x03\x00\x00\x00ok\x00\x00".to_vec(),
        };
        let encoded = rec.encode();
        assert_eq!(encoded.len(), rec.disk_size());
        let mut cursor = std::io::Cursor::new(&encoded);
        let (decoded, size) = DataRecord::decode_from(&mut cursor).unwrap().unwrap();
        assert_eq!(size, encoded.len());
        assert_eq!(decoded.timestamp, rec.timestamp);
        assert_eq!(decoded.key, rec.key);
        assert_eq!(decoded.value, rec.value);
        assert!(!decoded.is_tombstone());
    }

    // Empty value round-trips and is still recognized as a tombstone.
    #[test]
    fn record_roundtrip_tombstone() {
        let rec = DataRecord {
            timestamp: 1700000000000,
            key: b"def456".to_vec(),
            value: vec![],
        };
        assert!(rec.is_tombstone());
        let encoded = rec.encode();
        let mut cursor = std::io::Cursor::new(&encoded);
        let (decoded, _) = DataRecord::decode_from(&mut cursor).unwrap().unwrap();
        assert!(decoded.is_tombstone());
        assert_eq!(decoded.key, b"def456");
    }

    // Payload corruption must surface as a ChecksumMismatch error.
    #[test]
    fn record_detects_corruption() {
        let rec = DataRecord {
            timestamp: 42,
            key: b"key".to_vec(),
            value: b"value".to_vec(),
        };
        let mut encoded = rec.encode();
        // Flip a bit in the payload
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;
        let mut cursor = std::io::Cursor::new(&encoded);
        let result = DataRecord::decode_from(&mut cursor);
        assert!(matches!(result, Err(StorageError::ChecksumMismatch { .. })));
    }

    // A wrong record magic must surface as CorruptRecord (checked before CRC).
    #[test]
    fn record_detects_bad_magic() {
        let rec = DataRecord {
            timestamp: 42,
            key: b"key".to_vec(),
            value: b"value".to_vec(),
        };
        let mut encoded = rec.encode();
        encoded[0] = 0xFF;
        encoded[1] = 0xFF;
        let mut cursor = std::io::Cursor::new(&encoded);
        let result = DataRecord::decode_from(&mut cursor);
        assert!(matches!(result, Err(StorageError::CorruptRecord(_))));
    }

    // A reader with zero bytes remaining is a clean EOF: Ok(None), not an error.
    #[test]
    fn eof_returns_none() {
        let empty: &[u8] = &[];
        let mut cursor = std::io::Cursor::new(empty);
        let result = DataRecord::decode_from(&mut cursor).unwrap();
        assert!(result.is_none());
    }

    // Scanner yields every record in order with correct file-absolute offsets.
    #[test]
    fn scanner_iterates_multiple_records() {
        let records = vec![
            DataRecord {
                timestamp: 1,
                key: b"a".to_vec(),
                value: b"v1".to_vec(),
            },
            DataRecord {
                timestamp: 2,
                key: b"b".to_vec(),
                value: b"v2".to_vec(),
            },
            DataRecord {
                timestamp: 3,
                key: b"c".to_vec(),
                value: vec![],
            },
        ];
        let mut buf = Vec::new();
        for r in &records {
            buf.extend_from_slice(&r.encode());
        }
        let scanner = RecordScanner::new(std::io::Cursor::new(&buf), 0);
        let results: Vec<_> = scanner.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].1.key, b"a");
        assert_eq!(results[1].1.key, b"b");
        assert!(results[2].1.is_tombstone());
        // Verify offsets are correct
        assert_eq!(results[0].0, 0);
        assert_eq!(results[1].0, records[0].disk_size() as u64);
        assert_eq!(
            results[2].0,
            (records[0].disk_size() + records[1].disk_size()) as u64
        );
    }
}