feat(rust-provider): Add Rust-backed provider with XFS-safe durability via IPC bridge, TypeScript provider, tests and docs

This commit is contained in:
2026-03-05 19:36:11 +00:00
parent 61e3f3a0b6
commit 5283247bea
24 changed files with 14453 additions and 1248 deletions

View File

@@ -0,0 +1,15 @@
[package]
name = "smartfs-core"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
smartfs-protocol = { path = "../smartfs-protocol" }
serde.workspace = true
serde_json.workspace = true
notify.workspace = true
libc = "0.2"
regex-lite = "0.1"
filetime = "0.2"
base64.workspace = true

View File

@@ -0,0 +1,5 @@
// Core filesystem engine: `ops` holds the fsync-careful file operations,
// `watch` the notify-based file watching.
mod ops;
mod watch;
// Public crate surface re-exported for the IPC server crate.
pub use ops::FsOps;
pub use watch::WatchManager;

View File

@@ -0,0 +1,649 @@
use base64::{Engine as _, engine::general_purpose::STANDARD};
use smartfs_protocol::*;
use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
/// Filesystem operations with XFS-safe fsync after metadata changes.
/// Stateless marker type: every operation is an associated function.
pub struct FsOps;
impl FsOps {
// ── Safety primitive ────────────────────────────────────────────────
/// Fsync a parent directory to ensure metadata durability on XFS.
/// This is the key operation that Node.js cannot do.
/// A path with no parent (e.g. "/") is a successful no-op.
fn fsync_parent(path: &Path) -> io::Result<()> {
    match path.parent() {
        Some(parent) => fs::File::open(parent)?.sync_all(),
        None => Ok(()),
    }
}
/// Fsync a specific directory.
/// Fails if the directory cannot be opened (e.g. it does not exist).
fn fsync_dir(path: &Path) -> io::Result<()> {
    fs::File::open(path)?.sync_all()
}
// ── File operations ─────────────────────────────────────────────────
/// Read a file and return its contents encoded per `params.encoding`.
///
/// Supported encodings: "base64", "hex", "buffer" (base64 tagged with
/// `isBuffer`), anything else falls back to lossy UTF-8 text.
pub fn read_file(params: &ReadFileParams) -> Result<serde_json::Value, String> {
    let raw = fs::read(Path::new(&params.path)).map_err(|e| format!("read_file: {}", e))?;
    let encoding = params.encoding.as_deref().unwrap_or("utf8");
    let value = match encoding {
        "base64" => serde_json::json!({ "content": STANDARD.encode(&raw) }),
        "hex" => {
            let hex: String = raw.iter().map(|b| format!("{:02x}", b)).collect();
            serde_json::json!({ "content": hex })
        }
        "buffer" => serde_json::json!({ "content": STANDARD.encode(&raw), "isBuffer": true }),
        // utf8, utf-8, ascii, and any unrecognized value
        _ => serde_json::json!({ "content": String::from_utf8_lossy(&raw).into_owned() }),
    };
    Ok(value)
}
/// Write `params.content` to `params.path` with XFS-safe durability:
/// the file data is fsynced, then the parent directory is fsynced so the
/// entry (or rename) survives a crash.
///
/// With `params.atomic` set, uses the standard crash-safe replace
/// sequence: write temp → fsync temp → chmod → rename over destination →
/// fsync parent. Fix over the original: the temp file is now removed on
/// EVERY failure path (the original only cleaned up when the rename
/// itself failed, leaking `*.tmp.*` files on fsync/chmod errors). The
/// epoch timestamp also no longer panics on a pre-epoch system clock.
pub fn write_file(params: &WriteFileParams) -> Result<(), String> {
    let path = Path::new(&params.path);
    let content: Vec<u8> = if params.encoding.as_deref() == Some("base64") {
        STANDARD.decode(&params.content).map_err(|e| format!("write_file base64 decode: {}", e))?
    } else {
        params.content.as_bytes().to_vec()
    };
    // Ensure parent directory exists
    if let Some(parent) = path.parent() {
        if !parent.exists() {
            fs::create_dir_all(parent).map_err(|e| format!("write_file mkdir: {}", e))?;
            Self::fsync_parent(parent).ok();
        }
    }
    if params.atomic.unwrap_or(false) {
        // Atomic write: write to temp → fsync file → rename → fsync parent
        let temp_path = path.with_extension(format!(
            "tmp.{}",
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        ));
        // Run every fallible step in a closure so a single cleanup site
        // removes the temp file on any error before the rename completes.
        let staged = (|| -> Result<(), String> {
            fs::write(&temp_path, &content).map_err(|e| format!("write_file temp: {}", e))?;
            // Fsync the temp file data before it becomes visible.
            let f = fs::File::open(&temp_path)
                .map_err(|e| format!("write_file open temp: {}", e))?;
            f.sync_all().map_err(|e| format!("write_file fsync temp: {}", e))?;
            drop(f);
            // Set mode if requested
            if let Some(mode) = params.mode {
                fs::set_permissions(&temp_path, fs::Permissions::from_mode(mode))
                    .map_err(|e| format!("write_file chmod: {}", e))?;
            }
            // Rename (atomic on same filesystem)
            fs::rename(&temp_path, path).map_err(|e| format!("write_file rename: {}", e))?;
            Ok(())
        })();
        if let Err(e) = staged {
            // Clean up the temp file on any staging failure.
            let _ = fs::remove_file(&temp_path);
            return Err(e);
        }
        // Fsync parent to ensure the rename is durable
        Self::fsync_parent(path).map_err(|e| format!("write_file fsync parent: {}", e))?;
    } else {
        fs::write(path, content).map_err(|e| format!("write_file: {}", e))?;
        // Fsync the file
        let f = fs::File::open(path).map_err(|e| format!("write_file open: {}", e))?;
        f.sync_all().map_err(|e| format!("write_file fsync: {}", e))?;
        drop(f);
        if let Some(mode) = params.mode {
            fs::set_permissions(path, fs::Permissions::from_mode(mode))
                .map_err(|e| format!("write_file chmod: {}", e))?;
        }
        // Fsync parent for new file creation
        Self::fsync_parent(path).map_err(|e| format!("write_file fsync parent: {}", e))?;
    }
    Ok(())
}
/// Append content to a file (creating it if needed), then fsync the file
/// data; the parent is fsynced best-effort in case the file is new.
pub fn append_file(params: &AppendFileParams) -> Result<(), String> {
    use std::io::Write;
    let path = Path::new(&params.path);
    let bytes: Vec<u8> = match params.encoding.as_deref() {
        Some("base64") => STANDARD
            .decode(&params.content)
            .map_err(|e| format!("append_file base64 decode: {}", e))?,
        _ => params.content.as_bytes().to_vec(),
    };
    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .map_err(|e| format!("append_file: {}", e))?;
    file.write_all(&bytes)
        .map_err(|e| format!("append_file write: {}", e))?;
    file.sync_all()
        .map_err(|e| format!("append_file fsync: {}", e))?;
    // Fsync parent in case this created the file
    Self::fsync_parent(path).ok();
    Ok(())
}
/// Remove a file, then fsync its parent so the unlink is durable.
pub fn delete_file(path: &Path) -> Result<(), String> {
    fs::remove_file(path).map_err(|e| format!("delete_file: {}", e))?;
    Self::fsync_parent(path).map_err(|e| format!("delete_file fsync parent: {}", e))
}
/// Copy a file, respecting `overwrite` (defaults to true) and optionally
/// preserving timestamps, with XFS-safe durability.
///
/// Fix over the original: the destination's *data* is now fsynced after
/// the copy — `fs::copy` does not sync, and fsyncing only the parent
/// directory (as before) made the new entry durable but not its bytes,
/// contradicting the module's durability contract. The data sync is
/// best-effort to match the module's tolerance elsewhere.
pub fn copy_file(params: &CopyMoveParams) -> Result<(), String> {
    let from = Path::new(&params.from);
    let to = Path::new(&params.to);
    if !params.overwrite.unwrap_or(true) && to.exists() {
        return Err("copy_file: destination already exists".to_string());
    }
    fs::copy(from, to).map_err(|e| format!("copy_file: {}", e))?;
    // Fsync the copied data itself, not just the directory entry.
    if let Ok(f) = fs::File::open(to) {
        let _ = f.sync_all();
    }
    if params.preserve_timestamps.unwrap_or(false) {
        // Copy timestamps (best-effort on the set call)
        let metadata = fs::metadata(from).map_err(|e| format!("copy_file stat: {}", e))?;
        let atime = filetime::FileTime::from_last_access_time(&metadata);
        let mtime = filetime::FileTime::from_last_modification_time(&metadata);
        filetime::set_file_times(to, atime, mtime).ok();
    }
    // Fsync parent after creating new file entry
    Self::fsync_parent(to).map_err(|e| format!("copy_file fsync parent: {}", e))?;
    Ok(())
}
/// Move/rename a file, falling back to copy-then-delete when source and
/// destination live on different filesystems (EXDEV).
pub fn move_file(params: &CopyMoveParams) -> Result<(), String> {
    let from = Path::new(&params.from);
    let to = Path::new(&params.to);
    if !params.overwrite.unwrap_or(true) && to.exists() {
        return Err("move_file: destination already exists".to_string());
    }
    match fs::rename(from, to) {
        Ok(()) => {
            // Fsync both parent directories (source and dest may differ)
            Self::fsync_parent(from).ok();
            Self::fsync_parent(to).map_err(|e| format!("move_file fsync parent: {}", e))?;
            Ok(())
        }
        Err(e) if e.raw_os_error() == Some(libc::EXDEV) => {
            // Cross-device: rename is impossible, so copy then delete.
            Self::copy_file(params)?;
            Self::delete_file(from)
        }
        Err(e) => Err(format!("move_file: {}", e)),
    }
}
/// True when `path` refers to an existing regular file (symlinks followed).
pub fn file_exists(path: &Path) -> bool {
    // `is_file` already returns false for missing paths, so the original's
    // extra `exists()` check is folded in.
    path.is_file()
}
/// Stat a file path. Thin alias over `stat_path`; note it does NOT verify
/// the path is actually a file — callers get whatever the path resolves to.
pub fn file_stat(path: &Path) -> Result<FileStats, String> {
    Self::stat_path(path)
}
// ── Directory operations ────────────────────────────────────────────
/// List a directory's entries, optionally recursively and filtered by name.
pub fn list_directory(params: &ListDirectoryParams) -> Result<Vec<DirectoryEntry>, String> {
    let root = Path::new(&params.path);
    let mut entries = Vec::new();
    if params.recursive.unwrap_or(false) {
        Self::list_directory_recursive(root, &mut entries, params)?;
        return Ok(entries);
    }
    let reader = fs::read_dir(root).map_err(|e| format!("list_directory: {}", e))?;
    for entry_result in reader {
        let entry = entry_result.map_err(|e| format!("list_directory entry: {}", e))?;
        let dir_entry = Self::to_directory_entry(&entry, params)?;
        // Keep the entry unless an explicit filter rejects its name.
        let keep = params
            .filter
            .as_ref()
            .map_or(true, |f| Self::matches_filter(&dir_entry.name, f));
        if keep {
            entries.push(dir_entry);
        }
    }
    Ok(entries)
}
/// Depth-first recursive listing helper. Matching entries are pushed in
/// discovery order; subdirectories are always descended into, even when
/// the filter excludes the directory entry itself.
fn list_directory_recursive(
    path: &Path,
    entries: &mut Vec<DirectoryEntry>,
    params: &ListDirectoryParams,
) -> Result<(), String> {
    let reader = fs::read_dir(path).map_err(|e| format!("list_directory_recursive: {}", e))?;
    for entry_result in reader {
        let entry = entry_result.map_err(|e| format!("list_directory entry: {}", e))?;
        let dir_entry = Self::to_directory_entry(&entry, params)?;
        let keep = params
            .filter
            .as_ref()
            .map_or(true, |f| Self::matches_filter(&dir_entry.name, f));
        // Remember before the entry is moved into the results vector
        // (avoids the clone the original needed).
        let descend = dir_entry.is_directory;
        if keep {
            entries.push(dir_entry);
        }
        if descend {
            Self::list_directory_recursive(&entry.path(), entries, params)?;
        }
    }
    Ok(())
}
/// Create a directory (recursively by default), apply an optional mode,
/// then fsync the parent so the new entry is durable.
pub fn create_directory(params: &CreateDirectoryParams) -> Result<(), String> {
    let path = Path::new(&params.path);
    let made = if params.recursive.unwrap_or(true) {
        fs::create_dir_all(path)
    } else {
        fs::create_dir(path)
    };
    made.map_err(|e| format!("create_directory: {}", e))?;
    if let Some(mode) = params.mode {
        fs::set_permissions(path, fs::Permissions::from_mode(mode))
            .map_err(|e| format!("create_directory chmod: {}", e))?;
    }
    // Fsync parent to ensure directory entry is durable
    Self::fsync_parent(path).map_err(|e| format!("create_directory fsync: {}", e))
}
/// Remove a directory (recursively by default), then fsync its parent.
pub fn delete_directory(params: &DeleteDirectoryParams) -> Result<(), String> {
    let path = Path::new(&params.path);
    let removed = if params.recursive.unwrap_or(true) {
        fs::remove_dir_all(path)
    } else {
        fs::remove_dir(path)
    };
    removed.map_err(|e| format!("delete_directory: {}", e))?;
    Self::fsync_parent(path).map_err(|e| format!("delete_directory fsync: {}", e))
}
/// True when `path` refers to an existing directory (symlinks followed).
pub fn directory_exists(path: &Path) -> bool {
    // `is_dir` returns false for missing paths, subsuming `exists()`.
    path.is_dir()
}
/// Stat a directory path. Thin alias over `stat_path`; does NOT verify the
/// path is actually a directory.
pub fn directory_stat(path: &Path) -> Result<FileStats, String> {
    Self::stat_path(path)
}
// ── Batch operations ────────────────────────────────────────────────
/// Execute multiple operations, collecting parent dirs for a single fsync pass at the end.
/// Each operation's success/error is reported individually; a failure does
/// not stop later operations.
pub fn batch(params: &BatchParams) -> Vec<BatchResult> {
    let mut dirs_to_sync: Vec<PathBuf> = Vec::new();
    let results: Vec<BatchResult> = params
        .operations
        .iter()
        .enumerate()
        .map(|(index, op)| {
            let outcome = Self::execute_batch_op(op, &mut dirs_to_sync);
            BatchResult {
                index,
                success: outcome.is_ok(),
                error: outcome.err(),
            }
        })
        .collect();
    // One fsync per distinct affected directory.
    dirs_to_sync.sort();
    dirs_to_sync.dedup();
    for dir in &dirs_to_sync {
        Self::fsync_dir(dir).ok();
    }
    results
}
/// Execute a single batch operation, recording each affected parent
/// directory in `dirs_to_sync` instead of fsyncing immediately; the
/// caller (`batch`) dedupes the set and fsyncs each directory once.
///
/// `op.op_type` selects the action ("write", "append", "delete", "copy",
/// "move", "mkdir", "rmdir"); unknown values are an error.
/// NOTE(review): unlike `write_file`/`append_file`, batch writes never
/// fsync the file *data* — only parent directories get synced by the
/// caller. Confirm this weaker durability is intended for batches.
fn execute_batch_op(op: &BatchOp, dirs_to_sync: &mut Vec<PathBuf>) -> Result<(), String> {
    let path = Path::new(&op.path);
    match op.op_type.as_str() {
        "write" => {
            // Content is written as plain text (no base64 path here).
            let content = op.content.as_deref().unwrap_or("");
            fs::write(path, content.as_bytes()).map_err(|e| e.to_string())?;
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "append" => {
            use std::io::Write;
            let content = op.content.as_deref().unwrap_or("");
            let mut file = fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(path)
                .map_err(|e| e.to_string())?;
            file.write_all(content.as_bytes()).map_err(|e| e.to_string())?;
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "delete" => {
            fs::remove_file(path).map_err(|e| e.to_string())?;
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "copy" => {
            let to = Path::new(op.target_path.as_deref().ok_or("copy: missing targetPath")?);
            fs::copy(path, to).map_err(|e| e.to_string())?;
            // Only the destination's parent gains a new entry.
            if let Some(parent) = to.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "move" => {
            let to = Path::new(op.target_path.as_deref().ok_or("move: missing targetPath")?);
            fs::rename(path, to).map_err(|e| e.to_string())?;
            // Both directories change: source loses an entry, dest gains one.
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
            if let Some(parent) = to.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "mkdir" => {
            if op.recursive.unwrap_or(true) {
                fs::create_dir_all(path).map_err(|e| e.to_string())?;
            } else {
                fs::create_dir(path).map_err(|e| e.to_string())?;
            }
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        "rmdir" => {
            if op.recursive.unwrap_or(true) {
                fs::remove_dir_all(path).map_err(|e| e.to_string())?;
            } else {
                fs::remove_dir(path).map_err(|e| e.to_string())?;
            }
            if let Some(parent) = path.parent() {
                dirs_to_sync.push(parent.to_path_buf());
            }
        }
        other => {
            return Err(format!("unknown batch op type: {}", other));
        }
    }
    Ok(())
}
// ── Transaction operations ──────────────────────────────────────────
/// Best-effort transactional execution: snapshot existing file contents,
/// run the operations in order, and on the first failure restore the
/// backups of all previously-completed operations in reverse order.
///
/// NOTE(review): rollback only rewrites/removes *source* paths — a
/// completed "copy" leaves its target behind, and a completed "move" is
/// undone by rewriting the source from backup rather than moving the
/// target back, so targets created by "copy"/"move" are not cleaned up.
/// Confirm these limits are acceptable to callers.
pub fn execute_transaction(params: &TransactionParams) -> Result<(), String> {
    // Phase 1: Prepare backups
    // Snapshot each op's source file if it exists; `None` means the path
    // was absent, so rollback deletes whatever the op created.
    let mut backups: Vec<(usize, Option<Vec<u8>>)> = Vec::new();
    for (i, op) in params.operations.iter().enumerate() {
        let path = Path::new(&op.path);
        let backup = if path.exists() && path.is_file() {
            Some(fs::read(path).map_err(|e| format!("transaction backup {}: {}", i, e))?)
        } else {
            None
        };
        backups.push((i, backup));
    }
    // Phase 2: Execute operations
    let mut completed = 0;
    let mut dirs_to_sync: Vec<PathBuf> = Vec::new();
    for (i, op) in params.operations.iter().enumerate() {
        let path = Path::new(&op.path);
        let result = match op.op_type.as_str() {
            "write" => {
                let content = op.content.as_deref().unwrap_or("");
                fs::write(path, content.as_bytes()).map_err(|e| e.to_string())
            }
            "append" => {
                use std::io::Write;
                let content = op.content.as_deref().unwrap_or("");
                fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(path)
                    .and_then(|mut f| f.write_all(content.as_bytes()))
                    .map_err(|e| e.to_string())
            }
            "delete" => fs::remove_file(path).map_err(|e| e.to_string()),
            "copy" => {
                // NOTE: the `?` on ok_or aborts the whole transaction
                // without rollback of earlier ops — a malformed op fails
                // before `result` is inspected.
                let to = op.target_path.as_deref().ok_or("copy: missing targetPath")?;
                fs::copy(path, to).map(|_| ()).map_err(|e| e.to_string())
            }
            "move" => {
                let to = op.target_path.as_deref().ok_or("move: missing targetPath")?;
                fs::rename(path, to).map_err(|e| e.to_string())
            }
            other => Err(format!("unknown transaction op: {}", other)),
        };
        match result {
            Ok(()) => {
                // `completed` counts ops that must be rolled back on a
                // later failure (indices 0..completed).
                completed = i + 1;
                if let Some(parent) = path.parent() {
                    dirs_to_sync.push(parent.to_path_buf());
                }
                if let Some(tp) = &op.target_path {
                    if let Some(parent) = Path::new(tp).parent() {
                        dirs_to_sync.push(parent.to_path_buf());
                    }
                }
            }
            Err(e) => {
                // Rollback completed operations in reverse order
                for j in (0..completed).rev() {
                    let (_, ref backup) = backups[j];
                    let rollback_path = Path::new(&params.operations[j].path);
                    if let Some(data) = backup {
                        // Restore the pre-transaction contents.
                        let _ = fs::write(rollback_path, data);
                    } else {
                        // Path did not exist before: remove what op j made.
                        let _ = fs::remove_file(rollback_path);
                    }
                }
                return Err(format!("transaction failed at op {}: {}", i, e));
            }
        }
    }
    // Phase 3: Batch fsync all affected directories
    dirs_to_sync.sort();
    dirs_to_sync.dedup();
    for dir in &dirs_to_sync {
        Self::fsync_dir(dir).ok();
    }
    Ok(())
}
// ── Path operations ─────────────────────────────────────────────────
/// Normalize a path string.
///
/// Existing paths are resolved via `canonicalize` (symlinks followed).
/// Non-existent paths get a lexical cleanup: `.` components are dropped
/// and `..` pops the preceding normal component.
///
/// Fixes over the original: `..` no longer pops the root component
/// (`"/.."` now yields `"/"` instead of `""`), and leading `..` on
/// relative paths is preserved (`"../a"` stays `"../a"` instead of
/// collapsing to `"a"`).
pub fn normalize_path(path: &str) -> String {
    use std::path::Component;
    let p = Path::new(path);
    // Use canonicalize if the path exists, otherwise just clean it
    match p.canonicalize() {
        Ok(canonical) => canonical.to_string_lossy().into_owned(),
        Err(_) => {
            let mut components: Vec<Component> = Vec::new();
            for component in p.components() {
                match component {
                    Component::CurDir => {}
                    Component::ParentDir => match components.last() {
                        // "a/b/.." → "a": pop the preceding normal segment.
                        Some(Component::Normal(_)) => {
                            components.pop();
                        }
                        // "/.." → "/": `..` at the root is a no-op.
                        Some(Component::RootDir) | Some(Component::Prefix(_)) => {}
                        // Leading ".." on a relative path must be kept.
                        _ => components.push(component),
                    },
                    other => components.push(other),
                }
            }
            let result: PathBuf = components.into_iter().collect();
            result.to_string_lossy().into_owned()
        }
    }
}
/// Join path segments with `PathBuf::push` semantics (an absolute segment
/// replaces everything accumulated before it).
pub fn join_path(segments: &[String]) -> String {
    segments
        .iter()
        .fold(PathBuf::new(), |mut acc, seg| {
            acc.push(seg);
            acc
        })
        .to_string_lossy()
        .into_owned()
}
// ── Streaming operations ─────────────────────────────────────────────
/// Read a file in chunks, writing IpcStreamChunk messages to stdout.
/// Returns the total number of bytes read.
/// Chunks are base64-encoded; `chunk_size` defaults to 64 KiB.
pub fn read_file_stream(
    request_id: &str,
    params: &ReadFileStreamParams,
) -> Result<u64, String> {
    use std::io::{Read, Write};
    let chunk_size = params.chunk_size.unwrap_or(65536); // 64KB default
    let mut file = fs::File::open(Path::new(&params.path))
        .map_err(|e| format!("read_file_stream: {}", e))?;
    let mut buf = vec![0u8; chunk_size];
    let mut total_bytes: u64 = 0;
    loop {
        let n = file
            .read(&mut buf)
            .map_err(|e| format!("read_file_stream read: {}", e))?;
        if n == 0 {
            return Ok(total_bytes);
        }
        total_bytes += n as u64;
        let chunk = IpcStreamChunk {
            id: request_id.to_string(),
            stream: true,
            data: serde_json::json!(STANDARD.encode(&buf[..n])),
        };
        // Best-effort emission: a serialization or write failure drops the
        // chunk but does not abort the stream (matches prior behavior).
        if let Ok(json) = serde_json::to_string(&chunk) {
            let stdout = io::stdout();
            let mut out = stdout.lock();
            let _ = writeln!(out, "{}", json);
            let _ = out.flush();
        }
    }
}
// ── Helpers ─────────────────────────────────────────────────────────
/// Stat a path WITHOUT following a final symlink (`symlink_metadata`), so
/// symlinks report their own type rather than their target's.
pub fn stat_path(path: &Path) -> Result<FileStats, String> {
    let metadata = fs::symlink_metadata(path).map_err(|e| format!("stat: {}", e))?;
    let file_type = metadata.file_type();
    Ok(FileStats {
        size: metadata.len(),
        // created()/modified()/accessed() can fail on some platforms or
        // filesystems; errors collapse to the "0.000Z" sentinel inside
        // `system_time_to_iso`.
        birthtime: system_time_to_iso(metadata.created().ok()),
        mtime: system_time_to_iso(metadata.modified().ok()),
        atime: system_time_to_iso(metadata.accessed().ok()),
        is_file: file_type.is_file(),
        is_directory: file_type.is_dir(),
        is_symbolic_link: file_type.is_symlink(),
        // Full Unix mode bits (type + permissions), not just the low 9.
        mode: metadata.permissions().mode(),
    })
}
/// Convert a raw `DirEntry` into the protocol's `DirectoryEntry`,
/// attaching best-effort stats only when `include_stats` is set.
fn to_directory_entry(
    entry: &fs::DirEntry,
    params: &ListDirectoryParams,
) -> Result<DirectoryEntry, String> {
    let file_type = entry.file_type().map_err(|e| format!("dir entry type: {}", e))?;
    let path = entry.path();
    // Stats are optional and failures are swallowed (entry still listed).
    let stats = params
        .include_stats
        .unwrap_or(false)
        .then(|| Self::stat_path(&path).ok())
        .flatten();
    Ok(DirectoryEntry {
        name: entry.file_name().to_string_lossy().into_owned(),
        path: path.to_string_lossy().into_owned(),
        is_file: file_type.is_file(),
        is_directory: file_type.is_dir(),
        is_symbolic_link: file_type.is_symlink(),
        stats,
    })
}
/// Match an entry name against a filter.
///
/// Filters prefixed with "regex:" are raw regex patterns (falling back to
/// substring containment if the pattern fails to compile). Anything else
/// is a simple glob where `*` matches any sequence.
///
/// Fix over the original: when converting the glob to a regex, ALL regex
/// metacharacters are now escaped — the original escaped only `.`, so a
/// filter like "a+b" or "f(1)" compiled into an unintended regex and
/// matched the wrong names.
fn matches_filter(name: &str, filter: &str) -> bool {
    if let Some(regex_pattern) = filter.strip_prefix("regex:") {
        // Raw regex pattern from TypeScript RegExp
        if let Ok(regex) = regex_lite::Regex::new(regex_pattern) {
            return regex.is_match(name);
        }
        return name.contains(regex_pattern);
    }
    // Glob → regex: `*` becomes `.*`, every other metacharacter is escaped
    // so it matches literally.
    let mut pattern = String::with_capacity(filter.len() + 8);
    for c in filter.chars() {
        match c {
            '*' => pattern.push_str(".*"),
            '\\' | '.' | '+' | '?' | '(' | ')' | '[' | ']' | '{' | '}' | '^' | '$' | '|' => {
                pattern.push('\\');
                pattern.push(c);
            }
            _ => pattern.push(c),
        }
    }
    if let Ok(regex) = regex_lite::Regex::new(&format!("^{}$", pattern)) {
        regex.is_match(name)
    } else {
        name.contains(filter)
    }
}
}
/// Render an optional timestamp as "<unix-secs>.<millis>Z".
///
/// `None`, the epoch itself, and pre-epoch times all collapse to "0.000Z".
/// NOTE(review): despite the name this is epoch-seconds, not real ISO 8601
/// — presumably the TypeScript side parses this exact shape; confirm
/// before changing the format.
fn system_time_to_iso(time: Option<SystemTime>) -> String {
    let duration = match time {
        None => return "0.000Z".to_string(),
        Some(t) => t.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default(),
    };
    format!("{}.{:03}Z", duration.as_secs(), duration.subsec_millis())
}

View File

@@ -0,0 +1,109 @@
use crate::FsOps;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use smartfs_protocol::{IpcEvent, WatchEvent};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
/// Manages file watchers, emitting events as IPC events to stdout.
pub struct WatchManager {
    // Keeps each notify watcher alive, keyed by the caller-supplied watch
    // id. Dropping an entry stops its watcher, which closes the event
    // channel and lets the forwarding thread exit.
    watchers: Arc<Mutex<HashMap<String, RecommendedWatcher>>>,
}
impl WatchManager {
/// Create a manager with no active watches.
pub fn new() -> Self {
    let watchers = Arc::new(Mutex::new(HashMap::new()));
    Self { watchers }
}
/// Start watching `path` under the caller-supplied `id`.
///
/// Events are forwarded by a dedicated thread as JSON `IpcEvent` lines on
/// stdout, tagged `watch:<id>`. The watcher handle is stored in
/// `self.watchers` so it stays alive; dropping it (via `remove_all` or a
/// replacing `add_watch` with the same id) closes the channel, which ends
/// the forwarding thread's `for` loop.
pub fn add_watch(
    &self,
    id: String,
    path: &str,
    recursive: bool,
) -> Result<(), String> {
    let watch_id = id.clone();
    let mode = if recursive {
        RecursiveMode::Recursive
    } else {
        RecursiveMode::NonRecursive
    };
    // notify delivers events on this channel from its own backend thread.
    let (tx, rx) = std::sync::mpsc::channel::<notify::Result<notify::Event>>();
    let mut watcher = RecommendedWatcher::new(tx, Config::default())
        .map_err(|e| format!("watch create: {}", e))?;
    watcher
        .watch(Path::new(path), mode)
        .map_err(|e| format!("watch path: {}", e))?;
    // Spawn a thread to read events and write IPC events to stdout
    let watch_id_clone = watch_id.clone();
    std::thread::spawn(move || {
        for event in rx {
            match event {
                Ok(ev) => {
                    // Map notify kinds onto the protocol's names; other
                    // kinds (access, metadata-only, etc.) are dropped.
                    let event_type = match ev.kind {
                        EventKind::Create(_) => "add",
                        EventKind::Modify(_) => "change",
                        EventKind::Remove(_) => "delete",
                        _ => continue,
                    };
                    for ev_path in &ev.paths {
                        // Stats are best-effort; meaningless for deletes.
                        let stats = if event_type != "delete" {
                            FsOps::stat_path(ev_path).ok()
                        } else {
                            None
                        };
                        let watch_event = WatchEvent {
                            event_type: event_type.to_string(),
                            path: ev_path.to_string_lossy().into_owned(),
                            // Epoch-seconds format, same as ops.rs.
                            timestamp: {
                                let d = SystemTime::now()
                                    .duration_since(SystemTime::UNIX_EPOCH)
                                    .unwrap_or_default();
                                format!("{}.{:03}Z", d.as_secs(), d.subsec_millis())
                            },
                            stats,
                        };
                        let ipc_event = IpcEvent {
                            event: format!("watch:{}", watch_id_clone),
                            data: serde_json::to_value(&watch_event).unwrap_or_default(),
                        };
                        if let Ok(json) = serde_json::to_string(&ipc_event) {
                            // Write to stdout (IPC channel)
                            println!("{}", json);
                        }
                    }
                }
                Err(e) => {
                    // Watch errors go to stderr, not the IPC channel.
                    eprintln!("watch error: {}", e);
                }
            }
        }
    });
    // Store the watcher to keep it alive
    self.watchers
        .lock()
        .map_err(|e| format!("lock: {}", e))?
        .insert(id, watcher);
    Ok(())
}
/// Drop every registered watcher, stopping all event streams.
pub fn remove_all(&self) -> Result<(), String> {
    let mut guard = self.watchers.lock().map_err(|e| format!("lock: {}", e))?;
    guard.clear();
    Ok(())
}
}