//! Bitcask-style file storage adapter.
//!
//! Each collection is stored as a directory:
//! ```text
//! {base_path}/{db}/{coll}/
//!   data.rdb     — append-only binary record log
//!   wal.rdb      — binary write-ahead log
//!   keydir.hint  — persisted KeyDir for fast restart
//!   indexes.json — index metadata
//! ```
//!
//! Point operations (insert, update, delete, find_by_id) are O(1).
//! Full scans (find_all) are O(n) in live document count.
use std::collections::HashSet;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
use bson::{doc, oid::ObjectId, Document};
use dashmap::DashMap;
use tracing::debug;

use crate::adapter::StorageAdapter;
use crate::binary_wal::{BinaryWal, WalOpType};
use crate::error::{StorageError, StorageResult};
use crate::keydir::{KeyDir, KeyDirEntry};
use crate::record::{now_ms, DataRecord, FileHeader, FileType, FILE_HEADER_SIZE};
// ---------------------------------------------------------------------------
// Per-collection state
// ---------------------------------------------------------------------------

/// Runtime state for one open collection (one directory on disk).
///
/// Reads go through `keydir` to find record offsets; writes append to the
/// data file under `write_lock` after logging to `wal`. The atomic counters
/// are bookkeeping for `count()` and compaction decisions.
struct CollectionState {
    /// In-memory document location index (id → offset/length in data.rdb).
    keydir: KeyDir,
    /// Path to the collection directory.
    coll_dir: PathBuf,
    /// Write-ahead log. Locked independently of `write_lock`.
    wal: std::sync::Mutex<BinaryWal>,
    /// Serializes append writes to the data file.
    write_lock: std::sync::Mutex<()>,
    /// Live document count.
    doc_count: AtomicU64,
    /// Bytes occupied by dead (superseded/tombstone) records. For compaction.
    dead_bytes: AtomicU64,
    /// Total size of the data file.
    data_file_size: AtomicU64,
}
impl CollectionState {
    /// Path of the append-only data log for this collection.
    fn data_path(&self) -> PathBuf {
        self.coll_dir.join("data.rdb")
    }

    /// Path of the persisted KeyDir hint file.
    fn hint_path(&self) -> PathBuf {
        self.coll_dir.join("keydir.hint")
    }

    /// Path of the JSON index-metadata file.
    fn index_path(&self) -> PathBuf {
        self.coll_dir.join("indexes.json")
    }

    /// Read a single document from the data file at the given KeyDir entry.
    ///
    /// Seeks to `entry.offset`, decodes one record, and parses its value
    /// bytes as BSON. Returns `CorruptRecord` if the record is missing (EOF)
    /// or is a tombstone — a valid KeyDir never points at either.
    ///
    /// NOTE(review): opens a fresh file handle per read; a cached or pooled
    /// handle may be worthwhile if point reads become hot.
    fn read_document(&self, entry: &KeyDirEntry) -> StorageResult<Document> {
        let data_path = self.data_path();
        let mut file = std::fs::File::open(&data_path)?;
        file.seek(SeekFrom::Start(entry.offset))?;

        let (record, _) = DataRecord::decode_from(&mut file)?
            .ok_or_else(|| StorageError::CorruptRecord("unexpected EOF reading document".into()))?;

        if record.is_tombstone() {
            return Err(StorageError::CorruptRecord(
                "KeyDir pointed to a tombstone record".into(),
            ));
        }

        // Parse BSON from raw bytes
        bson::from_slice(&record.value)
            .map_err(|e| StorageError::SerializationError(format!("BSON decode: {e}")))
    }

    /// Append a data record to the data log and return a KeyDir entry
    /// describing where it landed. The *caller* is responsible for inserting
    /// that entry into the KeyDir. Must be called under `write_lock`.
    ///
    /// The record is fsynced (`sync_all`) before returning, so a returned
    /// entry always refers to durable bytes.
    fn append_record(
        &self,
        key: &str,
        bson_bytes: &[u8],
        timestamp: u64,
    ) -> StorageResult<KeyDirEntry> {
        let record = DataRecord {
            timestamp,
            key: key.as_bytes().to_vec(),
            value: bson_bytes.to_vec(),
        };
        let encoded = record.encode();
        let disk_size = encoded.len() as u32;

        let data_path = self.data_path();
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&data_path)?;

        // Seek to end to learn the offset the appended record will occupy.
        let offset = file.seek(SeekFrom::End(0))?;
        file.write_all(&encoded)?;
        file.sync_all()?;

        let entry = KeyDirEntry {
            offset,
            record_len: disk_size,
            value_len: bson_bytes.len() as u32,
            timestamp,
        };

        self.data_file_size
            .fetch_add(disk_size as u64, Ordering::Relaxed);

        Ok(entry)
    }

    /// Append a tombstone record (empty value) for `key` and return its
    /// on-disk size in bytes. The caller removes the key from the KeyDir and
    /// updates dead-byte accounting. Must be called under `write_lock`.
    fn append_tombstone(&self, key: &str, timestamp: u64) -> StorageResult<u32> {
        let record = DataRecord {
            timestamp,
            key: key.as_bytes().to_vec(),
            // Empty value marks the record as a tombstone.
            value: vec![],
        };
        let encoded = record.encode();
        let disk_size = encoded.len() as u32;

        let data_path = self.data_path();
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&data_path)?;

        file.write_all(&encoded)?;
        file.sync_all()?;

        self.data_file_size
            .fetch_add(disk_size as u64, Ordering::Relaxed);

        Ok(disk_size)
    }

    /// Try compaction if dead bytes exceed the threshold.
    /// Must be called while holding the write_lock.
    ///
    /// Compaction failure is logged but not propagated — the data file is
    /// still valid, just not reclaimed.
    fn try_compact(&self) {
        let dead = self.dead_bytes.load(Ordering::Relaxed);
        let total = self.data_file_size.load(Ordering::Relaxed);
        if crate::compaction::should_compact(dead, total) {
            debug!("triggering compaction for {:?} (dead={dead}, total={total})", self.coll_dir);
            if let Err(e) = crate::compaction::compact_data_file(
                &self.data_path(),
                &self.keydir,
                &self.dead_bytes,
                &self.data_file_size,
            ) {
                tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
            }
        }
    }
}
// ---------------------------------------------------------------------------
// Collection cache key: "db\0coll"
// ---------------------------------------------------------------------------

/// Build the cache key for a (database, collection) pair.
///
/// NUL is used as the separator because it cannot appear in either name,
/// so distinct pairs always map to distinct keys.
fn coll_cache_key(db: &str, coll: &str) -> String {
    let mut key = String::with_capacity(db.len() + coll.len() + 1);
    key.push_str(db);
    key.push('\0');
    key.push_str(coll);
    key
}
// ---------------------------------------------------------------------------
// FileStorageAdapter
// ---------------------------------------------------------------------------

/// Bitcask-style file storage adapter.
///
/// One `CollectionState` is lazily created per collection and cached in
/// `collections`; all document operations route through that state.
pub struct FileStorageAdapter {
    /// Root directory; layout is `{base_path}/{db}/{coll}/`.
    base_path: PathBuf,
    /// Cache of initialized collection states, keyed by "db\0coll".
    collections: DashMap<String, Arc<CollectionState>>,
    /// Handle to the periodic compaction task. Aborted on close.
    compaction_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
impl FileStorageAdapter {
    /// Create an adapter rooted at `base_path`. No I/O happens until
    /// `initialize()` is called.
    pub fn new(base_path: impl Into<PathBuf>) -> Self {
        Self {
            base_path: base_path.into(),
            collections: DashMap::new(),
            compaction_handle: std::sync::Mutex::new(None),
        }
    }

    /// Directory holding all collections of database `db`.
    fn db_dir(&self, db: &str) -> PathBuf {
        self.base_path.join(db)
    }

    /// Directory holding collection `coll` of database `db`.
    fn coll_dir(&self, db: &str, coll: &str) -> PathBuf {
        self.db_dir(db).join(coll)
    }

    /// Get or initialize the CollectionState for a given db/coll.
    ///
    /// Returns `NotFound` if the collection directory does not exist.
    ///
    /// NOTE(review): check-then-insert on the DashMap is racy — two threads
    /// may both run `init_collection_state` for the same collection; the
    /// later `insert` wins. Redundant work, but both states are built from
    /// the same files. Confirm this is acceptable, or use `entry()`.
    fn get_or_init_collection(
        &self,
        db: &str,
        coll: &str,
    ) -> StorageResult<Arc<CollectionState>> {
        let key = coll_cache_key(db, coll);
        if let Some(state) = self.collections.get(&key) {
            return Ok(Arc::clone(state.value()));
        }

        let coll_dir = self.coll_dir(db, coll);
        if !coll_dir.exists() {
            return Err(StorageError::NotFound(format!(
                "collection '{db}.{coll}'"
            )));
        }

        let state = self.init_collection_state(&coll_dir)?;
        let state = Arc::new(state);
        self.collections.insert(key, Arc::clone(&state));
        Ok(state)
    }

    /// Initialize a CollectionState from disk.
    ///
    /// Loads the KeyDir from the hint file when possible (falling back to a
    /// full data-file scan), replays any uncommitted WAL entries into the
    /// data file, then truncates the WAL.
    ///
    /// NOTE(review): `&Path` would be the more idiomatic parameter type than
    /// `&PathBuf`.
    fn init_collection_state(&self, coll_dir: &PathBuf) -> StorageResult<CollectionState> {
        let data_path = coll_dir.join("data.rdb");
        let wal_path = coll_dir.join("wal.rdb");
        let hint_path = coll_dir.join("keydir.hint");

        // Try loading from hint file first, fall back to data file scan
        let (keydir, dead_bytes) = if hint_path.exists() && data_path.exists() {
            match KeyDir::load_from_hint_file(&hint_path) {
                Ok(Some(kd)) => {
                    debug!("loaded KeyDir from hint file: {:?}", hint_path);
                    // We don't know dead_bytes from the hint file; estimate from file size:
                    // everything that is not the header and not a live record is dead.
                    let file_size = std::fs::metadata(&data_path)
                        .map(|m| m.len())
                        .unwrap_or(FILE_HEADER_SIZE as u64);
                    let live_bytes: u64 = {
                        let mut total = 0u64;
                        kd.for_each(|_, e| total += e.record_len as u64);
                        total
                    };
                    let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
                    (kd, dead)
                }
                _ => {
                    debug!("hint file invalid, rebuilding KeyDir from data file");
                    KeyDir::build_from_data_file(&data_path)?
                }
            }
        } else if data_path.exists() {
            KeyDir::build_from_data_file(&data_path)?
        } else {
            (KeyDir::new(), 0)
        };

        let doc_count = keydir.len();
        let data_file_size = if data_path.exists() {
            std::fs::metadata(&data_path)?.len()
        } else {
            FILE_HEADER_SIZE as u64
        };

        // Initialize WAL and recover
        let wal = BinaryWal::new(wal_path);
        wal.initialize()?;

        // Recover uncommitted WAL entries
        let uncommitted = wal.recover()?;
        if !uncommitted.is_empty() {
            debug!(
                "recovering {} uncommitted WAL entries for {:?}",
                uncommitted.len(),
                coll_dir
            );
        }

        let state = CollectionState {
            keydir,
            coll_dir: coll_dir.clone(),
            wal: std::sync::Mutex::new(wal),
            write_lock: std::sync::Mutex::new(()),
            doc_count: AtomicU64::new(doc_count),
            dead_bytes: AtomicU64::new(dead_bytes),
            data_file_size: AtomicU64::new(data_file_size),
        };

        // Replay uncommitted entries by re-appending them to the data file.
        // No lock is needed: `state` is not yet shared.
        for entry in uncommitted {
            let key_str = String::from_utf8(entry.key.clone())
                .map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key in WAL: {e}")))?;

            match entry.op {
                WalOpType::Insert | WalOpType::Update => {
                    if !entry.value.is_empty() {
                        let ts = now_ms();
                        let new_entry = state.append_record(&key_str, &entry.value, ts)?;

                        if entry.op == WalOpType::Insert {
                            // Count only genuinely new keys.
                            // NOTE(review): if the key already existed, the
                            // superseded record's bytes are not added to
                            // dead_bytes here, unlike the Update branch —
                            // confirm whether that is intentional.
                            if state.keydir.insert(key_str, new_entry).is_none() {
                                state.doc_count.fetch_add(1, Ordering::Relaxed);
                            }
                        } else {
                            // Update: the previous record (if any) is now dead.
                            if let Some(prev) = state.keydir.insert(key_str, new_entry) {
                                state.dead_bytes.fetch_add(prev.record_len as u64, Ordering::Relaxed);
                            }
                        }
                    }
                }
                WalOpType::Delete => {
                    let ts = now_ms();
                    let tombstone_size = state.append_tombstone(&key_str, ts)?;
                    if let Some(prev) = state.keydir.remove(&key_str) {
                        // Both the dead record and the tombstone itself are reclaimable.
                        state.dead_bytes.fetch_add(prev.record_len as u64 + tombstone_size as u64, Ordering::Relaxed);
                        state.doc_count.fetch_sub(1, Ordering::Relaxed);
                    }
                }
            }
        }

        // Truncate WAL after successful recovery.
        // NOTE(review): a poisoned mutex silently skips truncation here.
        if let Ok(wal) = state.wal.lock() {
            wal.truncate()?;
        }

        Ok(state)
    }

    /// Create a fresh, empty collection on disk. Returns the new CollectionState.
    ///
    /// Writes a header-only data file, the default `_id_` index spec, and an
    /// initialized WAL, then caches the state.
    fn create_empty_collection(&self, db: &str, coll: &str) -> StorageResult<Arc<CollectionState>> {
        let coll_dir = self.coll_dir(db, coll);
        std::fs::create_dir_all(&coll_dir)?;

        // Write empty data file with header
        let data_path = coll_dir.join("data.rdb");
        {
            let mut f = std::fs::File::create(&data_path)?;
            let hdr = FileHeader::new(FileType::Data);
            f.write_all(&hdr.encode())?;
            f.sync_all()?;
        }

        // Write default _id index
        let idx_spec = vec![doc! { "name": "_id_", "key": { "_id": 1 } }];
        let idx_json = serde_json::to_string_pretty(&idx_spec)?;
        let idx_path = coll_dir.join("indexes.json");
        std::fs::write(&idx_path, idx_json)?;

        // Initialize WAL
        let wal_path = coll_dir.join("wal.rdb");
        let wal = BinaryWal::new(wal_path);
        wal.initialize()?;

        let state = Arc::new(CollectionState {
            keydir: KeyDir::new(),
            coll_dir,
            wal: std::sync::Mutex::new(wal),
            write_lock: std::sync::Mutex::new(()),
            doc_count: AtomicU64::new(0),
            dead_bytes: AtomicU64::new(0),
            data_file_size: AtomicU64::new(FILE_HEADER_SIZE as u64),
        });

        let key = coll_cache_key(db, coll);
        self.collections.insert(key, Arc::clone(&state));

        Ok(state)
    }

    /// Extract the `_id` of `doc` as a string key.
    ///
    /// ObjectIds become their hex form, strings pass through, and any other
    /// BSON value is stringified via `Display`. Errors if `_id` is absent.
    fn extract_id_hex(doc: &Document) -> StorageResult<String> {
        match doc.get("_id") {
            Some(bson::Bson::ObjectId(oid)) => Ok(oid.to_hex()),
            Some(bson::Bson::String(s)) => Ok(s.clone()),
            Some(other) => Ok(format!("{other}")),
            None => Err(StorageError::NotFound("document missing _id".into())),
        }
    }

    /// Scan database directory for collection subdirectories.
    ///
    /// A subdirectory counts as a collection only if it contains `data.rdb`.
    /// Returns `NotFound` if the database directory does not exist.
    fn list_collection_dirs(&self, db: &str) -> StorageResult<Vec<String>> {
        let db_dir = self.db_dir(db);
        if !db_dir.exists() {
            return Err(StorageError::NotFound(format!("database '{db}'")));
        }
        let mut colls = Vec::new();
        let entries = std::fs::read_dir(&db_dir)?;
        for entry in entries {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                if let Some(name) = entry.file_name().to_str() {
                    // Only consider directories that contain data.rdb
                    let data_path = entry.path().join("data.rdb");
                    if data_path.exists() {
                        colls.push(name.to_string());
                    }
                }
            }
        }
        Ok(colls)
    }
}
// ---------------------------------------------------------------------------
// StorageAdapter implementation
// ---------------------------------------------------------------------------

#[async_trait]
impl StorageAdapter for FileStorageAdapter {
    /// Create the base directory, eagerly open every collection found on
    /// disk (which also runs WAL recovery), and start the daily compaction
    /// sweep.
    async fn initialize(&self) -> StorageResult<()> {
        std::fs::create_dir_all(&self.base_path)?;
        debug!("FileStorageAdapter initialized at {:?}", self.base_path);

        // Pre-load all existing collections. Failures are ignored here so a
        // single bad collection does not block startup.
        if let Ok(entries) = std::fs::read_dir(&self.base_path) {
            for entry in entries.flatten() {
                if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
                    if let Some(db_name) = entry.file_name().to_str() {
                        let db_name = db_name.to_string();
                        if let Ok(colls) = self.list_collection_dirs(&db_name) {
                            for coll_name in colls {
                                let _ = self.get_or_init_collection(&db_name, &coll_name);
                            }
                        }
                    }
                }
            }
        }

        // Start periodic compaction task (runs every 24 hours)
        {
            let collections = self.collections.clone();
            let handle = tokio::spawn(async move {
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60));
                interval.tick().await; // first tick is immediate — skip it
                loop {
                    interval.tick().await;
                    debug!("running periodic compaction sweep");
                    for entry in collections.iter() {
                        let state = entry.value();
                        // std Mutex is fine here: no .await occurs while the
                        // guard is held.
                        let _guard = state.write_lock.lock().unwrap();
                        state.try_compact();
                    }
                }
            });
            *self.compaction_handle.lock().unwrap() = Some(handle);
        }

        Ok(())
    }

    /// Stop the compaction task and persist every KeyDir hint file so the
    /// next startup can skip the full data-file scan.
    async fn close(&self) -> StorageResult<()> {
        // Stop the periodic compaction task
        if let Some(handle) = self.compaction_handle.lock().unwrap().take() {
            handle.abort();
        }

        // Persist all KeyDir hint files; best-effort, errors are ignored
        // (a missing hint just forces a rebuild on next open).
        for entry in self.collections.iter() {
            let state = entry.value();
            let _ = state.keydir.persist_to_hint_file(&state.hint_path());
        }
        debug!("FileStorageAdapter closed");
        Ok(())
    }

    // ---- database ----

    /// List databases: every subdirectory of the base path.
    async fn list_databases(&self) -> StorageResult<Vec<String>> {
        let mut dbs = Vec::new();
        let entries = std::fs::read_dir(&self.base_path)?;
        for entry in entries {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                if let Some(name) = entry.file_name().to_str() {
                    dbs.push(name.to_string());
                }
            }
        }
        Ok(dbs)
    }

    /// Create a database directory; errors if it already exists.
    async fn create_database(&self, db: &str) -> StorageResult<()> {
        let dir = self.db_dir(db);
        if dir.exists() {
            return Err(StorageError::AlreadyExists(format!("database '{db}'")));
        }
        std::fs::create_dir_all(&dir)?;
        Ok(())
    }

    /// Drop a database: evict all cached collection states, then remove the
    /// directory tree. Succeeds even if the directory is already gone.
    async fn drop_database(&self, db: &str) -> StorageResult<()> {
        // Remove cached states for all collections in this db
        let prefix = format!("{db}\0");
        self.collections.retain(|k, _| !k.starts_with(&prefix));

        let dir = self.db_dir(db);
        if dir.exists() {
            std::fs::remove_dir_all(&dir)?;
        }
        Ok(())
    }

    async fn database_exists(&self, db: &str) -> StorageResult<bool> {
        Ok(self.db_dir(db).exists())
    }

    // ---- collection ----

    async fn list_collections(&self, db: &str) -> StorageResult<Vec<String>> {
        self.list_collection_dirs(db)
    }

    /// Create a collection; errors if its directory already exists.
    /// Implicitly creates the database directory if needed.
    async fn create_collection(&self, db: &str, coll: &str) -> StorageResult<()> {
        let coll_dir = self.coll_dir(db, coll);
        if coll_dir.exists() {
            return Err(StorageError::AlreadyExists(format!(
                "collection '{db}.{coll}'"
            )));
        }
        std::fs::create_dir_all(self.db_dir(db))?;
        self.create_empty_collection(db, coll)?;
        Ok(())
    }

    /// Drop a collection: evict the cached state, then remove its directory.
    async fn drop_collection(&self, db: &str, coll: &str) -> StorageResult<()> {
        let key = coll_cache_key(db, coll);
        self.collections.remove(&key);

        let coll_dir = self.coll_dir(db, coll);
        if coll_dir.exists() {
            std::fs::remove_dir_all(&coll_dir)?;
        }
        Ok(())
    }

    /// A collection "exists" only if its directory contains data.rdb.
    async fn collection_exists(&self, db: &str, coll: &str) -> StorageResult<bool> {
        let coll_dir = self.coll_dir(db, coll);
        Ok(coll_dir.exists() && coll_dir.join("data.rdb").exists())
    }

    /// Rename a collection by renaming its directory, then re-initialize the
    /// cached state under the new name.
    async fn rename_collection(
        &self,
        db: &str,
        old_name: &str,
        new_name: &str,
    ) -> StorageResult<()> {
        let old_dir = self.coll_dir(db, old_name);
        let new_dir = self.coll_dir(db, new_name);
        if !old_dir.exists() {
            return Err(StorageError::NotFound(format!(
                "collection '{db}.{old_name}'"
            )));
        }
        if new_dir.exists() {
            return Err(StorageError::AlreadyExists(format!(
                "collection '{db}.{new_name}'"
            )));
        }

        // Remove old cache entry
        let old_key = coll_cache_key(db, old_name);
        self.collections.remove(&old_key);

        // Rename directory
        std::fs::rename(&old_dir, &new_dir)?;

        // Re-init under new name (best-effort; next access will retry)
        let _ = self.get_or_init_collection(db, new_name);

        Ok(())
    }

    // ---- document writes ----

    /// Insert one document, generating an ObjectId `_id` if absent.
    /// Returns the string form of the `_id`.
    ///
    /// Write protocol: WAL append → data append → KeyDir insert → WAL commit.
    async fn insert_one(
        &self,
        db: &str,
        coll: &str,
        mut doc: Document,
    ) -> StorageResult<String> {
        if !doc.contains_key("_id") {
            doc.insert("_id", ObjectId::new());
        }
        let id = Self::extract_id_hex(&doc)?;

        let state = self.get_or_init_collection(db, coll)?;

        // Check for duplicate
        if state.keydir.contains(&id) {
            return Err(StorageError::AlreadyExists(format!("document '{id}'")));
        }

        let bson_bytes = bson::to_vec(&doc)
            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
        let ts = now_ms();

        let _guard = state.write_lock.lock().unwrap();

        // WAL → data → KeyDir → WAL commit
        {
            let wal = state.wal.lock().unwrap();
            let seq = wal.append(WalOpType::Insert, id.as_bytes(), &bson_bytes)?;
            let entry = state.append_record(&id, &bson_bytes, ts)?;
            state.keydir.insert(id.clone(), entry);
            state.doc_count.fetch_add(1, Ordering::Relaxed);
            wal.append_commit(seq)?;
        }

        // Insert doesn't create dead bytes, but check anyway for consistency
        state.try_compact();

        Ok(id)
    }

    /// Insert many documents under a single write lock. Each document gets
    /// its own WAL append/commit pair. Returns ids in input order.
    ///
    /// NOTE(review): unlike insert_one, there is no duplicate-`_id` check
    /// here — confirm whether overwriting silently is intended.
    async fn insert_many(
        &self,
        db: &str,
        coll: &str,
        mut docs: Vec<Document>,
    ) -> StorageResult<Vec<String>> {
        let state = self.get_or_init_collection(db, coll)?;
        let mut ids = Vec::with_capacity(docs.len());

        let _guard = state.write_lock.lock().unwrap();

        for doc in &mut docs {
            if !doc.contains_key("_id") {
                doc.insert("_id", ObjectId::new());
            }
            let id = Self::extract_id_hex(doc)?;

            let bson_bytes = bson::to_vec(doc)
                .map_err(|e| StorageError::SerializationError(e.to_string()))?;
            let ts = now_ms();

            let wal = state.wal.lock().unwrap();
            let seq = wal.append(WalOpType::Insert, id.as_bytes(), &bson_bytes)?;
            let entry = state.append_record(&id, &bson_bytes, ts)?;
            state.keydir.insert(id.clone(), entry);
            state.doc_count.fetch_add(1, Ordering::Relaxed);
            wal.append_commit(seq)?;

            ids.push(id);
        }

        state.try_compact();

        Ok(ids)
    }

    /// Replace the document with the given id. Errors with `NotFound` if the
    /// id is not live. The superseded record is counted as dead bytes.
    async fn update_by_id(
        &self,
        db: &str,
        coll: &str,
        id: &str,
        doc: Document,
    ) -> StorageResult<()> {
        let state = self.get_or_init_collection(db, coll)?;

        if !state.keydir.contains(id) {
            return Err(StorageError::NotFound(format!("document '{id}'")));
        }

        let bson_bytes = bson::to_vec(&doc)
            .map_err(|e| StorageError::SerializationError(e.to_string()))?;
        let ts = now_ms();

        let _guard = state.write_lock.lock().unwrap();

        let wal = state.wal.lock().unwrap();
        let seq = wal.append(WalOpType::Update, id.as_bytes(), &bson_bytes)?;
        let new_entry = state.append_record(id, &bson_bytes, ts)?;

        if let Some(prev) = state.keydir.insert(id.to_string(), new_entry) {
            state
                .dead_bytes
                .fetch_add(prev.record_len as u64, Ordering::Relaxed);
        }
        wal.append_commit(seq)?;

        state.try_compact();

        Ok(())
    }

    /// Delete the document with the given id by appending a tombstone.
    /// Errors with `NotFound` if the id is not live.
    async fn delete_by_id(
        &self,
        db: &str,
        coll: &str,
        id: &str,
    ) -> StorageResult<()> {
        let state = self.get_or_init_collection(db, coll)?;

        if !state.keydir.contains(id) {
            return Err(StorageError::NotFound(format!("document '{id}'")));
        }

        let ts = now_ms();
        let _guard = state.write_lock.lock().unwrap();

        let wal = state.wal.lock().unwrap();
        let seq = wal.append(WalOpType::Delete, id.as_bytes(), &[])?;
        let tombstone_size = state.append_tombstone(id, ts)?;

        if let Some(prev) = state.keydir.remove(id) {
            // Both the old record and the tombstone become reclaimable.
            state.dead_bytes.fetch_add(
                prev.record_len as u64 + tombstone_size as u64,
                Ordering::Relaxed,
            );
            state.doc_count.fetch_sub(1, Ordering::Relaxed);
        }
        wal.append_commit(seq)?;

        state.try_compact();

        Ok(())
    }

    /// Delete multiple ids under one write lock; non-existent ids are
    /// skipped silently.
    async fn delete_by_ids(
        &self,
        db: &str,
        coll: &str,
        ids: &[String],
    ) -> StorageResult<()> {
        let state = self.get_or_init_collection(db, coll)?;
        let _guard = state.write_lock.lock().unwrap();

        for id in ids {
            if !state.keydir.contains(id) {
                continue; // skip non-existent
            }

            let ts = now_ms();
            let wal = state.wal.lock().unwrap();
            let seq = wal.append(WalOpType::Delete, id.as_bytes(), &[])?;
            let tombstone_size = state.append_tombstone(id, ts)?;

            if let Some(prev) = state.keydir.remove(id) {
                state.dead_bytes.fetch_add(
                    prev.record_len as u64 + tombstone_size as u64,
                    Ordering::Relaxed,
                );
                state.doc_count.fetch_sub(1, Ordering::Relaxed);
            }
            wal.append_commit(seq)?;
        }

        state.try_compact();

        Ok(())
    }

    // ---- document reads ----

    /// Read every live document. O(n) in live document count; unreadable
    /// records are logged and skipped rather than failing the whole scan.
    async fn find_all(&self, db: &str, coll: &str) -> StorageResult<Vec<Document>> {
        let state = self.get_or_init_collection(db, coll)?;
        let mut docs = Vec::with_capacity(state.doc_count.load(Ordering::Relaxed) as usize);

        // Collect KeyDir entries first, then read. This avoids holding
        // DashMap refs during file I/O.
        state.keydir.for_each(|_key, entry| {
            docs.push(*entry);
        });

        let mut result = Vec::with_capacity(docs.len());
        for entry in &docs {
            match state.read_document(entry) {
                Ok(doc) => result.push(doc),
                Err(e) => {
                    tracing::warn!("skipping unreadable document: {e}");
                }
            }
        }

        Ok(result)
    }

    /// Read the documents for the given id set. Missing or unreadable ids
    /// are skipped; result order is unspecified (HashSet iteration order).
    async fn find_by_ids(
        &self,
        db: &str,
        coll: &str,
        ids: HashSet<String>,
    ) -> StorageResult<Vec<Document>> {
        let state = self.get_or_init_collection(db, coll)?;
        let mut docs = Vec::with_capacity(ids.len());

        for id in &ids {
            if let Some(entry) = state.keydir.get(id) {
                match state.read_document(&entry) {
                    Ok(doc) => docs.push(doc),
                    Err(e) => {
                        tracing::warn!("skipping unreadable document {id}: {e}");
                    }
                }
            }
        }

        Ok(docs)
    }

    /// Point lookup by id: KeyDir hit → one seek+read; miss → Ok(None).
    async fn find_by_id(
        &self,
        db: &str,
        coll: &str,
        id: &str,
    ) -> StorageResult<Option<Document>> {
        let state = self.get_or_init_collection(db, coll)?;

        match state.keydir.get(id) {
            Some(entry) => {
                let doc = state.read_document(&entry)?;
                Ok(Some(doc))
            }
            None => Ok(None),
        }
    }

    /// Live document count, served from the in-memory counter (no I/O).
    async fn count(&self, db: &str, coll: &str) -> StorageResult<u64> {
        let state = self.get_or_init_collection(db, coll)?;
        Ok(state.doc_count.load(Ordering::Relaxed))
    }

    // ---- indexes ----

    /// Upsert an index spec by name in indexes.json.
    async fn save_index(
        &self,
        db: &str,
        coll: &str,
        name: &str,
        spec: Document,
    ) -> StorageResult<()> {
        let state = self.get_or_init_collection(db, coll)?;
        let path = state.index_path();

        let mut indexes = self.read_index_file(&path)?;
        // Drop any existing spec with the same name (upsert semantics).
        indexes.retain(|s| {
            s.get_str("name").unwrap_or("") != name
        });
        let mut full_spec = spec;
        full_spec.insert("name", name);
        indexes.push(full_spec);
        self.write_index_file(&path, &indexes)?;
        Ok(())
    }

    async fn get_indexes(
        &self,
        db: &str,
        coll: &str,
    ) -> StorageResult<Vec<Document>> {
        let state = self.get_or_init_collection(db, coll)?;
        self.read_index_file(&state.index_path())
    }

    /// Remove an index spec by name; `NotFound` if no spec matched.
    async fn drop_index(
        &self,
        db: &str,
        coll: &str,
        name: &str,
    ) -> StorageResult<()> {
        let state = self.get_or_init_collection(db, coll)?;
        let path = state.index_path();

        let mut indexes = self.read_index_file(&path)?;
        let before = indexes.len();
        indexes.retain(|s| {
            s.get_str("name").unwrap_or("") != name
        });
        if indexes.len() == before {
            return Err(StorageError::NotFound(format!("index '{name}'")));
        }
        self.write_index_file(&path, &indexes)?;
        Ok(())
    }

    // ---- snapshot / conflict detection ----

    /// Snapshots are just a wall-clock timestamp; conflict detection
    /// compares record timestamps against it.
    async fn create_snapshot(&self, _db: &str, _coll: &str) -> StorageResult<i64> {
        Ok(now_ms() as i64)
    }

    /// True if any of `ids` was written after `snapshot_time`.
    /// Deleted ids are not in the KeyDir and thus never conflict here.
    ///
    /// NOTE(review): a negative `snapshot_time` wraps to a huge u64 and
    /// would mask all conflicts — confirm callers never pass one.
    async fn has_conflicts(
        &self,
        db: &str,
        coll: &str,
        ids: &HashSet<String>,
        snapshot_time: i64,
    ) -> StorageResult<bool> {
        let state = self.get_or_init_collection(db, coll)?;
        let snapshot_ts = snapshot_time as u64;

        for id in ids {
            if let Some(entry) = state.keydir.get(id) {
                if entry.timestamp > snapshot_ts {
                    return Ok(true);
                }
            }
        }
        Ok(false)
    }
}
// ---------------------------------------------------------------------------
|
|
// Index file helpers (JSON — kept simple, rarely written)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
impl FileStorageAdapter {
|
|
fn read_index_file(&self, path: &PathBuf) -> StorageResult<Vec<Document>> {
|
|
if !path.exists() {
|
|
return Ok(vec![]);
|
|
}
|
|
let data = std::fs::read_to_string(path)?;
|
|
let json_docs: Vec<serde_json::Value> = serde_json::from_str(&data)?;
|
|
let mut docs = Vec::with_capacity(json_docs.len());
|
|
for jv in json_docs {
|
|
let bson_val: bson::Bson = serde_json::from_value(jv)
|
|
.map_err(|e| StorageError::SerializationError(e.to_string()))?;
|
|
let doc = bson_val
|
|
.as_document()
|
|
.ok_or_else(|| StorageError::SerializationError("expected document".into()))?
|
|
.clone();
|
|
docs.push(doc);
|
|
}
|
|
Ok(docs)
|
|
}
|
|
|
|
fn write_index_file(&self, path: &PathBuf, specs: &[Document]) -> StorageResult<()> {
|
|
let json_vals: Vec<serde_json::Value> = specs
|
|
.iter()
|
|
.map(|d| {
|
|
let b = bson::to_bson(d)
|
|
.map_err(|e| StorageError::SerializationError(e.to_string()))?;
|
|
serde_json::to_value(&b)
|
|
.map_err(|e| StorageError::SerializationError(e.to_string()))
|
|
})
|
|
.collect::<StorageResult<Vec<_>>>()?;
|
|
let json = serde_json::to_string_pretty(&json_vals)?;
|
|
std::fs::write(path, json)?;
|
|
Ok(())
|
|
}
|
|
}
|