//! Binary Write-Ahead Log for crash recovery. //! //! # Protocol //! //! Every mutation follows this sequence: //! 1. Append WAL record → fsync //! 2. Perform the actual data write //! 3. Append WAL commit marker → fsync //! //! On recovery, uncommitted entries (those without a matching commit marker) //! are replayed or verified. //! //! # Record format //! //! ```text //! ┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬────────────┐ //! │ magic │ seq │ op │ key_len │ val_len │ crc32 │ payload │ //! │ u16 LE │ u64 LE │ u8 │ u32 LE │ u32 LE │ u32 LE │ [key][val] │ //! │ 0xWA01 │ │ │ │ │ │ │ //! └──────────┴──────────┴──────────┴──────────┴──────────┴──────────┴────────────┘ //! ``` //! //! # Commit marker //! //! ```text //! ┌──────────┬──────────┬──────────┐ //! │ magic │ seq │ crc32 │ //! │ u16 LE │ u64 LE │ u32 LE │ //! │ 0xCA01 │ │ │ //! └──────────┴──────────┴──────────┘ //! ``` use std::io::{self, BufReader, Read, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use crate::error::{StorageError, StorageResult}; use crate::record::{FileHeader, FileType, FILE_HEADER_SIZE}; // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- const WAL_RECORD_MAGIC: u16 = 0xAA01; const WAL_COMMIT_MAGIC: u16 = 0xCC01; /// WAL record header: magic(2) + seq(8) + op(1) + key_len(4) + val_len(4) + crc(4) = 23 const WAL_RECORD_HEADER: usize = 23; /// Commit marker size: magic(2) + seq(8) + crc(4) = 14 const WAL_COMMIT_SIZE: usize = 14; // --------------------------------------------------------------------------- // WAL operation type // --------------------------------------------------------------------------- #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum WalOpType { Insert = 1, Update = 2, Delete = 3, } impl WalOpType { fn from_u8(v: u8) -> StorageResult { match v { 1 => Ok(WalOpType::Insert), 2 => Ok(WalOpType::Update), 3 => Ok(WalOpType::Delete), _ => Err(StorageError::WalError(format!("unknown WAL op: {v}"))), } } } // --------------------------------------------------------------------------- // WAL entry (parsed from file) // --------------------------------------------------------------------------- #[derive(Debug, Clone)] pub struct WalEntry { pub seq: u64, pub op: WalOpType, pub key: Vec, pub value: Vec, } // --------------------------------------------------------------------------- // Internal: what we read from the WAL file // --------------------------------------------------------------------------- #[derive(Debug)] enum WalItem { Record(WalEntry), Commit(u64), // seq that was committed } // --------------------------------------------------------------------------- // BinaryWal // --------------------------------------------------------------------------- /// Binary write-ahead log backed by a single file. pub struct BinaryWal { path: PathBuf, next_seq: AtomicU64, } impl BinaryWal { /// Create a new WAL. Does not touch the filesystem until `initialize()`. pub fn new(path: PathBuf) -> Self { Self { path, next_seq: AtomicU64::new(1), } } /// Initialize: create parent dirs, recover sequence counter from existing file. pub fn initialize(&self) -> StorageResult<()> { if let Some(parent) = self.path.parent() { std::fs::create_dir_all(parent)?; } if self.path.exists() { // Scan to find highest seq let items = self.read_all_items()?; let max_seq = items .iter() .map(|item| match item { WalItem::Record(e) => e.seq, WalItem::Commit(s) => *s, }) .max() .unwrap_or(0); self.next_seq.store(max_seq + 1, Ordering::SeqCst); } else { // Create the file with a header let mut f = std::fs::File::create(&self.path)?; let hdr = FileHeader::new(FileType::Wal); f.write_all(&hdr.encode())?; f.flush()?; f.sync_all()?; } Ok(()) } /// Append a WAL record. Returns the sequence number. Fsyncs. pub fn append( &self, op: WalOpType, key: &[u8], value: &[u8], ) -> StorageResult { let seq = self.next_seq.fetch_add(1, Ordering::SeqCst); let key_len = key.len() as u32; let val_len = value.len() as u32; // Build header bytes (without CRC) let mut hdr = Vec::with_capacity(WAL_RECORD_HEADER); hdr.extend_from_slice(&WAL_RECORD_MAGIC.to_le_bytes()); hdr.extend_from_slice(&seq.to_le_bytes()); hdr.push(op as u8); hdr.extend_from_slice(&key_len.to_le_bytes()); hdr.extend_from_slice(&val_len.to_le_bytes()); // CRC placeholder hdr.extend_from_slice(&0u32.to_le_bytes()); // Compute CRC over header (without crc field) + payload let mut hasher = crc32fast::Hasher::new(); hasher.update(&hdr[0..19]); // magic + seq + op + key_len + val_len hasher.update(key); hasher.update(value); let crc = hasher.finalize(); hdr[19..23].copy_from_slice(&crc.to_le_bytes()); // Append to file let mut f = std::fs::OpenOptions::new() .create(true) .append(true) .open(&self.path)?; f.write_all(&hdr)?; f.write_all(key)?; f.write_all(value)?; f.sync_all()?; Ok(seq) } /// Append a commit marker for the given sequence. Fsyncs. pub fn append_commit(&self, seq: u64) -> StorageResult<()> { let mut buf = Vec::with_capacity(WAL_COMMIT_SIZE); buf.extend_from_slice(&WAL_COMMIT_MAGIC.to_le_bytes()); buf.extend_from_slice(&seq.to_le_bytes()); // CRC over magic + seq let mut hasher = crc32fast::Hasher::new(); hasher.update(&buf[0..10]); let crc = hasher.finalize(); buf.extend_from_slice(&crc.to_le_bytes()); let mut f = std::fs::OpenOptions::new() .create(true) .append(true) .open(&self.path)?; f.write_all(&buf)?; f.sync_all()?; Ok(()) } /// Recover: return all WAL entries that were NOT committed. pub fn recover(&self) -> StorageResult> { let items = self.read_all_items()?; // Collect committed seq numbers let committed: std::collections::HashSet = items .iter() .filter_map(|item| { if let WalItem::Commit(s) = item { Some(*s) } else { None } }) .collect(); // Return records without a commit marker let uncommitted: Vec = items .into_iter() .filter_map(|item| { if let WalItem::Record(entry) = item { if !committed.contains(&entry.seq) { return Some(entry); } } None }) .collect(); Ok(uncommitted) } /// Truncate the WAL: rewrite with just the file header (clears all entries). pub fn truncate(&self) -> StorageResult<()> { let mut f = std::fs::File::create(&self.path)?; let hdr = FileHeader::new(FileType::Wal); f.write_all(&hdr.encode())?; f.flush()?; f.sync_all()?; // Don't reset next_seq — it should keep incrementing Ok(()) } /// Path to the WAL file. pub fn path(&self) -> &Path { &self.path } // ----------------------------------------------------------------------- // Internal: read all items from the WAL file // ----------------------------------------------------------------------- fn read_all_items(&self) -> StorageResult> { if !self.path.exists() { return Ok(vec![]); } let file = std::fs::File::open(&self.path)?; let mut reader = BufReader::new(file); // Skip file header (if present) let file_len = std::fs::metadata(&self.path)?.len(); if file_len >= FILE_HEADER_SIZE as u64 { let mut hdr_buf = [0u8; FILE_HEADER_SIZE]; reader.read_exact(&mut hdr_buf)?; // Validate but don't fail hard — allow reading even slightly off headers let _ = FileHeader::decode(&hdr_buf); } let mut items = Vec::new(); loop { // Peek at the magic to determine if this is a record or commit marker let mut magic_buf = [0u8; 2]; match reader.read_exact(&mut magic_buf) { Ok(()) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } let magic = u16::from_le_bytes(magic_buf); match magic { WAL_RECORD_MAGIC => { // Read rest of header: seq(8) + op(1) + key_len(4) + val_len(4) + crc(4) = 21 let mut rest = [0u8; 21]; match reader.read_exact(&mut rest) { Ok(()) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } let seq = u64::from_le_bytes(rest[0..8].try_into().unwrap()); let op = WalOpType::from_u8(rest[8])?; let key_len = u32::from_le_bytes(rest[9..13].try_into().unwrap()) as usize; let val_len = u32::from_le_bytes(rest[13..17].try_into().unwrap()) as usize; let stored_crc = u32::from_le_bytes(rest[17..21].try_into().unwrap()); let mut payload = vec![0u8; key_len + val_len]; match reader.read_exact(&mut payload) { Ok(()) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } // Verify CRC let mut hasher = crc32fast::Hasher::new(); hasher.update(&magic_buf); hasher.update(&rest[0..17]); // seq + op + key_len + val_len hasher.update(&payload); let computed = hasher.finalize(); if computed != stored_crc { // Corrupt WAL entry — skip it (best-effort recovery) tracing::warn!( seq, "skipping corrupt WAL record: CRC mismatch (expected 0x{stored_crc:08X}, got 0x{computed:08X})" ); continue; } let key = payload[..key_len].to_vec(); let value = payload[key_len..].to_vec(); items.push(WalItem::Record(WalEntry { seq, op, key, value, })); } WAL_COMMIT_MAGIC => { // Read rest: seq(8) + crc(4) = 12 let mut rest = [0u8; 12]; match reader.read_exact(&mut rest) { Ok(()) => {} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(e.into()), } let seq = u64::from_le_bytes(rest[0..8].try_into().unwrap()); let stored_crc = u32::from_le_bytes(rest[8..12].try_into().unwrap()); let mut hasher = crc32fast::Hasher::new(); hasher.update(&magic_buf); hasher.update(&rest[0..8]); let computed = hasher.finalize(); if computed != stored_crc { tracing::warn!( seq, "skipping corrupt WAL commit marker: CRC mismatch" ); continue; } items.push(WalItem::Commit(seq)); } _ => { // Unknown magic — file is corrupt past this point tracing::warn!("unknown WAL magic 0x{magic:04X}, stopping scan"); break; } } } Ok(items) } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; fn make_wal(dir: &tempfile::TempDir) -> BinaryWal { let path = dir.path().join("test.wal"); let wal = BinaryWal::new(path); wal.initialize().unwrap(); wal } #[test] fn append_and_commit() { let dir = tempfile::tempdir().unwrap(); let wal = make_wal(&dir); let seq = wal .append(WalOpType::Insert, b"key1", b"value1") .unwrap(); assert_eq!(seq, 1); wal.append_commit(seq).unwrap(); // All committed — recover should return empty let uncommitted = wal.recover().unwrap(); assert!(uncommitted.is_empty()); } #[test] fn uncommitted_entries_recovered() { let dir = tempfile::tempdir().unwrap(); let wal = make_wal(&dir); let s1 = wal .append(WalOpType::Insert, b"k1", b"v1") .unwrap(); wal.append_commit(s1).unwrap(); // s2 is NOT committed let s2 = wal .append(WalOpType::Update, b"k2", b"v2") .unwrap(); let uncommitted = wal.recover().unwrap(); assert_eq!(uncommitted.len(), 1); assert_eq!(uncommitted[0].seq, s2); assert_eq!(uncommitted[0].op, WalOpType::Update); assert_eq!(uncommitted[0].key, b"k2"); assert_eq!(uncommitted[0].value, b"v2"); } #[test] fn truncate_clears_wal() { let dir = tempfile::tempdir().unwrap(); let wal = make_wal(&dir); wal.append(WalOpType::Insert, b"k", b"v").unwrap(); wal.truncate().unwrap(); let uncommitted = wal.recover().unwrap(); assert!(uncommitted.is_empty()); } #[test] fn multiple_operations() { let dir = tempfile::tempdir().unwrap(); let wal = make_wal(&dir); let s1 = wal.append(WalOpType::Insert, b"a", b"1").unwrap(); let s2 = wal.append(WalOpType::Update, b"b", b"2").unwrap(); let s3 = wal.append(WalOpType::Delete, b"c", b"").unwrap(); // Commit only s1 and s3 wal.append_commit(s1).unwrap(); wal.append_commit(s3).unwrap(); let uncommitted = wal.recover().unwrap(); assert_eq!(uncommitted.len(), 1); assert_eq!(uncommitted[0].seq, s2); } #[test] fn sequence_numbers_persist_across_reinit() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("persist.wal"); { let wal = BinaryWal::new(path.clone()); wal.initialize().unwrap(); let s1 = wal.append(WalOpType::Insert, b"k", b"v").unwrap(); assert_eq!(s1, 1); wal.append_commit(s1).unwrap(); } // Re-open — seq should continue from 2+ (since max committed was 1) { let wal = BinaryWal::new(path); wal.initialize().unwrap(); let s2 = wal.append(WalOpType::Insert, b"k2", b"v2").unwrap(); assert!(s2 >= 2, "seq should continue: got {s2}"); } } #[test] fn delete_has_empty_value() { let dir = tempfile::tempdir().unwrap(); let wal = make_wal(&dir); let seq = wal.append(WalOpType::Delete, b"key", b"").unwrap(); let uncommitted = wal.recover().unwrap(); assert_eq!(uncommitted.len(), 1); assert_eq!(uncommitted[0].seq, seq); assert_eq!(uncommitted[0].op, WalOpType::Delete); assert!(uncommitted[0].value.is_empty()); } }