import * as fs from 'fs';
import * as path from 'path';
import * as smartrx from '@push.rocks/smartrx';
import type { IWatcher, IWatcherOptions, IWatchEvent, TWatchEventType } from './interfaces.js';

/**
 * Node.js/Bun file watcher using native fs.watch API.
 *
 * One recursive `fs.watch` is created per base directory. Raw events are
 * debounced per path, normalized into add/change/unlink style events and
 * published on {@link NodeWatcher.events$}. A periodic health check restarts
 * watchers whose directory disappeared or was replaced (inode change).
 */
export class NodeWatcher implements IWatcher {
  /** Active FSWatcher per watched base path. */
  private watchers: Map<string, fs.FSWatcher> = new Map();
  /** Every path (files AND directories) that an add/addDir was emitted for. */
  private watchedFiles: Set<string> = new Set();
  /** Subset of `watchedFiles` known to be directories (for unlinkDir vs unlink). */
  private watchedDirs: Set<string> = new Set();
  private _isWatching = false;

  // Debounce: pending emit per file path, together with the coalesced event type
  private pendingEmits: Map<string, { timer: NodeJS.Timeout; eventType: string }> = new Map();

  // Restart tracking
  private restartDelays: Map<string, number> = new Map();
  private restartAttempts: Map<string, number> = new Map();
  /** Base paths that currently have a restart in flight (prevents concurrent restarts). */
  private restartingPaths: Set<string> = new Set();
  private healthCheckInterval: NodeJS.Timeout | null = null;

  // Inode tracking - detect when directories are replaced (atomic saves, etc.)
  // fs.watch watches the inode, not the path. If inode changes, we need to restart.
  private watchedInodes: Map<string, bigint> = new Map();

  // Configuration constants
  private static readonly MAX_RETRIES = 3;
  private static readonly INITIAL_RESTART_DELAY = 1000;
  private static readonly MAX_RESTART_DELAY = 30000;
  private static readonly HEALTH_CHECK_INTERVAL = 30000;

  public readonly events$ = new smartrx.rxjs.Subject<IWatchEvent>();

  constructor(private options: IWatcherOptions) {}

  /**
   * Normalize an unknown thrown value into an Error instance.
   */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }

  /**
   * Extract a Node.js style error code (`ENOENT`, `ENOSPC`, ...) if present.
   */
  private static codeOf(value: unknown): string | undefined {
    if (typeof value === 'object' && value !== null && 'code' in value) {
      const code = (value as { code?: unknown }).code;
      return typeof code === 'string' ? code : undefined;
    }
    return undefined;
  }

  /**
   * Safely emit an event, catching any subscriber errors
   */
  private safeEmit(event: IWatchEvent): void {
    try {
      this.events$.next(event);
    } catch (error) {
      console.error('[smartwatch] Subscriber threw error (isolated):', error);
      // Don't let subscriber errors kill the watcher
    }
  }

  /**
   * Unregister and close the watcher for a path, ignoring close errors.
   *
   * The watcher is removed from the registry BEFORE it is closed so that its
   * 'close' handler can tell this was intentional and does not trigger a restart.
   */
  private closeWatcher(watchPath: string): void {
    const watcher = this.watchers.get(watchPath);
    if (!watcher) {
      return;
    }
    this.watchers.delete(watchPath);
    try {
      watcher.close();
    } catch {
      // Ignore close errors
    }
  }

  /**
   * Fire-and-forget wrapper around restartWatcher that never leaves a
   * rejected promise unhandled.
   */
  private scheduleRestart(basePath: string, reason: Error): void {
    this.restartWatcher(basePath, reason).catch((restartError: unknown) => {
      console.error(`[smartwatch] Restart failed for ${basePath}:`, restartError);
    });
  }

  /**
   * Restart a watcher after an error with exponential backoff.
   *
   * Retries until a watcher is registered again, the watcher is stopped, or
   * MAX_RETRIES is exceeded (in which case an 'error' event is emitted).
   * Only one restart per base path runs at a time; concurrent calls return
   * immediately.
   *
   * @param basePath - the watched base path whose watcher failed
   * @param error - the failure that triggered the restart (used for logging)
   */
  private async restartWatcher(basePath: string, error: Error): Promise<void> {
    if (this.restartingPaths.has(basePath)) {
      return;
    }
    this.restartingPaths.add(basePath);

    try {
      let cause = error;
      for (;;) {
        const attempts = (this.restartAttempts.get(basePath) ?? 0) + 1;
        this.restartAttempts.set(basePath, attempts);

        console.log(`[smartwatch] Watcher error for ${basePath}: ${cause.message}`);
        console.log(`[smartwatch] Restart attempt ${attempts}/${NodeWatcher.MAX_RETRIES}`);

        if (attempts > NodeWatcher.MAX_RETRIES) {
          console.error(`[smartwatch] Max retries exceeded for ${basePath}, giving up`);
          this.safeEmit({
            type: 'error',
            path: basePath,
            error: new Error(`Max restart retries (${NodeWatcher.MAX_RETRIES}) exceeded`),
          });
          return;
        }

        // Close failed watcher
        this.closeWatcher(basePath);

        // Exponential backoff
        const delay = this.restartDelays.get(basePath) ?? NodeWatcher.INITIAL_RESTART_DELAY;
        console.log(`[smartwatch] Waiting ${delay}ms before restart...`);
        await new Promise<void>((resolve) => setTimeout(resolve, delay));

        // stop() may have been called while we were sleeping - do not
        // resurrect a watcher on a stopped instance.
        if (!this._isWatching) {
          return;
        }
        this.restartDelays.set(basePath, Math.min(delay * 2, NodeWatcher.MAX_RESTART_DELAY));

        // watchPath reports its own failures via events instead of throwing,
        // so success is determined by whether a watcher got registered.
        await this.watchPath(basePath, 0);
        if (this.watchers.has(basePath)) {
          console.log(`[smartwatch] Successfully restarted watcher for ${basePath}`);
          this.restartDelays.set(basePath, NodeWatcher.INITIAL_RESTART_DELAY);
          this.restartAttempts.set(basePath, 0);
          return;
        }

        cause = new Error('Watcher could not be re-established');
        console.error(`[smartwatch] Restart failed for ${basePath}:`, cause);
      }
    } finally {
      this.restartingPaths.delete(basePath);
    }
  }

  /**
   * Start periodic health checks to detect silent failures
   * Checks for:
   * 1. Path no longer exists
   * 2. Inode changed (directory was replaced - fs.watch watches inode, not path!)
   */
  private startHealthCheck(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
    }
    console.log(
      `[smartwatch] Starting health check (every ${NodeWatcher.HEALTH_CHECK_INTERVAL / 1000}s)`
    );
    this.healthCheckInterval = setInterval(() => {
      void this.runHealthCheck();
    }, NodeWatcher.HEALTH_CHECK_INTERVAL);
  }

  /**
   * Run a single health check pass over all registered watchers.
   */
  private async runHealthCheck(): Promise<void> {
    console.log(`[smartwatch] Health check: ${this.watchers.size} watchers active`);

    // Iterate over a snapshot: restarts mutate the registry while we await.
    for (const basePath of [...this.watchers.keys()]) {
      if (!this._isWatching) {
        return;
      }
      try {
        // fs.promises.stat rejects (ENOENT) for missing paths; it never resolves to null.
        const stats = await fs.promises.stat(basePath);
        if (!this.watchers.has(basePath)) {
          continue; // closed or being restarted while we were awaiting
        }
        const currentInode = stats.ino;
        const previousInode = this.watchedInodes.get(basePath);

        if (previousInode !== undefined && BigInt(currentInode) !== previousInode) {
          // CRITICAL: Inode changed! fs.watch is now watching a stale inode.
          // This happens when the directory is replaced (atomic operations, git checkout, etc.)
          console.warn(`[smartwatch] Inode changed for ${basePath}: ${previousInode} -> ${currentInode}`);
          console.warn('[smartwatch] fs.watch watches inode, not path - restarting watcher');
          this.scheduleRestart(basePath, new Error('Inode changed - directory was replaced'));
        }
      } catch (error: unknown) {
        const code = NodeWatcher.codeOf(error);
        if (code === 'ENOENT') {
          console.error(`[smartwatch] Health check failed: ${basePath} no longer exists`);
          if (this.watchers.has(basePath)) {
            this.scheduleRestart(basePath, new Error('Watched path disappeared'));
          }
        } else if (code === 'ENOSPC') {
          // inotify watch limit exceeded - critical system issue
          console.error(`[smartwatch] ENOSPC: inotify watch limit exceeded!`);
          console.error('[smartwatch] Fix: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p');
          this.safeEmit({ type: 'error', path: basePath, error: NodeWatcher.toError(error) });
        } else {
          console.error(`[smartwatch] Health check error for ${basePath}:`, error);
        }
      }
    }
  }

  /**
   * Stop health check interval
   */
  private stopHealthCheck(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
      console.log('[smartwatch] Stopped health check');
    }
  }

  /**
   * Check if a file is a temporary file created by editors
   */
  private isTemporaryFile(filePath: string): boolean {
    const basename = path.basename(filePath);
    // Editor temp files: *.tmp.*, *.swp, *.swx, *~, .#*
    if (basename.includes('.tmp.')) return true;
    if (basename.endsWith('.swp') || basename.endsWith('.swx')) return true;
    if (basename.endsWith('~')) return true;
    if (basename.startsWith('.#')) return true;
    return false;
  }

  /** Whether the watcher has been started and not yet stopped. */
  get isWatching(): boolean {
    return this._isWatching;
  }

  /**
   * Start watching all configured base paths, run the initial scan
   * (emitting 'add'/'addDir' for existing entries) and emit 'ready'.
   * Calling start() on an already running watcher is a no-op.
   *
   * @throws rethrows any unexpected failure after emitting an 'error' event
   */
  async start(): Promise<void> {
    if (this._isWatching) {
      return;
    }

    console.log(`[smartwatch] Starting watcher for ${this.options.basePaths.length} base path(s)...`);

    try {
      // Start watching each base path
      for (const basePath of this.options.basePaths) {
        await this.watchPath(basePath, 0);
      }

      this._isWatching = true;

      // Start health check monitoring
      this.startHealthCheck();

      // Perform initial scan to emit 'add' events for existing files
      for (const basePath of this.options.basePaths) {
        await this.scanDirectory(basePath, 0);
      }

      // Emit ready event
      this.safeEmit({ type: 'ready', path: '' });
      console.log(`[smartwatch] Watcher started with ${this.watchers.size} active watcher(s)`);
    } catch (error: unknown) {
      console.error('[smartwatch] Failed to start watcher:', error);
      this.safeEmit({ type: 'error', path: '', error: NodeWatcher.toError(error) });
      throw error;
    }
  }

  /**
   * Stop watching: cancels pending debounced emits, closes every watcher
   * and clears all tracking state.
   */
  async stop(): Promise<void> {
    console.log('[smartwatch] Stopping watcher...');
    // Must be cleared first: 'close' handlers and in-flight restarts check this flag.
    this._isWatching = false;

    // Stop health check monitoring
    this.stopHealthCheck();

    // Cancel all pending debounced emits
    for (const pending of this.pendingEmits.values()) {
      clearTimeout(pending.timer);
    }
    this.pendingEmits.clear();

    // Close all watchers; one failing close must not prevent closing the rest
    for (const [watchPath, watcher] of this.watchers) {
      console.log(`[smartwatch] Closing watcher for: ${watchPath}`);
      try {
        watcher.close();
      } catch (error) {
        console.warn(`[smartwatch] Error while closing watcher for ${watchPath}:`, error);
      }
    }
    this.watchers.clear();
    this.watchedFiles.clear();
    this.watchedDirs.clear();

    // Clear restart tracking state
    this.restartDelays.clear();
    this.restartAttempts.clear();
    this.watchedInodes.clear();

    console.log('[smartwatch] Watcher stopped');
  }

  /**
   * Start watching a path. Only directories get a watcher; anything else
   * (missing path, plain file) is silently skipped.
   *
   * Failures are reported as 'error' events rather than thrown, so callers
   * that need to know whether a watcher exists must check the registry.
   */
  private async watchPath(watchPath: string, depth: number): Promise<void> {
    if (depth > this.options.depth) {
      return;
    }

    try {
      const stats = await this.statSafe(watchPath);
      if (!stats || !stats.isDirectory()) {
        return;
      }

      // Store inode for health check - fs.watch watches inode, not path!
      // If inode changes (directory replaced), watcher becomes stale
      this.watchedInodes.set(watchPath, BigInt(stats.ino));

      // Watch the directory with recursive option (Node.js 20+ supports this on all platforms)
      const watcher = fs.watch(
        watchPath,
        { recursive: true, persistent: true },
        (eventType, filename) => {
          if (filename) {
            this.handleFsEvent(watchPath, filename, eventType);
          }
        }
      );

      watcher.on('error', (error: NodeJS.ErrnoException) => {
        console.error(`[smartwatch] FSWatcher error event on ${watchPath}:`, error);

        // Detect inotify watch limit exceeded - common cause of "stops working"
        if (error.code === 'ENOSPC') {
          console.error('[smartwatch] CRITICAL: inotify watch limit exceeded!');
          console.error('[smartwatch] Fix with: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p');
        }

        this.safeEmit({ type: 'error', path: watchPath, error });
        // Only the currently registered watcher may trigger a restart
        if (this._isWatching && this.watchers.get(watchPath) === watcher) {
          this.scheduleRestart(watchPath, error);
        }
      });

      // Handle 'close' event - fs.watch can close without error.
      // Intentional closes (stop(), restart, replacement) unregister the
      // watcher first, so they are recognized here and ignored.
      watcher.on('close', () => {
        if (!this._isWatching || this.watchers.get(watchPath) !== watcher) {
          return;
        }
        console.warn(`[smartwatch] FSWatcher closed unexpectedly for ${watchPath}`);
        this.scheduleRestart(watchPath, new Error('Watcher closed unexpectedly'));
      });

      // Replace (and close) any watcher already registered for this path so
      // it is not leaked, e.g. when the same base path is configured twice.
      const previous = this.watchers.get(watchPath);
      this.watchers.set(watchPath, watcher);
      if (previous && previous !== watcher) {
        try {
          previous.close();
        } catch {
          // Ignore close errors
        }
      }
      console.log(`[smartwatch] Started watching: ${watchPath}`);
    } catch (error: unknown) {
      console.error(`[smartwatch] Failed to watch path ${watchPath}:`, error);
      this.safeEmit({ type: 'error', path: watchPath, error: NodeWatcher.toError(error) });
    }
  }

  /**
   * Handle raw fs.watch events - debounce and normalize them
   */
  private handleFsEvent(
    basePath: string,
    filename: string,
    eventType: 'rename' | 'change' | string
  ): void {
    const fullPath = path.join(basePath, filename);

    // Skip editor temp files. For atomic writes (write file.tmp.xxx, then
    // rename to file) the 'rename' event also fires for the final target
    // path, so the real file is still detected.
    if (this.isTemporaryFile(fullPath)) {
      console.log(`[smartwatch] Skipping temp file event: ${filename}`);
      return;
    }

    // Debounce: cancel any pending emit for this file.
    // A 'rename' seen within the debounce window is kept ("sticky"), because
    // it is the only event type that can produce addDir/unlinkDir; letting a
    // later 'change' overwrite it would lose directory creation events.
    let effectiveType: string = eventType;
    const pending = this.pendingEmits.get(fullPath);
    if (pending) {
      clearTimeout(pending.timer);
      if (pending.eventType === 'rename') {
        effectiveType = 'rename';
      }
    }

    // Schedule debounced emit
    const timer = setTimeout(() => {
      this.pendingEmits.delete(fullPath);
      void this.emitFileEvent(fullPath, effectiveType);
    }, this.options.debounceMs);

    this.pendingEmits.set(fullPath, { timer, eventType: effectiveType });
  }

  /**
   * Emit 'add' for a file seen for the first time, 'change' otherwise.
   */
  private emitAddOrChange(fullPath: string, stats: fs.Stats): void {
    const wasWatched = this.watchedFiles.has(fullPath);
    this.watchedFiles.add(fullPath);
    this.watchedDirs.delete(fullPath); // path is a file now, whatever it was before
    this.safeEmit({ type: wasWatched ? 'change' : 'add', path: fullPath, stats });
  }

  /**
   * Emit 'unlink'/'unlinkDir' for a tracked path that no longer exists.
   * Untracked paths are ignored.
   */
  private emitRemoval(fullPath: string): void {
    if (!this.watchedFiles.has(fullPath)) {
      return;
    }
    const wasDir = this.isKnownDirectory(fullPath);
    this.watchedFiles.delete(fullPath);
    this.watchedDirs.delete(fullPath);
    this.safeEmit({ type: wasDir ? 'unlinkDir' : 'unlink', path: fullPath });
  }

  /**
   * Emit the actual file event after debounce.
   *
   * The path is stat'ed to decide what really happened: fs.watch's 'rename'
   * can mean created, moved here, or deleted; 'change' means modified but may
   * also be the first/last sign of a file we have (not) seen before.
   * Event types other than 'rename' and 'change' are ignored.
   */
  private async emitFileEvent(
    fullPath: string,
    eventType: 'rename' | 'change' | string
  ): Promise<void> {
    try {
      const stats = await this.statSafe(fullPath);

      if (eventType === 'rename') {
        if (!stats) {
          // Path doesn't exist - it was deleted
          this.emitRemoval(fullPath);
        } else if (stats.isDirectory()) {
          // Directory exists - announce it only once
          if (!this.watchedFiles.has(fullPath)) {
            this.watchedFiles.add(fullPath);
            this.watchedDirs.add(fullPath);
            this.safeEmit({ type: 'addDir', path: fullPath, stats });
          }
        } else {
          // File exists - it's either new or was renamed to this location
          this.emitAddOrChange(fullPath, stats);
        }
      } else if (eventType === 'change') {
        if (!stats) {
          // Path was deleted
          this.emitRemoval(fullPath);
        } else if (!stats.isDirectory()) {
          // File was modified (or is actually an 'add' if not seen before)
          this.emitAddOrChange(fullPath, stats);
        }
      }
    } catch (error: unknown) {
      this.safeEmit({ type: 'error', path: fullPath, error: NodeWatcher.toError(error) });
    }
  }

  /**
   * Scan directory and emit 'add' events for existing files
   * ('addDir' for sub-directories), recursing up to `options.depth`.
   * Missing or unreadable directories (ENOENT/EACCES) are skipped silently.
   */
  private async scanDirectory(dirPath: string, depth: number): Promise<void> {
    if (depth > this.options.depth) {
      return;
    }

    try {
      const entries = await fs.promises.readdir(dirPath, { withFileTypes: true });

      for (const entry of entries) {
        const fullPath = path.join(dirPath, entry.name);

        // Skip temp files during initial scan too
        if (this.isTemporaryFile(fullPath)) {
          continue;
        }

        const stats = await this.statSafe(fullPath);
        if (!stats) {
          continue;
        }

        if (entry.isDirectory()) {
          this.watchedFiles.add(fullPath);
          this.watchedDirs.add(fullPath);
          this.safeEmit({ type: 'addDir', path: fullPath, stats });
          await this.scanDirectory(fullPath, depth + 1);
        } else if (entry.isFile()) {
          this.watchedFiles.add(fullPath);
          this.safeEmit({ type: 'add', path: fullPath, stats });
        }
      }
    } catch (error: unknown) {
      const code = NodeWatcher.codeOf(error);
      if (code !== 'ENOENT' && code !== 'EACCES') {
        this.safeEmit({ type: 'error', path: dirPath, error: NodeWatcher.toError(error) });
      }
    }
  }

  /**
   * Safely stat a path, returning null if it doesn't exist.
   * Uses stat() when `options.followSymlinks` is set, lstat() otherwise.
   * Other failures (permission, I/O) are logged and also yield null.
   */
  private async statSafe(filePath: string): Promise<fs.Stats | null> {
    try {
      return await (this.options.followSymlinks
        ? fs.promises.stat(filePath)
        : fs.promises.lstat(filePath));
    } catch (error: unknown) {
      const code = NodeWatcher.codeOf(error);
      // Only silently return null for expected "file doesn't exist" errors
      if (code === 'ENOENT' || code === 'ENOTDIR') {
        return null;
      }
      // Log other errors (permission, I/O) but still return null
      const message = error instanceof Error ? error.message : String(error);
      console.warn(`[smartwatch] statSafe warning for ${filePath}: ${code} - ${message}`);
      return null;
    }
  }

  /**
   * Check if a path was known to be a directory (for proper unlink event type).
   * A path counts as a directory if it was recorded as one, or if any tracked
   * path lives underneath it.
   */
  private isKnownDirectory(filePath: string): boolean {
    if (this.watchedDirs.has(filePath)) {
      return true;
    }
    // Fallback: check if any watched files are children of this path
    const prefix = filePath + path.sep;
    for (const watched of this.watchedFiles) {
      if (watched.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }
}