feat(watchers): add Rust-powered watcher backend with runtime fallback and cross-platform test coverage

This commit is contained in:
2026-03-23 14:15:31 +00:00
parent ca9a66e03e
commit 7def7020c6
26 changed files with 10383 additions and 2870 deletions

289
rust/src/main.rs Normal file
View File

@@ -0,0 +1,289 @@
use notify_debouncer_full::{new_debouncer, DebounceEventResult, DebouncedEvent, RecommendedCache};
use notify::{RecommendedWatcher, RecursiveMode, EventKind};
use serde::{Deserialize, Serialize};
use std::io::{self, BufRead, Write};
use std::path::Path;
use std::sync::mpsc;
use std::time::Duration;
// --- IPC message types ---
#[derive(Deserialize)]
struct Request {
id: String,
method: String,
#[serde(default)]
params: serde_json::Value,
}
#[derive(Serialize)]
struct Response {
id: String,
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
#[derive(Serialize)]
struct Event {
event: String,
data: serde_json::Value,
}
// --- Watch command params ---
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WatchParams {
paths: Vec<String>,
#[serde(default = "default_depth")]
depth: u32,
#[serde(default)]
follow_symlinks: bool,
#[serde(default = "default_debounce")]
debounce_ms: u64,
}
fn default_depth() -> u32 { 4 }
fn default_debounce() -> u64 { 100 }
// --- Output helpers (thread-safe via stdout lock) ---
fn send_line(line: &str) {
let stdout = io::stdout();
let mut handle = stdout.lock();
let _ = handle.write_all(line.as_bytes());
let _ = handle.write_all(b"\n");
let _ = handle.flush();
}
fn send_response(resp: &Response) {
if let Ok(json) = serde_json::to_string(resp) {
send_line(&json);
}
}
fn send_event(name: &str, data: serde_json::Value) {
let evt = Event { event: name.to_string(), data };
if let Ok(json) = serde_json::to_string(&evt) {
send_line(&json);
}
}
fn ok_response(id: String, result: serde_json::Value) -> Response {
Response { id, success: true, result: Some(result), error: None }
}
fn err_response(id: String, msg: String) -> Response {
Response { id, success: false, result: None, error: Some(msg) }
}
// --- Map notify EventKind to our event type strings ---
fn event_kind_to_type(kind: &EventKind) -> Option<&'static str> {
match kind {
EventKind::Create(_) => Some("create"),
EventKind::Modify(_) => Some("change"),
EventKind::Remove(_) => Some("remove"),
_ => None,
}
}
/// Determine if a path is a directory
fn classify_path(path: &Path) -> &'static str {
if path.is_dir() { "dir" } else { "file" }
}
/// Walk a directory and emit add/addDir events for initial scan
fn scan_directory(dir: &Path, depth: u32, max_depth: u32) {
if depth > max_depth {
return;
}
let entries = match std::fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return,
};
for entry in entries.flatten() {
let path = entry.path();
let path_str = path.to_string_lossy().to_string();
if path.is_dir() {
send_event("fsEvent", serde_json::json!({
"type": "addDir",
"path": path_str,
}));
scan_directory(&path, depth + 1, max_depth);
} else if path.is_file() {
send_event("fsEvent", serde_json::json!({
"type": "add",
"path": path_str,
}));
}
}
}
// --- Messages between threads ---
enum MainMessage {
StdinLine(String),
StdinClosed,
FsEvents(Vec<DebouncedEvent>),
FsError(Vec<notify::Error>),
}
// --- Main ---
fn main() {
let args: Vec<String> = std::env::args().collect();
if !args.contains(&"--management".to_string()) {
eprintln!("smartwatch-rust: use --management flag for IPC mode");
std::process::exit(1);
}
// Signal ready
send_event("ready", serde_json::json!({}));
// Single channel for all messages to the main thread
let (main_tx, main_rx) = mpsc::channel::<MainMessage>();
// Spawn stdin reader thread
let stdin_tx = main_tx.clone();
std::thread::spawn(move || {
let stdin = io::stdin();
for line in stdin.lock().lines() {
match line {
Ok(l) => {
let trimmed = l.trim().to_string();
if !trimmed.is_empty() {
if stdin_tx.send(MainMessage::StdinLine(trimmed)).is_err() {
break;
}
}
}
Err(_) => break,
}
}
let _ = stdin_tx.send(MainMessage::StdinClosed);
});
// State: active debouncer
let mut active_debouncer: Option<notify_debouncer_full::Debouncer<
RecommendedWatcher,
RecommendedCache,
>> = None;
// Main event loop — receives both stdin lines and FS events
for msg in &main_rx {
match msg {
MainMessage::StdinClosed => break,
MainMessage::FsEvents(events) => {
for event in events {
let Some(event_type) = event_kind_to_type(&event.kind) else {
continue;
};
for path in &event.paths {
let path_str = path.to_string_lossy().to_string();
let path_kind = classify_path(path);
let fs_type = match (event_type, path_kind) {
("create", "dir") => "addDir",
("create", _) => "add",
("change", _) => "change",
("remove", "dir") => "unlinkDir",
("remove", _) => "unlink",
_ => continue,
};
send_event("fsEvent", serde_json::json!({
"type": fs_type,
"path": path_str,
}));
}
}
}
MainMessage::FsError(errors) => {
for err in errors {
send_event("error", serde_json::json!({
"message": format!("{}", err),
}));
}
}
MainMessage::StdinLine(line) => {
let request: Request = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
send_response(&err_response(
"unknown".to_string(),
format!("Failed to parse request: {}", e),
));
continue;
}
};
match request.method.as_str() {
"watch" => {
let params: WatchParams = match serde_json::from_value(request.params) {
Ok(p) => p,
Err(e) => {
send_response(&err_response(request.id, format!("Invalid params: {}", e)));
continue;
}
};
// Stop any existing watcher
active_debouncer.take();
let debounce_duration = Duration::from_millis(params.debounce_ms);
let fs_tx = main_tx.clone();
let debouncer = new_debouncer(
debounce_duration,
None,
move |result: DebounceEventResult| {
match result {
Ok(events) => { let _ = fs_tx.send(MainMessage::FsEvents(events)); }
Err(errors) => { let _ = fs_tx.send(MainMessage::FsError(errors)); }
}
},
);
match debouncer {
Ok(mut debouncer) => {
for path_str in &params.paths {
let path = Path::new(path_str);
if let Err(e) = debouncer.watch(path, RecursiveMode::Recursive) {
eprintln!("Watch error for {}: {}", path_str, e);
}
}
// Initial scan
for path_str in &params.paths {
scan_directory(Path::new(path_str), 0, params.depth);
}
send_event("watchReady", serde_json::json!({}));
active_debouncer = Some(debouncer);
send_response(&ok_response(request.id, serde_json::json!({ "watching": true })));
}
Err(e) => {
send_response(&err_response(request.id, format!("Failed to create watcher: {}", e)));
}
}
}
"stop" => {
active_debouncer.take();
send_response(&ok_response(request.id, serde_json::json!({ "stopped": true })));
}
other => {
send_response(&err_response(request.id, format!("Unknown method: {}", other)));
}
}
}
}
}
// Clean up
active_debouncer.take();
}