import * as fs from 'fs';
import * as path from 'path';
import * as smartrx from '@push.rocks/smartrx';
import type { IWatcher, IWatcherOptions, IWatchEvent, TWatchEventType } from './interfaces.js';

// =============================================================================
// Constants
// =============================================================================

/** Event type constants - inspired by chokidar's pattern */
const EV = {
  ADD: 'add',
  CHANGE: 'change',
  UNLINK: 'unlink',
  ADD_DIR: 'addDir',
  UNLINK_DIR: 'unlinkDir',
  READY: 'ready',
  ERROR: 'error',
} as const;

/** Configuration constants (all durations are in milliseconds) */
const CONFIG = {
  /** Restart attempts allowed per base path before giving up */
  MAX_RETRIES: 3,
  /** First restart back-off delay; doubled on every further attempt */
  INITIAL_RESTART_DELAY: 1000,
  /** Upper bound for the restart back-off delay */
  MAX_RESTART_DELAY: 30000,
  /** Period of the base-path inode health check */
  HEALTH_CHECK_INTERVAL: 30000,
  /** How long an unlink is held back so a following add can turn it into a change */
  ATOMIC_DELAY: 100,
  /** Delay before re-checking the real file that a temp file refers to */
  TEMP_FILE_DELAY: 50,
} as const;

// =============================================================================
// DirEntry Class - Elegant directory content tracking (inspired by chokidar)
// =============================================================================

/**
 * Tracks contents of a watched directory with proper disposal.
 *
 * Stores the names of the direct children of one directory and, optionally,
 * the inode recorded for each child.
 */
class DirEntry {
  private _path: string;
  private _items: Set<string>;
  private _inodes: Map<string, bigint>;

  constructor(dirPath: string) {
    this._path = dirPath;
    this._items = new Set();
    this._inodes = new Map();
  }

  /** Directory path this entry was created for ('' after disposal). */
  get path(): string {
    return this._path;
  }

  /** Record a child name (and its inode, when given). '.' and '..' are ignored. */
  add(item: string, inode?: bigint): void {
    if (item === '.' || item === '..') return;
    this._items.add(item);
    if (inode !== undefined) {
      this._inodes.set(item, inode);
    }
  }

  /** Forget a child name together with its recorded inode. */
  remove(item: string): void {
    this._items.delete(item);
    this._inodes.delete(item);
  }

  has(item: string): boolean {
    return this._items.has(item);
  }

  getInode(item: string): bigint | undefined {
    return this._inodes.get(item);
  }

  setInode(item: string, inode: bigint): void {
    this._inodes.set(item, inode);
  }

  /** Snapshot (copy) of the tracked child names. */
  getChildren(): string[] {
    return [...this._items];
  }

  get size(): number {
    return this._items.size;
  }

  /** Clear all tracked data and freeze the instance. */
  dispose(): void {
    this._items.clear();
    this._inodes.clear();
    this._path = '';
    // Freeze to catch accidental use after disposal. Note that this only
    // prevents reassigning the fields; the (now empty) collections themselves
    // remain mutable.
    Object.freeze(this);
  }
}

// =============================================================================
// NodeWatcher Class
// =============================================================================

/**
 * Node.js/Bun file watcher using native fs.watch API
 *
 * Architecture inspired by chokidar with additional robustness features:
 * - Event deferral during initial scan
 * - Event sequence tracking for debounce
 * - Atomic write handling (unlink→add becomes change)
 * - Inode tracking for delete+recreate detection
 * - Health check monitoring
 * - Auto-restart with exponential backoff
 */
export class NodeWatcher implements IWatcher {
  // Core state
  /** Active fs.watch handles keyed by resolved base path */
  private watchers: Map<string, fs.FSWatcher> = new Map();
  /** Known directories keyed by resolved directory path */
  private watched: Map<string, DirEntry> = new Map();
  private _isWatching = false;

  // Event stream
  public readonly events$ = new smartrx.rxjs.Subject<IWatchEvent>();

  // Atomic write handling - pending unlinks that may become changes
  private pendingUnlinks: Map<string, { timeout: NodeJS.Timeout; event: IWatchEvent }> = new Map();

  // Debounce with event sequence tracking
  private pendingEmits: Map<
    string,
    {
      timeout: NodeJS.Timeout;
      events: Array<'rename' | 'change'>;
    }
  > = new Map();

  // Restart management
  private restartDelays: Map<string, number> = new Map();
  private restartAttempts: Map<string, number> = new Map();
  private restartAbortControllers: Map<string, AbortController> = new Map();
  private restartingPaths: Set<string> = new Set();

  // Health monitoring
  private healthCheckInterval: NodeJS.Timeout | null = null;
  private watchedInodes: Map<string, bigint> = new Map();

  // Initial scan state
  private initialScanComplete = false;
  private deferredEvents: Array<{ basePath: string; filename: string; eventType: string }> = [];

  // Closer registry - inspired by chokidar for clean resource management
  private closers: Map<string, Array<() => void>> = new Map();

  constructor(private options: IWatcherOptions) {}

  get isWatching(): boolean {
    return this._isWatching;
  }

  // ===========================================================================
  // Public API
  // ===========================================================================

  /**
   * Start watching all configured base paths.
   *
   * Native watchers are attached first (their events are deferred), then the
   * initial scan populates the tracking tables and emits add/addDir events,
   * then the deferred events are replayed and `ready` is emitted.
   * No-op when already watching.
   *
   * @throws Re-throws any error raised during startup after emitting it as an `error` event.
   */
  async start(): Promise<void> {
    if (this._isWatching) return;

    console.log(`[smartwatch] Starting watcher for ${this.options.basePaths.length} base path(s)...`);

    try {
      // Reset state
      this.initialScanComplete = false;
      this.deferredEvents = [];

      // Start watching each base path (events will be deferred)
      for (const basePath of this.options.basePaths) {
        await this.watchPath(basePath);
      }

      this._isWatching = true;
      this.startHealthCheck();

      // Initial scan populates watched entries
      for (const basePath of this.options.basePaths) {
        await this.scanDirectory(basePath, 0);
      }

      // Process deferred events
      this.initialScanComplete = true;
      if (this.deferredEvents.length > 0) {
        console.log(`[smartwatch] Processing ${this.deferredEvents.length} deferred events`);
        for (const event of this.deferredEvents) {
          this.handleFsEvent(event.basePath, event.filename, event.eventType);
        }
        this.deferredEvents = [];
      }

      this.safeEmit({ type: EV.READY, path: '' });
      console.log(`[smartwatch] Watcher started with ${this.watchers.size} active watcher(s)`);
    } catch (error: any) {
      console.error('[smartwatch] Failed to start watcher:', error);
      this.safeEmit({ type: EV.ERROR, path: '', error });
      throw error;
    }
  }

  /**
   * Stop watching: cancels all timers and pending restarts, closes every
   * native watcher and clears all tracking state.
   */
  async stop(): Promise<void> {
    console.log('[smartwatch] Stopping watcher...');

    // Cancel pending emits first (before flag changes)
    for (const pending of this.pendingEmits.values()) {
      clearTimeout(pending.timeout);
    }
    this.pendingEmits.clear();

    // Cancel pending unlinks
    for (const pending of this.pendingUnlinks.values()) {
      clearTimeout(pending.timeout);
    }
    this.pendingUnlinks.clear();

    // Now set flag (the watcher 'close' handlers must not trigger restarts)
    this._isWatching = false;
    this.stopHealthCheck();

    // Abort pending restarts
    for (const [watchPath, controller] of this.restartAbortControllers) {
      console.log(`[smartwatch] Aborting pending restart for: ${watchPath}`);
      controller.abort();
    }
    this.restartAbortControllers.clear();

    // Close all watchers and run closers
    for (const [watchPath, watcher] of this.watchers) {
      console.log(`[smartwatch] Closing watcher for: ${watchPath}`);
      watcher.close();
      this.runClosers(watchPath);
    }

    // Clear all state
    this.watchers.clear();
    this.watched.forEach(entry => entry.dispose());
    this.watched.clear();
    this.restartDelays.clear();
    this.restartAttempts.clear();
    this.watchedInodes.clear();
    this.restartingPaths.clear();
    this.closers.clear();

    // Reset scan state
    this.initialScanComplete = false;
    this.deferredEvents = [];

    console.log('[smartwatch] Watcher stopped');
  }

  // ===========================================================================
  // Event Emission
  // ===========================================================================

  /** Safely emit an event, isolating subscriber errors */
  private safeEmit(event: IWatchEvent): void {
    try {
      this.events$.next(event);
    } catch (error) {
      console.error('[smartwatch] Subscriber threw error (isolated):', error);
    }
  }

  // ===========================================================================
  // Closer Registry - Clean resource management
  // ===========================================================================

  /** Register a cleanup callback for a watch path. */
  private addCloser(watchPath: string, closer: () => void): void {
    let list = this.closers.get(watchPath);
    if (!list) {
      list = [];
      this.closers.set(watchPath, list);
    }
    list.push(closer);
  }

  /** Run and discard every cleanup callback registered for a watch path. */
  private runClosers(watchPath: string): void {
    const list = this.closers.get(watchPath);
    if (list) {
      list.forEach(closer => closer());
      this.closers.delete(watchPath);
    }
  }

  // ===========================================================================
  // Directory Entry Management
  // ===========================================================================

  /** Get the DirEntry for a directory, creating it when missing. */
  private getWatchedDir(dirPath: string): DirEntry {
    const resolved = path.resolve(dirPath);
    let entry = this.watched.get(resolved);
    if (!entry) {
      entry = new DirEntry(resolved);
      this.watched.set(resolved, entry);
    }
    return entry;
  }

  /** True when the path is recorded as a child of its parent directory. */
  private isTracked(filePath: string): boolean {
    const dir = path.dirname(filePath);
    const base = path.basename(filePath);
    const entry = this.watched.get(path.resolve(dir));
    return entry?.has(base) ?? false;
  }

  /** Record the path (and optional inode) in its parent directory's entry. */
  private trackFile(filePath: string, inode?: bigint): void {
    const dir = path.dirname(filePath);
    const base = path.basename(filePath);
    this.getWatchedDir(dir).add(base, inode);
  }

  /** Remove the path from its parent directory's entry, if that entry exists. */
  private untrackFile(filePath: string): void {
    const dir = path.dirname(filePath);
    const base = path.basename(filePath);
    const entry = this.watched.get(path.resolve(dir));
    entry?.remove(base);
  }

  /** Inode previously recorded for the path, if any. */
  private getFileInode(filePath: string): bigint | undefined {
    const dir = path.dirname(filePath);
    const base = path.basename(filePath);
    const entry = this.watched.get(path.resolve(dir));
    return entry?.getInode(base);
  }

  // ===========================================================================
  // Temp File Handling
  // ===========================================================================

  /** True when the basename matches one of the temp/swap/backup file patterns. */
  private isTemporaryFile(filePath: string): boolean {
    const basename = path.basename(filePath);
    return (
      basename.includes('.tmp.') ||
      basename.endsWith('.swp') ||
      basename.endsWith('.swx') ||
      basename.endsWith('~') ||
      basename.startsWith('.#')
    );
  }

  /**
   * Extract real file path from temp file (Claude Code atomic writes)
   * Pattern: file.ts.tmp.PID.TIMESTAMP -> file.ts
   *
   * @returns The target path, or null when the name matches no known pattern.
   */
  private getTempFileTarget(tempPath: string): string | null {
    const basename = path.basename(tempPath);

    // Claude Code: file.ts.tmp.PID.TIMESTAMP
    const claudeMatch = basename.match(/^(.+)\.tmp\.\d+\.\d+$/);
    if (claudeMatch) {
      return path.join(path.dirname(tempPath), claudeMatch[1]);
    }

    // Generic: file.ts.tmp.something
    const genericMatch = basename.match(/^(.+)\.tmp\.[^.]+$/);
    if (genericMatch) {
      return path.join(path.dirname(tempPath), genericMatch[1]);
    }

    return null;
  }

  // ===========================================================================
  // Watch Path Setup
  // ===========================================================================

  /**
   * Attach a recursive fs.watch watcher to a directory.
   *
   * Never throws: failures are logged and emitted as `error` events. When the
   * path does not exist or is not a directory no watcher is registered, so
   * callers that need to know must check `this.watchers`.
   */
  private async watchPath(watchPath: string): Promise<void> {
    // Normalize path to absolute - critical for consistent lookups
    watchPath = path.resolve(watchPath);

    try {
      const stats = await this.statSafe(watchPath);
      if (!stats?.isDirectory()) return;

      // Store inode for health check (fs.watch watches inode, not path!)
      this.watchedInodes.set(watchPath, BigInt(stats.ino));

      const watcher = fs.watch(
        watchPath,
        { recursive: true, persistent: true },
        (eventType, filename) => {
          if (filename) {
            this.handleFsEvent(watchPath, filename, eventType);
          }
        }
      );

      watcher.on('error', (error: NodeJS.ErrnoException) => {
        console.error(`[smartwatch] FSWatcher error on ${watchPath}:`, error);
        if (error.code === 'ENOSPC') {
          console.error('[smartwatch] CRITICAL: inotify watch limit exceeded!');
          console.error('[smartwatch] Fix: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p');
        }
        this.safeEmit({ type: EV.ERROR, path: watchPath, error });
        if (this._isWatching) {
          void this.restartWatcher(watchPath, error);
        }
      });

      watcher.on('close', () => {
        // A close while still watching was not requested by stop(); a close
        // caused by restartWatcher itself is filtered by its concurrency guard.
        if (this._isWatching) {
          console.warn(`[smartwatch] FSWatcher closed unexpectedly for ${watchPath}`);
          void this.restartWatcher(watchPath, new Error('Watcher closed unexpectedly'));
        }
      });

      this.watchers.set(watchPath, watcher);

      // Register closer
      this.addCloser(watchPath, () => {
        try {
          watcher.close();
        } catch {}
      });

      console.log(`[smartwatch] Started watching: ${watchPath}`);
    } catch (error: any) {
      console.error(`[smartwatch] Failed to watch path ${watchPath}:`, error);
      this.safeEmit({ type: EV.ERROR, path: watchPath, error });
    }
  }

  // ===========================================================================
  // Event Handling
  // ===========================================================================

  /**
   * Entry point for raw fs.watch events.
   *
   * Events are dropped after stop(), deferred during the initial scan,
   * redirected to the real file for atomic-write temp files, and otherwise
   * collected per path and flushed after `options.debounceMs`.
   */
  private handleFsEvent(
    basePath: string,
    filename: string,
    eventType: 'rename' | 'change' | string
  ): void {
    // Guard against post-stop events
    if (!this._isWatching) return;

    // Defer events until initial scan completes
    if (!this.initialScanComplete) {
      this.deferredEvents.push({ basePath, filename, eventType });
      return;
    }

    // Normalize to absolute path - critical for consistent lookups
    const fullPath = path.resolve(path.join(basePath, filename));

    // Handle temp files from atomic writes
    if (this.isTemporaryFile(fullPath)) {
      console.log(`[smartwatch] Detected temp file event: ${filename}`);
      const realPath = this.getTempFileTarget(fullPath);
      if (realPath) {
        console.log(`[smartwatch] Checking corresponding real file: ${realPath}`);
        setTimeout(() => {
          if (this._isWatching) {
            this.handleFsEvent(basePath, path.relative(basePath, realPath), 'change');
          }
        }, CONFIG.TEMP_FILE_DELAY);
      }
      return;
    }

    // Flushes whatever sequence has accumulated for this path by the time the
    // debounce timer fires.
    const flush = (): void => {
      const pending = this.pendingEmits.get(fullPath);
      if (pending) {
        this.pendingEmits.delete(fullPath);
        void this.emitFileEvent(fullPath, pending.events);
      }
    };

    // Track event sequence for intelligent debouncing
    const kind = eventType as 'rename' | 'change';
    const existing = this.pendingEmits.get(fullPath);
    if (existing) {
      // Extend the running sequence and restart the debounce window
      clearTimeout(existing.timeout);
      existing.events.push(kind);
      existing.timeout = setTimeout(flush, this.options.debounceMs);
    } else {
      this.pendingEmits.set(fullPath, {
        timeout: setTimeout(flush, this.options.debounceMs),
        events: [kind],
      });
    }
  }

  /**
   * Emit file event after debounce with atomic write handling
   *
   * Atomic write pattern (inspired by chokidar):
   * - unlink event queued with delay
   * - if add arrives for same path, transform to change
   *
   * @param fullPath Absolute path the event sequence refers to.
   * @param eventSequence Raw fs.watch event types collected during the debounce window.
   */
  private async emitFileEvent(
    fullPath: string,
    eventSequence: Array<'rename' | 'change'>
  ): Promise<void> {
    try {
      const stats = await this.statSafe(fullPath);

      // stop() may have run while stat was pending; emitting now or arming a
      // new unlink timer would leak activity past shutdown.
      if (!this._isWatching) return;

      const wasTracked = this.isTracked(fullPath);
      const previousInode = this.getFileInode(fullPath);

      // Analyze event sequence
      const hasRename = eventSequence.includes('rename');
      const renameCount = eventSequence.filter(e => e === 'rename').length;

      if (eventSequence.length > 1) {
        console.log(`[smartwatch] Processing event sequence for ${fullPath}: [${eventSequence.join(', ')}]`);
      }

      if (stats) {
        // File EXISTS
        const currentInode = BigInt(stats.ino);
        const inodeChanged = previousInode !== undefined && previousInode !== currentInode;

        if (stats.isDirectory()) {
          if (!wasTracked) {
            this.trackFile(fullPath);
            // Register the directory itself so that removing it while still
            // empty is recognised as unlinkDir (see isKnownDirectory).
            this.getWatchedDir(fullPath);
            this.safeEmit({ type: EV.ADD_DIR, path: fullPath, stats });
          }
        } else {
          // Update tracking
          this.trackFile(fullPath, currentInode);

          // Check for pending unlink → transform to change (atomic write pattern)
          const pendingUnlink = this.pendingUnlinks.get(fullPath);
          if (pendingUnlink) {
            clearTimeout(pendingUnlink.timeout);
            this.pendingUnlinks.delete(fullPath);
            console.log(`[smartwatch] Atomic write detected (unlink→add→change): ${fullPath}`);
            this.safeEmit({ type: EV.CHANGE, path: fullPath, stats });
            return;
          }

          if (!wasTracked) {
            this.safeEmit({ type: EV.ADD, path: fullPath, stats });
          } else if (inodeChanged) {
            console.log(`[smartwatch] File inode changed (delete+recreate): ${fullPath}`);
            console.log(`[smartwatch] Previous inode: ${previousInode}, current: ${currentInode}`);

            if (renameCount >= 2) {
              // Multiple renames with inode change = delete+recreate
              this.safeEmit({ type: EV.UNLINK, path: fullPath });
              this.safeEmit({ type: EV.ADD, path: fullPath, stats });
            } else {
              // Single rename with inode change = atomic save
              this.safeEmit({ type: EV.CHANGE, path: fullPath, stats });
            }
          } else {
            // Debounce already handles rapid events - no extra throttle needed
            this.safeEmit({ type: EV.CHANGE, path: fullPath, stats });
          }
        }
      } else {
        // File does NOT exist - handle unlink
        const wasDir = this.isKnownDirectory(fullPath);

        if (wasTracked) {
          this.untrackFile(fullPath);

          if (renameCount >= 2 && !wasDir) {
            // Rapid create+delete
            console.log(`[smartwatch] File created and deleted rapidly: ${fullPath}`);
            this.safeEmit({ type: EV.ADD, path: fullPath });
            this.safeEmit({ type: EV.UNLINK, path: fullPath });
          } else {
            // Queue unlink with delay for atomic write detection
            this.queueUnlink(fullPath, wasDir);
          }
        } else {
          if (renameCount >= 2) {
            console.log(`[smartwatch] Untracked file created and deleted: ${fullPath}`);
            this.safeEmit({ type: EV.ADD, path: fullPath });
            this.safeEmit({ type: EV.UNLINK, path: fullPath });
          } else if (hasRename) {
            console.log(`[smartwatch] Untracked file deleted: ${fullPath}`);
            this.queueUnlink(fullPath, false);
          }
        }
      }
    } catch (error: any) {
      this.safeEmit({ type: EV.ERROR, path: fullPath, error });
    }
  }

  /**
   * Queue an unlink event with delay for atomic write detection
   * If add event arrives within delay, unlink is cancelled and change is emitted
   */
  private queueUnlink(fullPath: string, isDir: boolean): void {
    const event: IWatchEvent = {
      type: isDir ? EV.UNLINK_DIR : EV.UNLINK,
      path: fullPath,
    };

    const timeout = setTimeout(() => {
      const pending = this.pendingUnlinks.get(fullPath);
      if (pending) {
        this.pendingUnlinks.delete(fullPath);
        this.safeEmit(pending.event);
      }
    }, CONFIG.ATOMIC_DELAY);

    this.pendingUnlinks.set(fullPath, { timeout, event });
  }

  // ===========================================================================
  // Directory Scanning
  // ===========================================================================

  /**
   * Recursively scan a directory, tracking every entry and emitting
   * add/addDir events. Temp files are skipped; ENOENT and EACCES on the
   * directory itself are ignored, other errors are emitted as `error` events.
   *
   * @param dirPath Directory to scan.
   * @param depth Current recursion depth; scanning stops beyond `options.depth`.
   */
  private async scanDirectory(dirPath: string, depth: number): Promise<void> {
    // Normalize path to absolute - critical for consistent lookups
    dirPath = path.resolve(dirPath);

    if (depth > this.options.depth) return;

    try {
      const entries = await fs.promises.readdir(dirPath, { withFileTypes: true });

      for (const entry of entries) {
        const fullPath = path.join(dirPath, entry.name);

        if (this.isTemporaryFile(fullPath)) continue;

        const stats = await this.statSafe(fullPath);
        if (!stats) continue;

        if (entry.isDirectory()) {
          this.trackFile(fullPath);
          // Register the directory itself so that an empty directory is still
          // known as a directory when it is removed later.
          this.getWatchedDir(fullPath);
          this.safeEmit({ type: EV.ADD_DIR, path: fullPath, stats });
          await this.scanDirectory(fullPath, depth + 1);
        } else if (entry.isFile()) {
          this.trackFile(fullPath, BigInt(stats.ino));
          this.safeEmit({ type: EV.ADD, path: fullPath, stats });
        }
      }
    } catch (error: any) {
      if (error.code !== 'ENOENT' && error.code !== 'EACCES') {
        this.safeEmit({ type: EV.ERROR, path: dirPath, error });
      }
    }
  }

  // ===========================================================================
  // Health Check & Auto-Restart
  // ===========================================================================

  /**
   * Periodically stat every watched base path and restart its watcher when
   * the inode changed or the path disappeared.
   */
  private startHealthCheck(): void {
    console.log('[smartwatch] Starting health check (every 30s)');

    this.healthCheckInterval = setInterval(async () => {
      console.log(`[smartwatch] Health check: ${this.watchers.size} watchers active`);

      for (const [basePath] of this.watchers) {
        try {
          const stats = await fs.promises.stat(basePath);
          const currentInode = BigInt(stats.ino);
          const previousInode = this.watchedInodes.get(basePath);

          if (previousInode !== undefined && currentInode !== previousInode) {
            console.warn(`[smartwatch] Inode changed for ${basePath}: ${previousInode} -> ${currentInode}`);
            console.warn('[smartwatch] fs.watch watches inode, not path - restarting watcher');
            void this.restartWatcher(basePath, new Error('Inode changed - directory was replaced'));
          }
        } catch (error: any) {
          if (error.code === 'ENOENT') {
            console.error(`[smartwatch] Health check failed: ${basePath} no longer exists`);
            void this.restartWatcher(basePath, new Error('Watched path disappeared'));
          } else if (error.code === 'ENOSPC') {
            console.error('[smartwatch] ENOSPC: inotify watch limit exceeded!');
            console.error('[smartwatch] Fix: echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf && sudo sysctl -p');
            this.safeEmit({ type: EV.ERROR, path: basePath, error });
            // Trigger restart - watcher may be broken after ENOSPC
            void this.restartWatcher(basePath, error);
          } else {
            console.error(`[smartwatch] Health check error for ${basePath}:`, error);
          }
        }
      }
    }, CONFIG.HEALTH_CHECK_INTERVAL);
  }

  private stopHealthCheck(): void {
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
      console.log('[smartwatch] Stopped health check');
    }
  }

  /**
   * Tear down and re-create the watcher for one base path.
   *
   * Uses exponential back-off between attempts, gives up (emitting an `error`
   * event) after CONFIG.MAX_RETRIES attempts, and can be aborted by stop().
   * Only one restart per base path runs at a time.
   *
   * @param basePath Resolved base path whose watcher should be restarted.
   * @param error The failure that triggered the restart (used for logging).
   */
  private async restartWatcher(basePath: string, error: Error): Promise<void> {
    // Guard against concurrent restarts
    if (this.restartingPaths.has(basePath)) {
      console.log(`[smartwatch] Restart already in progress for ${basePath}, skipping`);
      return;
    }
    this.restartingPaths.add(basePath);

    // Set when this attempt failed and another one should follow. The retry is
    // launched only after the guard below has been released; launching it from
    // inside the try block would let the finally clause delete the guard that
    // the retry has just installed.
    let retryError: Error | null = null;

    try {
      const attempts = (this.restartAttempts.get(basePath) || 0) + 1;
      this.restartAttempts.set(basePath, attempts);

      console.log(`[smartwatch] Watcher error for ${basePath}: ${error.message}`);
      console.log(`[smartwatch] Restart attempt ${attempts}/${CONFIG.MAX_RETRIES}`);

      if (attempts > CONFIG.MAX_RETRIES) {
        console.error(`[smartwatch] Max retries exceeded for ${basePath}, giving up`);
        this.safeEmit({
          type: EV.ERROR,
          path: basePath,
          error: new Error(`Max restart retries (${CONFIG.MAX_RETRIES}) exceeded`),
        });
        return;
      }

      // Close old watcher
      const oldWatcher = this.watchers.get(basePath);
      if (oldWatcher) {
        try {
          oldWatcher.close();
        } catch {}
        this.watchers.delete(basePath);
      }
      // Drop the closers that belonged to the old watcher so they do not pile
      // up across restarts (each closer guards its own close() call).
      this.runClosers(basePath);

      // Clear pending unlinks for this base path (prevent stale events)
      for (const [unlinkedPath, pending] of this.pendingUnlinks) {
        if (this.isInside(basePath, unlinkedPath)) {
          clearTimeout(pending.timeout);
          this.pendingUnlinks.delete(unlinkedPath);
        }
      }

      // Clear stale DirEntry data (will be repopulated by rescan)
      for (const [dirPath, entry] of this.watched) {
        if (this.isInside(basePath, dirPath)) {
          entry.dispose();
          this.watched.delete(dirPath);
        }
      }

      // Exponential backoff with abort support
      const delay = this.restartDelays.get(basePath) || CONFIG.INITIAL_RESTART_DELAY;
      console.log(`[smartwatch] Waiting ${delay}ms before restart...`);

      const abortController = new AbortController();
      this.restartAbortControllers.set(basePath, abortController);

      try {
        await new Promise<void>((resolve, reject) => {
          const timeout = setTimeout(resolve, delay);
          abortController.signal.addEventListener('abort', () => {
            clearTimeout(timeout);
            reject(new Error('Restart aborted by stop()'));
          });
        });
      } catch {
        console.log(`[smartwatch] Restart aborted for ${basePath}`);
        return;
      } finally {
        this.restartAbortControllers.delete(basePath);
      }

      if (!this._isWatching) {
        console.log('[smartwatch] Watcher stopped during restart delay, aborting');
        return;
      }

      this.restartDelays.set(basePath, Math.min(delay * 2, CONFIG.MAX_RESTART_DELAY));

      try {
        await this.watchPath(basePath);

        // watchPath reports failures through events instead of throwing, so
        // the only reliable success signal is a registered watcher.
        if (!this.watchers.has(basePath)) {
          throw new Error(`Failed to re-establish watcher for ${basePath}`);
        }

        // Rescan to catch files created during restart window
        await this.scanDirectory(basePath, 0);

        console.log(`[smartwatch] Successfully restarted watcher for ${basePath}`);
        this.restartDelays.set(basePath, CONFIG.INITIAL_RESTART_DELAY);
        this.restartAttempts.set(basePath, 0);
      } catch (restartError) {
        console.error(`[smartwatch] Restart failed for ${basePath}:`, restartError);
        retryError = restartError instanceof Error ? restartError : new Error(String(restartError));
      }
    } finally {
      this.restartingPaths.delete(basePath);
    }

    if (retryError && this._isWatching) {
      void this.restartWatcher(basePath, retryError);
    }
  }

  // ===========================================================================
  // Utilities
  // ===========================================================================

  /**
   * True when `candidate` is `basePath` itself or located below it.
   * Compares on a path-separator boundary so that `/a/foo` does not match
   * `/a/foobar`.
   */
  private isInside(basePath: string, candidate: string): boolean {
    if (candidate === basePath) return true;
    const prefix = basePath.endsWith(path.sep) ? basePath : basePath + path.sep;
    return candidate.startsWith(prefix);
  }

  /**
   * stat (or lstat when `options.followSymlinks` is falsy) that never throws.
   *
   * @returns The stats, or null on any error; errors other than
   *          ENOENT/ENOTDIR are additionally logged as warnings.
   */
  private async statSafe(filePath: string): Promise<fs.Stats | null> {
    try {
      return await (this.options.followSymlinks
        ? fs.promises.stat(filePath)
        : fs.promises.lstat(filePath));
    } catch (error: any) {
      if (error.code === 'ENOENT' || error.code === 'ENOTDIR') {
        return null;
      }
      console.warn(`[smartwatch] statSafe warning for ${filePath}: ${error.code} - ${error.message}`);
      return null;
    }
  }

  /** True when a DirEntry is registered for the (resolved) path. */
  private isKnownDirectory(filePath: string): boolean {
    const resolved = path.resolve(filePath);
    return this.watched.has(resolved);
  }
}