import * as fs from 'fs';
import * as path from 'path';
import * as smartrx from '@push.rocks/smartrx';
import type { IWatcher, IWatcherOptions, IWatchEvent, TWatchEventType } from './interfaces.js';

/**
 * Node.js/Bun file watcher built on the native `fs.watch` API.
 *
 * Responsibilities visible in this class:
 * - one recursive `fs.watch` per base directory (`watchPath`)
 * - an initial directory scan that emits `add` / `addDir` for existing entries
 * - per-path debouncing of raw `rename` / `change` events (`handleFsEvent`)
 * - translation of debounced event sequences into
 *   `add` / `change` / `unlink` / `addDir` / `unlinkDir` (`emitFileEvent`)
 * - self-healing: restart with exponential backoff on watcher errors, plus a
 *   periodic health check that detects vanished or replaced directories
 *
 * All results are published on {@link NodeWatcher.events$}.
 */
export class NodeWatcher implements IWatcher {
  /** Active `fs.watch` handles keyed by the watched directory path. */
  private watchers: Map<string, fs.FSWatcher> = new Map();

  /** Every path (file or directory) for which an `add`/`addDir` has been emitted. */
  private watchedFiles: Set<string> = new Set();

  /**
   * Paths known to be directories. Kept separately from `watchedFiles` so that
   * deleting an *empty* directory can still be reported as `unlinkDir`.
   */
  private watchedDirs: Set<string> = new Set();

  private _isWatching = false;

  /**
   * Debounce state: pending emits per file path.
   * The whole event sequence is tracked (not just the last event type) so that
   * intermediate events are not lost (e.g. add→change→delete must not lose the add).
   */
  private pendingEmits: Map<
    string,
    {
      timeout: NodeJS.Timeout;
      events: Array<'rename' | 'change'>;
    }
  > = new Map();

  // Restart tracking
  /** Next backoff delay (ms) per base path. */
  private restartDelays: Map<string, number> = new Map();
  /** Number of consecutive restart attempts per base path. */
  private restartAttempts: Map<string, number> = new Map();
  private healthCheckInterval: NodeJS.Timeout | null = null;

  /**
   * Directory inode tracking - detects when directories are replaced (atomic saves, etc.).
   * fs.watch watches the inode, not the path. If the inode changes, the watcher must restart.
   */
  private watchedInodes: Map<string, bigint> = new Map();

  /**
   * File inode tracking - detects when individual files are deleted and recreated.
   * Editors delete+recreate files while fs.watch keeps watching the OLD inode.
   * See: https://github.com/paulmillr/chokidar/issues/972
   */
  private fileInodes: Map<string, bigint> = new Map();

  /** Abort controllers for pending restart delays - lets stop() cancel them (no orphan watchers). */
  private restartAbortControllers: Map<string, AbortController> = new Map();

  /** Paths with a restart in flight - prevents concurrent restarts (health check + error can race). */
  private restartingPaths: Set<string> = new Set();

  /**
   * Initial scan state - events are deferred until the scan completes.
   * Without this, events can arrive before watchedFiles is populated, causing inconsistent state.
   */
  private initialScanComplete: boolean = false;
  private deferredEvents: Array<{ basePath: string; filename: string; eventType: string }> = [];

  // Configuration constants
  /** Restart attempts allowed per base path before giving up. */
  private static readonly MAX_RETRIES = 3;
  /** First backoff delay in ms; doubled after every attempt. */
  private static readonly INITIAL_RESTART_DELAY = 1000;
  /** Upper bound for the backoff delay in ms. */
  private static readonly MAX_RESTART_DELAY = 30000;
  /** Period of the health check in ms. */
  private static readonly HEALTH_CHECK_INTERVAL = 30000;

  /** Matches `file.ts.tmp.PID.TIMESTAMP`; capture group 1 is the real basename. */
  private static readonly TMP_PID_TIMESTAMP_PATTERN = /^(.+)\.tmp\.\d+\.\d+$/;
  /** Matches `file.ts.tmp.something`; capture group 1 is the real basename. */
  private static readonly TMP_GENERIC_PATTERN = /^(.+)\.tmp\.[^.]+$/;

  /** Stream of all watch events (`add`, `change`, `unlink`, `addDir`, `unlinkDir`, `ready`, `error`). */
  public readonly events$ = new smartrx.rxjs.Subject<IWatchEvent>();

  constructor(private options: IWatcherOptions) {}

  /**
   * Safely emit an event, catching any subscriber errors so a throwing
   * subscriber cannot kill the watcher.
   */
  private safeEmit(event: IWatchEvent): void {
    try {
      this.events$.next(event);
    } catch (error) {
      console.error('[smartwatch] Subscriber threw error (isolated):', error);
      // Don't let subscriber errors kill the watcher
    }
  }

  /**
   * Restart the watcher for `basePath` after an error, with exponential backoff.
   *
   * Guards against:
   * - dual restarts for the same path (health check + error handler racing)
   * - orphan watchers when stop() is called during the backoff delay or while
   *   the new watcher is being created
   * - "successful" restarts that did not actually create a watcher
   *   (`watchPath` swallows its own errors, so success is verified via `watchers`)
   *
   * @param basePath - directory whose watcher should be re-established
   * @param error - the failure that triggered the restart (used for logging)
   */
  private async restartWatcher(basePath: string, error: Error): Promise<void> {
    // Guard: prevent concurrent restarts for the same path
    if (this.restartingPaths.has(basePath)) {
      console.log(`[smartwatch] Restart already in progress for ${basePath}, skipping`);
      return;
    }
    this.restartingPaths.add(basePath);

    // Set when this attempt failed and another attempt should follow. The retry is
    // started only AFTER the finally block has released the guard; starting it
    // inside the try would let this call's finally clear the retry's own guard.
    let retryError: Error | null = null;

    try {
      const attempts = (this.restartAttempts.get(basePath) ?? 0) + 1;
      this.restartAttempts.set(basePath, attempts);

      console.log(`[smartwatch] Watcher error for ${basePath}: ${error.message}`);
      console.log(`[smartwatch] Restart attempt ${attempts}/${NodeWatcher.MAX_RETRIES}`);

      if (attempts > NodeWatcher.MAX_RETRIES) {
        console.error(`[smartwatch] Max retries exceeded for ${basePath}, giving up`);
        this.safeEmit({
          type: 'error',
          path: basePath,
          error: new Error(`Max restart retries (${NodeWatcher.MAX_RETRIES}) exceeded`),
        });
        return;
      }

      // Close failed watcher
      const oldWatcher = this.watchers.get(basePath);
      if (oldWatcher) {
        try {
          oldWatcher.close();
        } catch {
          // Ignore close errors
        }
        this.watchers.delete(basePath);
      }

      // Exponential backoff with AbortController (so stop() can cancel)
      const delay = this.restartDelays.get(basePath) ?? NodeWatcher.INITIAL_RESTART_DELAY;
      console.log(`[smartwatch] Waiting ${delay}ms before restart...`);

      const abortController = new AbortController();
      this.restartAbortControllers.set(basePath, abortController);

      try {
        await new Promise<void>((resolve, reject) => {
          const timeout = setTimeout(resolve, delay);
          abortController.signal.addEventListener(
            'abort',
            () => {
              clearTimeout(timeout);
              reject(new Error('Restart aborted by stop()'));
            },
            { once: true }
          );
        });
      } catch {
        console.log(`[smartwatch] Restart aborted for ${basePath}`);
        return; // stop() was called, don't continue
      } finally {
        this.restartAbortControllers.delete(basePath);
      }

      // Double-check we're still watching after the delay
      if (!this._isWatching) {
        console.log(`[smartwatch] Watcher stopped during restart delay, aborting`);
        return;
      }

      this.restartDelays.set(basePath, Math.min(delay * 2, NodeWatcher.MAX_RESTART_DELAY));

      try {
        await this.watchPath(basePath, 0);

        if (!this._isWatching) {
          // stop() raced with the re-watch: do not leave an orphan watcher behind
          const orphan = this.watchers.get(basePath);
          if (orphan) {
            orphan.close();
            this.watchers.delete(basePath);
          }
          return;
        }

        // watchPath reports failures through events$ instead of throwing, and returns
        // silently when the path does not exist, so verify a watcher was registered.
        if (!this.watchers.has(basePath)) {
          throw new Error(`Watcher could not be re-established for ${basePath}`);
        }

        console.log(`[smartwatch] Successfully restarted watcher for ${basePath}`);
        this.restartDelays.set(basePath, NodeWatcher.INITIAL_RESTART_DELAY);
        this.restartAttempts.set(basePath, 0);
      } catch (restartError) {
        console.error(`[smartwatch] Restart failed for ${basePath}:`, restartError);
        retryError =
          restartError instanceof Error ? restartError : new Error(String(restartError));
      }
    } finally {
      this.restartingPaths.delete(basePath);
    }

    if (retryError && this._isWatching) {
      // Guard is released; the retry re-acquires it and applies the next backoff step
      await this.restartWatcher(basePath, retryError);
    }
  }

  /**
   * Start periodic health checks to detect silent failures.
   *
   * For every watched base path the check detects:
   * 1. the path no longer exists (`stat` rejects with ENOENT)
   * 2. the inode changed (directory was replaced - fs.watch watches inode, not path!)
   *
   * Both cases trigger `restartWatcher`.
   */
  private startHealthCheck(): void {
    console.log('[smartwatch] Starting health check (every 30s)');
    this.healthCheckInterval = setInterval(async () => {
      console.log(`[smartwatch] Health check: ${this.watchers.size} watchers active`);

      for (const [basePath] of this.watchers) {
        try {
          // fs.promises.stat rejects when the path is missing; it never resolves
          // to a falsy value, so the "missing" case is handled in the catch below.
          const stats = await fs.promises.stat(basePath);
          const currentInode = BigInt(stats.ino);
          const previousInode = this.watchedInodes.get(basePath);

          if (previousInode !== undefined && currentInode !== previousInode) {
            // CRITICAL: Inode changed! fs.watch is now watching a stale inode.
            // This happens when the directory is replaced (atomic operations, git checkout, etc.)
            console.warn(
              `[smartwatch] Inode changed for ${basePath}: ${previousInode} -> ${currentInode}`
            );
            console.warn('[smartwatch] fs.watch watches inode, not path - restarting watcher');
            void this.restartWatcher(
              basePath,
              new Error('Inode changed - directory was replaced')
            );
          }
        } catch (error: any) {
          if (error.code === 'ENOENT') {
            console.error(`[smartwatch] Health check failed: ${basePath} no longer exists`);
            this.safeEmit({
              type: 'error',
              path: basePath,
              error: new Error('Watched path no longer exists'),
            });
            void this.restartWatcher(basePath, new Error('Watched path disappeared'));
          } else if (error.code === 'ENOSPC') {
            // inotify watch limit exceeded - critical system issue
            console.error(`[smartwatch] ENOSPC: inotify watch limit exceeded!`);
            console.error(
              '[smartwatch] Fix: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p'
            );
            this.safeEmit({ type: 'error', path: basePath, error });
          } else {
            console.error(`[smartwatch] Health check error for ${basePath}:`, error);
          }
        }
      }
    }, NodeWatcher.HEALTH_CHECK_INTERVAL);
  }

  /**
   * Stop the health check interval (no-op when it is not running).
   */
  private stopHealthCheck(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
      console.log('[smartwatch] Stopped health check');
    }
  }

  /**
   * Check if a file is a temporary file created by editors.
   *
   * Recognised basenames: `*.tmp.*`, `*.swp`, `*.swx`, `*~`, `.#*`
   */
  private isTemporaryFile(filePath: string): boolean {
    const basename = path.basename(filePath);
    if (basename.includes('.tmp.')) return true;
    if (basename.endsWith('.swp') || basename.endsWith('.swx')) return true;
    if (basename.endsWith('~')) return true;
    if (basename.startsWith('.#')) return true;
    return false;
  }

  /**
   * Extract the real file path from a temporary file path.
   * Used to detect atomic writes where only the temp file event is emitted.
   *
   * Patterns:
   * - Claude Code: `file.ts.tmp.PID.TIMESTAMP` -> `file.ts`
   * - Generic:     `file.ts.tmp.something`     -> `file.ts`
   * - Vim swap (`.file.ts.swp`) is NOT resolved
   *
   * @returns the sibling path of the real file, or `null` when no pattern matches
   */
  private getTempFileTarget(tempFilePath: string): string | null {
    const basename = path.basename(tempFilePath);

    const match =
      basename.match(NodeWatcher.TMP_PID_TIMESTAMP_PATTERN) ??
      basename.match(NodeWatcher.TMP_GENERIC_PATTERN);
    if (match) {
      return path.join(path.dirname(tempFilePath), match[1]);
    }

    return null;
  }

  /** Whether the watcher is currently started. */
  get isWatching(): boolean {
    return this._isWatching;
  }

  /**
   * Start watching all configured base paths.
   *
   * Order of operations: create watchers, start the health check, scan the
   * directories (emitting `add`/`addDir`), replay events that arrived during
   * the scan, then emit `ready`. Calling start() while already started is a no-op.
   *
   * @throws rethrows any error raised during startup, after emitting it as an
   *   `error` event and releasing everything that was already set up
   */
  async start(): Promise<void> {
    if (this._isWatching) {
      return;
    }

    console.log(
      `[smartwatch] Starting watcher for ${this.options.basePaths.length} base path(s)...`
    );

    // Reset initial scan state
    this.initialScanComplete = false;
    this.deferredEvents = [];

    // Set the flag BEFORE creating watchers: handleFsEvent drops every event while
    // the flag is false, so events arriving during setup would otherwise be lost
    // instead of deferred. This also turns a concurrent start() into a no-op.
    this._isWatching = true;

    try {
      // Start watching each base path
      // NOTE: Events may arrive immediately but will be deferred until scan completes
      for (const basePath of this.options.basePaths) {
        await this.watchPath(basePath, 0);
      }

      // Start health check monitoring
      this.startHealthCheck();

      // Perform initial scan to emit 'add' events for existing files
      // This populates watchedFiles and fileInodes BEFORE we process events
      for (const basePath of this.options.basePaths) {
        await this.scanDirectory(basePath, 0);
      }

      // Mark scan complete and process any events that arrived during scan
      this.initialScanComplete = true;
      if (this.deferredEvents.length > 0) {
        console.log(
          `[smartwatch] Processing ${this.deferredEvents.length} deferred events from initial scan window`
        );
        for (const event of this.deferredEvents) {
          this.handleFsEvent(event.basePath, event.filename, event.eventType);
        }
        this.deferredEvents = [];
      }

      // Emit ready event
      this.safeEmit({ type: 'ready', path: '' });
      console.log(`[smartwatch] Watcher started with ${this.watchers.size} active watcher(s)`);
    } catch (error: any) {
      console.error('[smartwatch] Failed to start watcher:', error);
      // Release watchers, timers and tracking state from the partial start
      await this.stop();
      this.safeEmit({ type: 'error', path: '', error });
      throw error;
    }
  }

  /**
   * Stop watching: cancel pending debounced emits and restart delays, close all
   * watchers and clear all tracking state. Safe to call when not started.
   */
  async stop(): Promise<void> {
    console.log('[smartwatch] Stopping watcher...');

    // Cancel pending debounced emits FIRST (before flag changes)
    // This prevents handleFsEvent from creating new pendingEmits during shutdown
    for (const pending of this.pendingEmits.values()) {
      clearTimeout(pending.timeout);
    }
    this.pendingEmits.clear();

    // NOW set the flag - handleFsEvent will return early after this
    this._isWatching = false;

    // Stop health check monitoring
    this.stopHealthCheck();

    // Abort all pending restart delays (prevents orphan watchers)
    for (const [restartPath, controller] of this.restartAbortControllers) {
      console.log(`[smartwatch] Aborting pending restart for: ${restartPath}`);
      controller.abort();
    }
    this.restartAbortControllers.clear();

    // Close all watchers
    for (const [watchPath, watcher] of this.watchers) {
      console.log(`[smartwatch] Closing watcher for: ${watchPath}`);
      watcher.close();
    }
    this.watchers.clear();
    this.watchedFiles.clear();
    this.watchedDirs.clear();

    // Clear all tracking state
    this.restartDelays.clear();
    this.restartAttempts.clear();
    this.watchedInodes.clear();
    this.fileInodes.clear();
    this.restartingPaths.clear();

    // Reset initial scan state
    this.initialScanComplete = false;
    this.deferredEvents = [];

    console.log('[smartwatch] Watcher stopped');
  }

  /**
   * Start watching a path.
   *
   * Only directories get a watcher; a missing path or a non-directory returns
   * without registering anything. Failures are emitted as `error` events and
   * NOT thrown - callers that need to know whether a watcher exists must
   * check `this.watchers`.
   *
   * @param watchPath - directory to watch recursively
   * @param depth - current depth; nothing is done when it exceeds `options.depth`
   */
  private async watchPath(watchPath: string, depth: number): Promise<void> {
    if (depth > this.options.depth) {
      return;
    }

    try {
      const stats = await this.statSafe(watchPath);
      if (!stats) {
        return;
      }

      if (stats.isDirectory()) {
        // Store inode for health check - fs.watch watches inode, not path!
        // If inode changes (directory replaced), watcher becomes stale
        this.watchedInodes.set(watchPath, BigInt(stats.ino));

        // Watch the directory with recursive option (Node.js 20+ supports this on all platforms)
        const watcher = fs.watch(
          watchPath,
          { recursive: true, persistent: true },
          (eventType, filename) => {
            if (filename) {
              this.handleFsEvent(watchPath, filename, eventType);
            }
          }
        );

        watcher.on('error', (error: NodeJS.ErrnoException) => {
          console.error(`[smartwatch] FSWatcher error event on ${watchPath}:`, error);

          // Detect inotify watch limit exceeded - common cause of "stops working"
          if (error.code === 'ENOSPC') {
            console.error('[smartwatch] CRITICAL: inotify watch limit exceeded!');
            console.error(
              '[smartwatch] Fix with: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p'
            );
          }

          this.safeEmit({ type: 'error', path: watchPath, error });
          if (this._isWatching) {
            void this.restartWatcher(watchPath, error);
          }
        });

        // Handle 'close' event - fs.watch can close without error
        watcher.on('close', () => {
          // Only log/restart if we didn't intentionally stop
          if (this._isWatching) {
            console.warn(`[smartwatch] FSWatcher closed unexpectedly for ${watchPath}`);
            void this.restartWatcher(watchPath, new Error('Watcher closed unexpectedly'));
          }
        });

        this.watchers.set(watchPath, watcher);
        console.log(`[smartwatch] Started watching: ${watchPath}`);
      }
    } catch (error: any) {
      console.error(`[smartwatch] Failed to watch path ${watchPath}:`, error);
      this.safeEmit({ type: 'error', path: watchPath, error });
    }
  }

  /**
   * Handle raw fs.watch events - debounce and normalize them.
   *
   * - dropped entirely when the watcher is stopped
   * - deferred while the initial scan is running
   * - temp-file events are translated into a delayed `change` for the real file
   * - everything else is accumulated per path and flushed after `options.debounceMs`
   *
   * @param basePath - the watched directory that produced the event
   * @param filename - path of the affected entry relative to `basePath`
   * @param eventType - raw fs.watch event type
   */
  private handleFsEvent(
    basePath: string,
    filename: string,
    eventType: 'rename' | 'change' | string
  ): void {
    // Guard against post-stop events (events queued before watcher closed)
    if (!this._isWatching) {
      return;
    }

    // Defer events until initial scan completes
    // This prevents race conditions where events arrive before watchedFiles is populated
    if (!this.initialScanComplete) {
      this.deferredEvents.push({ basePath, filename, eventType });
      return;
    }

    const fullPath = path.join(basePath, filename);

    // Handle temporary files from atomic writes (Claude Code, editors, etc.)
    // Pattern: editor writes to file.tmp.xxx then renames to file
    // Problem: fs.watch on Linux may ONLY emit event for the temp file, not the target!
    // Solution: When we see a temp file event, also check the corresponding real file
    if (this.isTemporaryFile(fullPath)) {
      console.log(`[smartwatch] Detected temp file event: ${filename}`);

      // Extract the real file path from the temp file path
      // Pattern: file.ts.tmp.PID.TIMESTAMP -> file.ts
      const realFilePath = this.getTempFileTarget(fullPath);
      if (realFilePath) {
        console.log(`[smartwatch] Checking corresponding real file: ${realFilePath}`);
        // Queue an event for the REAL file - this is the actual file that changed
        // Use a short delay to let the rename complete
        setTimeout(() => {
          if (this._isWatching) {
            this.handleFsEvent(basePath, path.relative(basePath, realFilePath), 'change');
          }
        }, 50);
      }
      return;
    }

    // Flush callback shared by the first and every rescheduled timer
    const flush = (): void => {
      const pending = this.pendingEmits.get(fullPath);
      if (pending) {
        this.pendingEmits.delete(fullPath);
        void this.emitFileEvent(fullPath, pending.events);
      }
    };

    // Track the event sequence in the debounce instead of collapsing to the last event
    // This ensures we don't lose intermediate events (e.g., add→change→delete)
    const existing = this.pendingEmits.get(fullPath);
    if (existing) {
      // Cancel existing timeout but KEEP the event sequence
      clearTimeout(existing.timeout);
      existing.events.push(eventType as 'rename' | 'change');
      existing.timeout = setTimeout(flush, this.options.debounceMs);
    } else {
      // First event for this file - create new sequence
      this.pendingEmits.set(fullPath, {
        timeout: setTimeout(flush, this.options.debounceMs),
        events: [eventType as 'rename' | 'change'],
      });
    }
  }

  /**
   * Emit the actual file event after debounce.
   *
   * Receives the whole debounced event sequence, which allows rapid sequences
   * to be interpreted:
   * - add→change→delete: file was created and deleted rapidly
   * - rename→rename: file was deleted and recreated (or vice versa)
   *
   * Also maintains file inode tracking to detect delete+recreate scenarios:
   * - fs.watch watches the inode, not the path
   * - when editors delete+recreate files, the inode changes
   * - see: https://github.com/paulmillr/chokidar/issues/972
   *
   * Any error is reported as an `error` event; this method does not throw.
   *
   * @param fullPath - absolute/joined path of the affected entry
   * @param eventSequence - raw event types collected during the debounce window
   */
  private async emitFileEvent(
    fullPath: string,
    eventSequence: Array<'rename' | 'change'>
  ): Promise<void> {
    try {
      const stats = await this.statSafe(fullPath);

      // stop() may have run while stat was pending; emitting now would repopulate
      // tracking state that stop() just cleared.
      if (!this._isWatching) {
        return;
      }

      const wasWatched = this.watchedFiles.has(fullPath);
      const previousInode = this.fileInodes.get(fullPath);

      // Analyze event sequence to understand what happened
      const hasRename = eventSequence.includes('rename');
      const hasChange = eventSequence.includes('change');
      const renameCount = eventSequence.filter((e) => e === 'rename').length;

      // Log sequence for debugging complex scenarios
      if (eventSequence.length > 1) {
        console.log(
          `[smartwatch] Processing event sequence for ${fullPath}: [${eventSequence.join(', ')}]`
        );
      }

      if (stats) {
        // File EXISTS now
        const currentInode = BigInt(stats.ino);
        const inodeChanged = previousInode !== undefined && previousInode !== currentInode;

        if (stats.isDirectory()) {
          this.watchedDirs.add(fullPath);
          if (!wasWatched) {
            this.watchedFiles.add(fullPath);
            this.safeEmit({ type: 'addDir', path: fullPath, stats });
          }
          // Directories don't track inodes at file level
        } else {
          // Update tracking
          this.watchedDirs.delete(fullPath);
          this.fileInodes.set(fullPath, currentInode);
          this.watchedFiles.add(fullPath);

          if (!wasWatched) {
            // File wasn't tracked before - this is an add
            // Even if there were multiple events, the end result is a new file
            this.safeEmit({ type: 'add', path: fullPath, stats });
          } else if (inodeChanged) {
            // File was recreated with different inode (delete+recreate)
            console.log(`[smartwatch] File inode changed (delete+recreate): ${fullPath}`);
            console.log(
              `[smartwatch] Previous inode: ${previousInode}, current: ${currentInode}`
            );

            // Multiple rename events with inode change = delete+recreate pattern
            // Emit unlink for the old file, then add for the new one
            if (renameCount >= 2) {
              this.safeEmit({ type: 'unlink', path: fullPath });
              this.safeEmit({ type: 'add', path: fullPath, stats });
            } else {
              // Single rename with inode change = atomic save (emit as change)
              this.safeEmit({ type: 'change', path: fullPath, stats });
            }
          } else if (hasChange || hasRename) {
            // File exists, was tracked, inode same - content changed
            this.safeEmit({ type: 'change', path: fullPath, stats });
          }
        }
      } else {
        // File does NOT exist now - it was deleted
        const wasDir = this.isKnownDirectory(fullPath);
        this.watchedDirs.delete(fullPath);

        if (wasWatched) {
          // File was tracked and is now gone
          this.watchedFiles.delete(fullPath);
          this.fileInodes.delete(fullPath);

          // If there were multiple events, file may have been created then deleted
          if (renameCount >= 2 && !wasDir) {
            // add→delete sequence - emit both events
            console.log(`[smartwatch] File created and deleted rapidly: ${fullPath}`);
            this.safeEmit({ type: 'add', path: fullPath });
            this.safeEmit({ type: 'unlink', path: fullPath });
          } else {
            this.safeEmit({ type: wasDir ? 'unlinkDir' : 'unlink', path: fullPath });
          }
        } else {
          // File wasn't tracked - but events occurred for it
          this.fileInodes.delete(fullPath);

          if (renameCount >= 2) {
            // Multiple rename events for untracked file that doesn't exist
            // Likely: created → deleted rapidly
            console.log(`[smartwatch] Untracked file created and deleted: ${fullPath}`);
            this.safeEmit({ type: 'add', path: fullPath });
            this.safeEmit({ type: 'unlink', path: fullPath });
          } else if (hasRename) {
            // Single event for file that doesn't exist and wasn't tracked
            console.log(`[smartwatch] Untracked file deleted: ${fullPath}`);
            this.safeEmit({ type: 'unlink', path: fullPath });
          }
          // If only 'change' events for non-existent untracked file, ignore
        }
      }
    } catch (error: any) {
      this.safeEmit({ type: 'error', path: fullPath, error });
    }
  }

  /**
   * Scan a directory recursively and emit `add` / `addDir` for existing entries,
   * populating `watchedFiles`, `watchedDirs` and `fileInodes` along the way.
   *
   * Temp files are skipped. `ENOENT` and `EACCES` while reading a directory are
   * ignored; other errors are emitted as `error` events.
   *
   * @param dirPath - directory to scan
   * @param depth - current depth; nothing is done when it exceeds `options.depth`
   */
  private async scanDirectory(dirPath: string, depth: number): Promise<void> {
    if (depth > this.options.depth) {
      return;
    }

    try {
      const entries = await fs.promises.readdir(dirPath, { withFileTypes: true });

      for (const entry of entries) {
        const fullPath = path.join(dirPath, entry.name);

        // Skip temp files during initial scan too
        if (this.isTemporaryFile(fullPath)) {
          continue;
        }

        const stats = await this.statSafe(fullPath);
        if (!stats) {
          continue;
        }

        if (entry.isDirectory()) {
          this.watchedFiles.add(fullPath);
          this.watchedDirs.add(fullPath);
          this.safeEmit({ type: 'addDir', path: fullPath, stats });
          await this.scanDirectory(fullPath, depth + 1);
        } else if (entry.isFile()) {
          this.watchedFiles.add(fullPath);
          // Track file inode for delete+recreate detection
          this.fileInodes.set(fullPath, BigInt(stats.ino));
          this.safeEmit({ type: 'add', path: fullPath, stats });
        }
      }
    } catch (error: any) {
      if (error.code !== 'ENOENT' && error.code !== 'EACCES') {
        this.safeEmit({ type: 'error', path: dirPath, error });
      }
    }
  }

  /**
   * Safely stat a path.
   *
   * Uses `stat` when `options.followSymlinks` is set, `lstat` otherwise.
   *
   * @returns the stats, or `null` when the path does not exist (`ENOENT` /
   *   `ENOTDIR`) or when any other error occurs (those are logged as warnings)
   */
  private async statSafe(filePath: string): Promise<fs.Stats | null> {
    try {
      return await (this.options.followSymlinks
        ? fs.promises.stat(filePath)
        : fs.promises.lstat(filePath));
    } catch (error: any) {
      // Only silently return null for expected "file doesn't exist" errors
      if (error.code === 'ENOENT' || error.code === 'ENOTDIR') {
        return null;
      }
      // Log other errors (permission, I/O) but still return null
      console.warn(
        `[smartwatch] statSafe warning for ${filePath}: ${error.code} - ${error.message}`
      );
      return null;
    }
  }

  /**
   * Check if a path was known to be a directory (for proper unlink event type).
   *
   * A path counts as a directory when it was recorded as one, or when any
   * tracked path lies beneath it.
   */
  private isKnownDirectory(filePath: string): boolean {
    if (this.watchedDirs.has(filePath)) {
      return true;
    }
    // Fallback: any tracked path that is a child of this path implies a directory
    const childPrefix = filePath + path.sep;
    for (const watched of this.watchedFiles) {
      if (watched.startsWith(childPrefix)) {
        return true;
      }
    }
    return false;
  }
}