// smartdb/rust/crates/rustdb-storage/src/keydir.rs (563 lines, 19 KiB, Rust)
//! KeyDir — in-memory document location index for the Bitcask storage engine.
//!
//! Maps document `_id` (hex string) to its location in the append-only data file.
//! Backed by `DashMap` for lock-free concurrent reads and fine-grained write locking.
//!
//! The KeyDir can be rebuilt from a data file scan, or loaded quickly from a
//! persisted hint file for fast restart.
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use dashmap::DashMap;
use crate::error::{StorageError, StorageResult};
use crate::record::{
DataRecord, FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE, FORMAT_VERSION,
};
// ---------------------------------------------------------------------------
// KeyDirEntry
// ---------------------------------------------------------------------------
/// Location of a single document in the data file.
///
/// A small plain-data value: `Copy` so lookups can hand out an owned copy
/// without holding any map lock. Derives `PartialEq`/`Eq`/`Hash` so entries
/// can be compared and used as set/map members in tests and callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct KeyDirEntry {
    /// Byte offset of the record in `data.rdb`.
    pub offset: u64,
    /// Total record size on disk (header + payload).
    pub record_len: u32,
    /// BSON value length. 0 means tombstone (used during compaction accounting).
    pub value_len: u32,
    /// Timestamp (epoch ms) from the record. Used for conflict detection.
    pub timestamp: u64,
}
// ---------------------------------------------------------------------------
// BuildStats — statistics from building KeyDir from a data file scan
// ---------------------------------------------------------------------------
/// Statistics collected while building a KeyDir from a data file scan.
///
/// Invariant (for a complete scan):
/// `total_records_scanned = live_documents + tombstones + superseded_records
///  + records removed by a later tombstone`.
/// Derives `PartialEq`/`Eq` so scans can be compared directly in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BuildStats {
    /// Total records scanned (live + tombstones + superseded).
    pub total_records_scanned: u64,
    /// Number of live documents in the final KeyDir.
    pub live_documents: u64,
    /// Number of tombstone records encountered.
    pub tombstones: u64,
    /// Number of records superseded by a later write for the same key.
    pub superseded_records: u64,
}
// ---------------------------------------------------------------------------
// KeyDir
// ---------------------------------------------------------------------------
/// In-memory index mapping document ID → data file location.
///
/// Reads are lock-free; writes lock only the affected `DashMap` shard.
pub struct KeyDir {
    /// Document `_id` (hex string) → location of its most recent record.
    map: DashMap<String, KeyDirEntry>,
    /// Running count of live documents.
    /// Maintained by `insert`/`remove`/`clear` so `len()` is O(1) rather
    /// than a scan over the map's shards.
    doc_count: AtomicU64,
}
impl KeyDir {
/// Create an empty KeyDir.
pub fn new() -> Self {
Self {
map: DashMap::new(),
doc_count: AtomicU64::new(0),
}
}
/// Insert or update an entry. Returns the previous entry if one existed.
pub fn insert(&self, key: String, entry: KeyDirEntry) -> Option<KeyDirEntry> {
    match self.map.insert(key, entry) {
        Some(previous) => Some(previous),
        None => {
            // First sighting of this key — bump the live-document counter.
            self.doc_count.fetch_add(1, Ordering::Relaxed);
            None
        }
    }
}
/// Look up an entry by key.
///
/// Returns a copy of the entry (`KeyDirEntry` is `Copy`), so no shard
/// guard is held once this returns.
pub fn get(&self, key: &str) -> Option<KeyDirEntry> {
    let guard = self.map.get(key)?;
    Some(*guard.value())
}
/// Remove an entry. Returns the removed entry if it existed.
pub fn remove(&self, key: &str) -> Option<KeyDirEntry> {
    match self.map.remove(key) {
        Some((_key, entry)) => {
            // A live document went away — keep the counter in sync.
            self.doc_count.fetch_sub(1, Ordering::Relaxed);
            Some(entry)
        }
        None => None,
    }
}
/// Number of live documents.
///
/// O(1): reads the maintained counter instead of scanning map shards.
/// Relaxed ordering — the value is advisory under concurrent mutation.
pub fn len(&self) -> u64 {
    self.doc_count.load(Ordering::Relaxed)
}
/// Whether the index is empty (no live documents).
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
/// Check if a key exists, without copying the entry out.
pub fn contains(&self, key: &str) -> bool {
    self.map.contains_key(key)
}
/// Iterate over all entries. The closure receives (key, entry).
///
/// Iteration order is unspecified (hash-map order).
pub fn for_each(&self, mut f: impl FnMut(&str, &KeyDirEntry)) {
    self.map
        .iter()
        .for_each(|item| f(item.key(), item.value()));
}
/// Collect all keys into a freshly allocated vector (unspecified order).
pub fn keys(&self) -> Vec<String> {
    let mut out = Vec::with_capacity(self.map.len());
    for item in self.map.iter() {
        out.push(item.key().clone());
    }
    out
}
/// Clear all entries.
///
/// NOTE(review): the map clear and the counter reset are two separate
/// steps, so a writer racing with `clear` can momentarily observe a
/// counter that disagrees with the map. Looks intended for use during
/// exclusive phases (startup/compaction) — confirm before calling it
/// concurrently with writers.
pub fn clear(&self) {
    self.map.clear();
    self.doc_count.store(0, Ordering::Relaxed);
}
// -----------------------------------------------------------------------
// Build from data file
// -----------------------------------------------------------------------
/// Rebuild the KeyDir by scanning an entire data file.
/// The file must start with a valid `FileHeader`.
/// Returns `(keydir, dead_bytes, stats)` where `dead_bytes` is the total size of
/// stale records (superseded by later writes or tombstoned).
pub fn build_from_data_file(path: &Path) -> StorageResult<(Self, u64, BuildStats)> {
    let mut reader = BufReader::new(std::fs::File::open(path)?);

    // The first FILE_HEADER_SIZE bytes must decode to a *data* file header.
    let mut header_bytes = [0u8; FILE_HEADER_SIZE];
    reader.read_exact(&mut header_bytes)?;
    let header = FileHeader::decode(&header_bytes)?;
    if header.file_type != FileType::Data {
        return Err(StorageError::CorruptRecord(format!(
            "expected data file (type 1), got type {:?}",
            header.file_type
        )));
    }

    let index = KeyDir::new();
    let mut stats = BuildStats::default();
    let mut dead_bytes: u64 = 0;

    // Walk every record after the header; the newest record per key wins.
    for item in RecordScanner::new(reader, FILE_HEADER_SIZE as u64) {
        let (offset, record) = item?;
        let tombstone = record.is_tombstone();
        let disk_size = record.disk_size() as u32;
        let value_len = record.value.len() as u32;
        let timestamp = record.timestamp;
        let key = String::from_utf8(record.key)
            .map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key: {e}")))?;

        stats.total_records_scanned += 1;

        if tombstone {
            stats.tombstones += 1;
            // The tombstone kills any live version of the key; the tombstone
            // record itself is also dead weight on disk.
            if let Some(prev) = index.remove(&key) {
                dead_bytes += prev.record_len as u64;
            }
            dead_bytes += disk_size as u64;
        } else {
            let entry = KeyDirEntry {
                offset,
                record_len: disk_size,
                value_len,
                timestamp,
            };
            if let Some(prev) = index.insert(key, entry) {
                // An earlier write of the same key is now stale.
                dead_bytes += prev.record_len as u64;
                stats.superseded_records += 1;
            }
        }
    }

    stats.live_documents = index.len();
    Ok((index, dead_bytes, stats))
}
// -----------------------------------------------------------------------
// Hint file persistence (for fast startup)
// -----------------------------------------------------------------------
/// Persist the KeyDir to a hint file for fast restart.
///
/// Hint file format (after the 64-byte file header):
/// For each entry: [key_len:u32 LE][key bytes][offset:u64 LE][record_len:u32 LE][value_len:u32 LE][timestamp:u64 LE]
///
/// The hint is written to a temporary sibling file (`<path>.tmp`), synced,
/// and then atomically renamed into place. A crash mid-write therefore can
/// never leave a truncated hint at `path` — readers see either the old
/// complete hint or the new one.
///
/// # Errors
/// Propagates any I/O error from creating, writing, syncing, or renaming.
pub fn persist_to_hint_file(&self, path: &Path) -> StorageResult<()> {
    // Stage the write next to the final path so the rename stays on the
    // same filesystem (a cross-device rename would fail).
    let tmp_path = {
        let mut os = path.as_os_str().to_owned();
        os.push(".tmp");
        std::path::PathBuf::from(os)
    };

    let file = std::fs::File::create(&tmp_path)?;
    let mut writer = BufWriter::new(file);

    // File header identifies this as a hint file.
    let hdr = FileHeader::new(FileType::Hint);
    writer.write_all(&hdr.encode())?;

    // One fixed-layout entry per live document.
    for item in self.map.iter() {
        let key_bytes = item.key().as_bytes();
        writer.write_all(&(key_bytes.len() as u32).to_le_bytes())?;
        writer.write_all(key_bytes)?;
        let entry = item.value();
        writer.write_all(&entry.offset.to_le_bytes())?;
        writer.write_all(&entry.record_len.to_le_bytes())?;
        writer.write_all(&entry.value_len.to_le_bytes())?;
        writer.write_all(&entry.timestamp.to_le_bytes())?;
    }

    writer.flush()?;
    // Durability before visibility: the bytes must reach disk before the
    // new hint appears under the final name.
    writer.get_ref().sync_all()?;
    std::fs::rename(&tmp_path, path)?;
    Ok(())
}
/// Load a KeyDir from a hint file. Returns `Ok(None)` if the file doesn't
/// exist, or exists but ends before a complete header (empty file).
///
/// # Errors
/// * `StorageError::CorruptRecord` — wrong file type, unsupported format
///   version, non-UTF-8 key, or an implausibly large key length.
/// * I/O errors (including a file truncated mid-entry) are propagated.
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<Self>> {
    // Keys are document `_id` hex strings, so anything near this bound
    // means the length field is garbage. Rejecting early prevents a
    // corrupt (or malicious) file from driving a multi-gigabyte
    // allocation via `vec![0u8; key_len]` below.
    const MAX_KEY_LEN: usize = 1 << 20; // 1 MiB

    if !path.exists() {
        return Ok(None);
    }
    let file = std::fs::File::open(path)?;
    let mut reader = BufReader::new(file);

    // Read and validate header; a short/empty file is treated as "no hint".
    let mut hdr_buf = [0u8; FILE_HEADER_SIZE];
    match reader.read_exact(&mut hdr_buf) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e.into()),
    }
    let hdr = FileHeader::decode(&hdr_buf)?;
    if hdr.file_type != FileType::Hint {
        return Err(StorageError::CorruptRecord(format!(
            "expected hint file (type 3), got type {:?}",
            hdr.file_type
        )));
    }
    if hdr.version > FORMAT_VERSION {
        return Err(StorageError::CorruptRecord(format!(
            "hint file version {} is newer than supported {}",
            hdr.version, FORMAT_VERSION
        )));
    }

    let keydir = KeyDir::new();
    loop {
        // key_len prefix; a clean EOF here means every entry was consumed.
        let mut key_len_buf = [0u8; 4];
        match reader.read_exact(&mut key_len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        }
        let key_len = u32::from_le_bytes(key_len_buf) as usize;
        if key_len > MAX_KEY_LEN {
            return Err(StorageError::CorruptRecord(format!(
                "hint entry key length {key_len} exceeds maximum {MAX_KEY_LEN}"
            )));
        }
        // Key bytes (must be valid UTF-8).
        let mut key_buf = vec![0u8; key_len];
        reader.read_exact(&mut key_buf)?;
        let key = String::from_utf8(key_buf)
            .map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key: {e}")))?;
        // Fixed 24-byte tail: offset(8) + record_len(4) + value_len(4) + timestamp(8).
        let mut fields = [0u8; 8 + 4 + 4 + 8];
        reader.read_exact(&mut fields)?;
        let offset = u64::from_le_bytes(fields[0..8].try_into().unwrap());
        let record_len = u32::from_le_bytes(fields[8..12].try_into().unwrap());
        let value_len = u32::from_le_bytes(fields[12..16].try_into().unwrap());
        let timestamp = u64::from_le_bytes(fields[16..24].try_into().unwrap());
        keydir.insert(
            key,
            KeyDirEntry {
                offset,
                record_len,
                value_len,
                timestamp,
            },
        );
    }
    Ok(Some(keydir))
}
// -----------------------------------------------------------------------
// Hint file validation
// -----------------------------------------------------------------------
/// Validate this KeyDir (loaded from a hint file) against the actual data file.
/// Returns `Ok(true)` if the hint appears consistent, `Ok(false)` if a rebuild
/// from the data file is recommended.
///
/// Checks:
/// 1. All entry offsets + record_len fit within the data file size.
/// 2. All entry offsets are >= FILE_HEADER_SIZE.
/// 3. Up to `sample_size` entries are spot-checked by reading the record at
///    the offset and verifying the key matches. With `sample_size == 0` the
///    spot-check is skipped and only the bounds check applies.
pub fn validate_against_data_file(&self, data_path: &Path, sample_size: usize) -> StorageResult<bool> {
    let file_size = std::fs::metadata(data_path)
        .map(|m| m.len())
        .unwrap_or(0);
    if file_size < FILE_HEADER_SIZE as u64 {
        // Data file can't even hold a header; the hint is only consistent
        // with it when the hint is empty as well.
        return Ok(self.is_empty());
    }
    // Pass 1: every entry must lie fully inside the data region.
    let mut all_keys: Vec<(String, KeyDirEntry)> = Vec::with_capacity(self.len() as usize);
    let mut bounds_ok = true;
    self.for_each(|key, entry| {
        // checked_add: a garbage offset near u64::MAX must fail the bounds
        // check rather than wrap around silently in release builds.
        let end = entry.offset.checked_add(entry.record_len as u64);
        if entry.offset < FILE_HEADER_SIZE as u64 || end.map_or(true, |end| end > file_size) {
            bounds_ok = false;
        }
        all_keys.push((key.to_string(), *entry));
    });
    if !bounds_ok {
        return Ok(false);
    }
    // Pass 2: spot-check a sample of entries by reading records from data.rdb.
    if all_keys.is_empty() || sample_size == 0 {
        // Nothing to sample. The `sample_size == 0` guard also prevents a
        // divide-by-zero in the step computation below.
        return Ok(true);
    }
    // Sort by offset for sequential I/O, then probe every `step`-th entry
    // (step >= 1, so small indexes are checked entry by entry).
    all_keys.sort_by_key(|(_, e)| e.offset);
    let step = (all_keys.len() / sample_size).max(1);
    let mut file = std::fs::File::open(data_path)?;
    let mut checked = 0usize;
    for (i, (expected_key, entry)) in all_keys.iter().enumerate() {
        if checked >= sample_size {
            break;
        }
        if i % step != 0 {
            continue;
        }
        // Seek to the entry's offset and try to decode the record there.
        file.seek(SeekFrom::Start(entry.offset))?;
        match DataRecord::decode_from(&mut file) {
            Ok(Some((record, _disk_size))) => {
                let record_key = String::from_utf8_lossy(&record.key);
                if record_key != *expected_key {
                    return Ok(false);
                }
            }
            // Unreadable or absent record at a hinted offset → rebuild.
            Ok(None) | Err(_) => {
                return Ok(false);
            }
        }
        checked += 1;
    }
    Ok(true)
}
}
impl Default for KeyDir {
    /// Equivalent to [`KeyDir::new`]: an empty index.
    fn default() -> Self {
        Self::new()
    }
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record::DataRecord;
    use std::io::Write;

    /// Insert → get → remove round-trip, verifying the live-document
    /// counter tracks each mutation.
    #[test]
    fn basic_insert_get_remove() {
        let kd = KeyDir::new();
        assert!(kd.is_empty());
        let entry = KeyDirEntry {
            offset: 100,
            record_len: 50,
            value_len: 30,
            timestamp: 1700000000000,
        };
        assert!(kd.insert("abc".into(), entry).is_none());
        assert_eq!(kd.len(), 1);
        assert!(kd.contains("abc"));
        let got = kd.get("abc").unwrap();
        assert_eq!(got.offset, 100);
        assert_eq!(got.value_len, 30);
        let removed = kd.remove("abc").unwrap();
        assert_eq!(removed.offset, 100);
        assert_eq!(kd.len(), 0);
        assert!(!kd.contains("abc"));
    }

    /// Re-inserting an existing key returns the prior entry and must not
    /// inflate the live-document count.
    #[test]
    fn insert_overwrites_returns_previous() {
        let kd = KeyDir::new();
        let e1 = KeyDirEntry {
            offset: 100,
            record_len: 50,
            value_len: 30,
            timestamp: 1,
        };
        let e2 = KeyDirEntry {
            offset: 200,
            record_len: 60,
            value_len: 40,
            timestamp: 2,
        };
        kd.insert("k".into(), e1);
        assert_eq!(kd.len(), 1);
        let prev = kd.insert("k".into(), e2).unwrap();
        assert_eq!(prev.offset, 100);
        // Count stays at 1 (overwrite, not new)
        assert_eq!(kd.len(), 1);
        assert_eq!(kd.get("k").unwrap().offset, 200);
    }

    /// Full rebuild scan: a tombstone must both drop the key from the index
    /// and contribute to `dead_bytes`.
    #[test]
    fn build_from_data_file() {
        let dir = tempfile::tempdir().unwrap();
        let data_path = dir.path().join("data.rdb");
        // Write a data file with 3 records: insert A, insert B, delete A
        {
            let mut f = std::fs::File::create(&data_path).unwrap();
            let hdr = FileHeader::new(FileType::Data);
            f.write_all(&hdr.encode()).unwrap();
            let r1 = DataRecord {
                timestamp: 1,
                key: b"aaa".to_vec(),
                value: b"val_a".to_vec(),
            };
            let r2 = DataRecord {
                timestamp: 2,
                key: b"bbb".to_vec(),
                value: b"val_b".to_vec(),
            };
            let r3 = DataRecord {
                timestamp: 3,
                key: b"aaa".to_vec(),
                value: vec![], // tombstone
            };
            f.write_all(&r1.encode()).unwrap();
            f.write_all(&r2.encode()).unwrap();
            f.write_all(&r3.encode()).unwrap();
        }
        let (kd, dead_bytes, stats) = KeyDir::build_from_data_file(&data_path).unwrap();
        // Only B should be live
        assert_eq!(kd.len(), 1);
        assert!(kd.contains("bbb"));
        assert!(!kd.contains("aaa"));
        // Dead bytes: r1 (aaa live, then superseded by tombstone) + r3 (tombstone itself)
        assert!(dead_bytes > 0);
        // Stats
        assert_eq!(stats.total_records_scanned, 3);
        assert_eq!(stats.live_documents, 1);
        assert_eq!(stats.tombstones, 1);
        assert_eq!(stats.superseded_records, 0); // aaa was removed by tombstone, not superseded
    }

    /// Persist → load round-trip: every field of every entry must survive
    /// the hint-file encoding unchanged.
    #[test]
    fn hint_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let hint_path = dir.path().join("keydir.hint");
        let kd = KeyDir::new();
        kd.insert(
            "doc1".into(),
            KeyDirEntry {
                offset: 64,
                record_len: 100,
                value_len: 80,
                timestamp: 1000,
            },
        );
        kd.insert(
            "doc2".into(),
            KeyDirEntry {
                offset: 164,
                record_len: 200,
                value_len: 150,
                timestamp: 2000,
            },
        );
        kd.persist_to_hint_file(&hint_path).unwrap();
        let loaded = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
        assert_eq!(loaded.len(), 2);
        let e1 = loaded.get("doc1").unwrap();
        assert_eq!(e1.offset, 64);
        assert_eq!(e1.record_len, 100);
        assert_eq!(e1.value_len, 80);
        assert_eq!(e1.timestamp, 1000);
        let e2 = loaded.get("doc2").unwrap();
        assert_eq!(e2.offset, 164);
        assert_eq!(e2.timestamp, 2000);
    }

    /// A missing hint file is not an error — it yields `Ok(None)`.
    #[test]
    fn hint_file_nonexistent_returns_none() {
        let result = KeyDir::load_from_hint_file(Path::new("/tmp/nonexistent_hint_file.hint"));
        assert!(result.unwrap().is_none());
    }

    /// `for_each` and `keys` must both observe every inserted entry
    /// (iteration order is unspecified, hence the sorts).
    #[test]
    fn for_each_and_keys() {
        let kd = KeyDir::new();
        let e = KeyDirEntry {
            offset: 0,
            record_len: 10,
            value_len: 5,
            timestamp: 1,
        };
        kd.insert("x".into(), e);
        kd.insert("y".into(), e);
        let mut collected = Vec::new();
        kd.for_each(|k, _| collected.push(k.to_string()));
        collected.sort();
        assert_eq!(collected, vec!["x", "y"]);
        let mut keys = kd.keys();
        keys.sort();
        assert_eq!(keys, vec!["x", "y"]);
    }
}