fix(rustdb-storage): detect stale hint files using data file size metadata and add restart persistence regression tests
This commit is contained in:
@@ -178,7 +178,8 @@ impl CollectionState {
|
||||
tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
|
||||
} else {
|
||||
// Persist hint file after successful compaction to prevent stale hints
|
||||
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path()) {
|
||||
let current_size = self.data_file_size.load(Ordering::Relaxed);
|
||||
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path(), current_size) {
|
||||
tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir);
|
||||
}
|
||||
}
|
||||
@@ -257,26 +258,47 @@ impl FileStorageAdapter {
|
||||
// Try loading from hint file first, fall back to data file scan
|
||||
let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() {
|
||||
match KeyDir::load_from_hint_file(&hint_path) {
|
||||
Ok(Some(kd)) => {
|
||||
// Validate hint against actual data file
|
||||
let hint_valid = kd.validate_against_data_file(&data_path, 16)
|
||||
.unwrap_or(false);
|
||||
if hint_valid {
|
||||
debug!("loaded KeyDir from hint file: {:?}", hint_path);
|
||||
let file_size = std::fs::metadata(&data_path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(FILE_HEADER_SIZE as u64);
|
||||
let live_bytes: u64 = {
|
||||
let mut total = 0u64;
|
||||
kd.for_each(|_, e| total += e.record_len as u64);
|
||||
total
|
||||
};
|
||||
let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
|
||||
(kd, dead, true)
|
||||
} else {
|
||||
tracing::warn!("hint file {:?} is stale, rebuilding from data file", hint_path);
|
||||
Ok(Some((kd, stored_size))) => {
|
||||
let actual_size = std::fs::metadata(&data_path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0);
|
||||
|
||||
// Check if data.rdb changed since the hint was written.
|
||||
// If stored_size is 0, this is an old-format hint without size tracking.
|
||||
let size_matches = stored_size > 0 && stored_size == actual_size;
|
||||
|
||||
if !size_matches {
|
||||
// data.rdb size differs from hint snapshot — records were appended
|
||||
// (inserts, tombstones) after the hint was written. Full scan required
|
||||
// to pick up tombstones that would otherwise be invisible.
|
||||
if stored_size == 0 {
|
||||
debug!("hint file {:?} has no size tracking, rebuilding from data file", hint_path);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"hint file {:?} is stale: data size changed ({} -> {}), rebuilding",
|
||||
hint_path, stored_size, actual_size
|
||||
);
|
||||
}
|
||||
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
|
||||
(kd, dead, false)
|
||||
} else {
|
||||
// Size matches — validate entry integrity with spot-checks
|
||||
let hint_valid = kd.validate_against_data_file(&data_path, 16)
|
||||
.unwrap_or(false);
|
||||
if hint_valid {
|
||||
debug!("loaded KeyDir from hint file: {:?}", hint_path);
|
||||
let live_bytes: u64 = {
|
||||
let mut total = 0u64;
|
||||
kd.for_each(|_, e| total += e.record_len as u64);
|
||||
total
|
||||
};
|
||||
let dead = actual_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
|
||||
(kd, dead, true)
|
||||
} else {
|
||||
tracing::warn!("hint file {:?} failed validation, rebuilding from data file", hint_path);
|
||||
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
|
||||
(kd, dead, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
@@ -510,10 +532,11 @@ impl StorageAdapter for FileStorageAdapter {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
// Persist all KeyDir hint files
|
||||
// Persist all KeyDir hint files with current data file sizes
|
||||
for entry in self.collections.iter() {
|
||||
let state = entry.value();
|
||||
let _ = state.keydir.persist_to_hint_file(&state.hint_path());
|
||||
let current_size = state.data_file_size.load(Ordering::Relaxed);
|
||||
let _ = state.keydir.persist_to_hint_file(&state.hint_path(), current_size);
|
||||
}
|
||||
debug!("FileStorageAdapter closed");
|
||||
Ok(())
|
||||
|
||||
@@ -198,14 +198,17 @@ impl KeyDir {
|
||||
|
||||
/// Persist the KeyDir to a hint file for fast restart.
|
||||
///
|
||||
/// `data_file_size` is the current size of data.rdb — stored in the hint header
|
||||
/// so that on next load we can detect if data.rdb changed (stale hint).
|
||||
///
|
||||
/// Hint file format (after the 64-byte file header):
|
||||
/// For each entry: [key_len:u32 LE][key bytes][offset:u64 LE][record_len:u32 LE][value_len:u32 LE][timestamp:u64 LE]
|
||||
pub fn persist_to_hint_file(&self, path: &Path) -> StorageResult<()> {
|
||||
pub fn persist_to_hint_file(&self, path: &Path, data_file_size: u64) -> StorageResult<()> {
|
||||
let file = std::fs::File::create(path)?;
|
||||
let mut writer = BufWriter::new(file);
|
||||
|
||||
// Write file header
|
||||
let hdr = FileHeader::new(FileType::Hint);
|
||||
// Write file header with data_file_size for staleness detection
|
||||
let hdr = FileHeader::new_hint(data_file_size);
|
||||
writer.write_all(&hdr.encode())?;
|
||||
|
||||
// Write entries
|
||||
@@ -225,7 +228,9 @@ impl KeyDir {
|
||||
}
|
||||
|
||||
/// Load a KeyDir from a hint file. Returns None if the file doesn't exist.
|
||||
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<Self>> {
|
||||
/// Returns `(keydir, stored_data_file_size)` where `stored_data_file_size` is the
|
||||
/// data.rdb size recorded when the hint was written (0 = old format, unknown).
|
||||
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<(Self, u64)>> {
|
||||
if !path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -254,6 +259,7 @@ impl KeyDir {
|
||||
)));
|
||||
}
|
||||
|
||||
let stored_data_file_size = hdr.data_file_size;
|
||||
let keydir = KeyDir::new();
|
||||
|
||||
loop {
|
||||
@@ -292,7 +298,7 @@ impl KeyDir {
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Some(keydir))
|
||||
Ok(Some((keydir, stored_data_file_size)))
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -517,9 +523,10 @@ mod tests {
|
||||
},
|
||||
);
|
||||
|
||||
kd.persist_to_hint_file(&hint_path).unwrap();
|
||||
let loaded = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
|
||||
kd.persist_to_hint_file(&hint_path, 12345).unwrap();
|
||||
let (loaded, stored_size) = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
|
||||
|
||||
assert_eq!(stored_size, 12345);
|
||||
assert_eq!(loaded.len(), 2);
|
||||
let e1 = loaded.get("doc1").unwrap();
|
||||
assert_eq!(e1.offset, 64);
|
||||
|
||||
@@ -79,6 +79,9 @@ pub struct FileHeader {
|
||||
pub file_type: FileType,
|
||||
pub flags: u32,
|
||||
pub created_ms: u64,
|
||||
/// For hint files: the data.rdb file size at the time the hint was written.
|
||||
/// Used to detect stale hints after ungraceful shutdown. 0 = unknown (old format).
|
||||
pub data_file_size: u64,
|
||||
}
|
||||
|
||||
impl FileHeader {
|
||||
@@ -89,6 +92,18 @@ impl FileHeader {
|
||||
file_type,
|
||||
flags: 0,
|
||||
created_ms: now_ms(),
|
||||
data_file_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new hint header that records the data file size.
|
||||
pub fn new_hint(data_file_size: u64) -> Self {
|
||||
Self {
|
||||
version: FORMAT_VERSION,
|
||||
file_type: FileType::Hint,
|
||||
flags: 0,
|
||||
created_ms: now_ms(),
|
||||
data_file_size,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +115,8 @@ impl FileHeader {
|
||||
buf[10] = self.file_type as u8;
|
||||
buf[11..15].copy_from_slice(&self.flags.to_le_bytes());
|
||||
buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes());
|
||||
// bytes 23..64 are reserved (zeros)
|
||||
buf[23..31].copy_from_slice(&self.data_file_size.to_le_bytes());
|
||||
// bytes 31..64 are reserved (zeros)
|
||||
buf
|
||||
}
|
||||
|
||||
@@ -127,11 +143,15 @@ impl FileHeader {
|
||||
let created_ms = u64::from_le_bytes([
|
||||
buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22],
|
||||
]);
|
||||
let data_file_size = u64::from_le_bytes([
|
||||
buf[23], buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30],
|
||||
]);
|
||||
Ok(Self {
|
||||
version,
|
||||
file_type,
|
||||
flags,
|
||||
created_ms,
|
||||
data_file_size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -295,7 +295,13 @@ fn validate_collection(db: &str, coll: &str, coll_dir: &Path) -> CollectionRepor
|
||||
// Validate hint file if present
|
||||
if hint_path.exists() {
|
||||
match KeyDir::load_from_hint_file(&hint_path) {
|
||||
Ok(Some(hint_kd)) => {
|
||||
Ok(Some((hint_kd, stored_size))) => {
|
||||
if stored_size > 0 && stored_size != report.data_file_size {
|
||||
report.errors.push(format!(
|
||||
"hint file is stale: recorded data size {} but actual is {}",
|
||||
stored_size, report.data_file_size
|
||||
));
|
||||
}
|
||||
// Check for orphaned entries: keys in hint but not live in data
|
||||
hint_kd.for_each(|key, _entry| {
|
||||
if !live_ids.contains(key) {
|
||||
|
||||
Reference in New Issue
Block a user