import * as fs from 'fs'; import * as path from 'path'; import * as smartrx from '@push.rocks/smartrx'; import type { IWatcher, IWatcherOptions, IWatchEvent, TWatchEventType } from './interfaces.js'; import { WriteStabilizer } from '../utils/write-stabilizer.js'; /** * Node.js/Bun file watcher using native fs.watch API */ export class NodeWatcher implements IWatcher { private watchers: Map = new Map(); private watchedFiles: Set = new Set(); private _isWatching = false; private writeStabilizer: WriteStabilizer; private recentEvents: Map = new Map(); private throttleMs = 50; public readonly events$ = new smartrx.rxjs.Subject(); constructor(private options: IWatcherOptions) { this.writeStabilizer = new WriteStabilizer( options.stabilityThreshold, options.pollInterval ); } get isWatching(): boolean { return this._isWatching; } async start(): Promise { if (this._isWatching) { return; } try { // Start watching each base path for (const basePath of this.options.basePaths) { await this.watchPath(basePath, 0); } this._isWatching = true; // Perform initial scan to emit 'add' events for existing files for (const basePath of this.options.basePaths) { await this.scanDirectory(basePath, 0); } // Emit ready event this.events$.next({ type: 'ready', path: '' }); } catch (error: any) { this.events$.next({ type: 'error', path: '', error }); throw error; } } async stop(): Promise { this._isWatching = false; this.writeStabilizer.cancelAll(); // Close all watchers for (const [watchPath, watcher] of this.watchers) { watcher.close(); } this.watchers.clear(); this.watchedFiles.clear(); this.recentEvents.clear(); } /** * Start watching a path (file or directory) */ private async watchPath(watchPath: string, depth: number): Promise { if (depth > this.options.depth) { return; } try { const stats = await this.statSafe(watchPath); if (!stats) { return; } if (stats.isDirectory()) { // Watch the directory with recursive option (Node.js 20+ supports this on all platforms) const watcher = fs.watch( watchPath, { recursive: true, persistent: true }, (eventType, filename) => { if (filename) { this.handleFsEvent(watchPath, filename, eventType); } } ); watcher.on('error', (error) => { this.events$.next({ type: 'error', path: watchPath, error }); }); this.watchers.set(watchPath, watcher); } } catch (error: any) { this.events$.next({ type: 'error', path: watchPath, error }); } } /** * Handle raw fs.watch events and normalize them */ private async handleFsEvent( basePath: string, filename: string, eventType: 'rename' | 'change' | string ): Promise { const fullPath = path.join(basePath, filename); // Throttle duplicate events if (!this.shouldEmit(fullPath, eventType)) { return; } try { const stats = await this.statSafe(fullPath); if (eventType === 'rename') { // 'rename' can mean add or unlink - check if file exists if (stats) { // File exists - it's either a new file or was renamed to this location if (stats.isDirectory()) { if (!this.watchedFiles.has(fullPath)) { this.watchedFiles.add(fullPath); this.events$.next({ type: 'addDir', path: fullPath, stats }); } } else { // Wait for write to stabilize before emitting try { const stableStats = await this.writeStabilizer.waitForWriteFinish(fullPath); const wasWatched = this.watchedFiles.has(fullPath); this.watchedFiles.add(fullPath); this.events$.next({ type: wasWatched ? 'change' : 'add', path: fullPath, stats: stableStats }); } catch { // File was deleted during stabilization if (this.watchedFiles.has(fullPath)) { this.watchedFiles.delete(fullPath); this.events$.next({ type: 'unlink', path: fullPath }); } } } } else { // File doesn't exist - it was deleted if (this.watchedFiles.has(fullPath)) { const wasDir = this.isKnownDirectory(fullPath); this.watchedFiles.delete(fullPath); this.events$.next({ type: wasDir ? 'unlinkDir' : 'unlink', path: fullPath }); } } } else if (eventType === 'change') { // File was modified if (stats && !stats.isDirectory()) { try { const stableStats = await this.writeStabilizer.waitForWriteFinish(fullPath); // Check if this is a new file (not yet in watchedFiles) const wasWatched = this.watchedFiles.has(fullPath); if (!wasWatched) { // This is actually an 'add' - file wasn't being watched before this.watchedFiles.add(fullPath); this.events$.next({ type: 'add', path: fullPath, stats: stableStats }); } else { this.events$.next({ type: 'change', path: fullPath, stats: stableStats }); } } catch { // File was deleted during write if (this.watchedFiles.has(fullPath)) { this.watchedFiles.delete(fullPath); this.events$.next({ type: 'unlink', path: fullPath }); } } } } } catch (error: any) { this.events$.next({ type: 'error', path: fullPath, error }); } } /** * Scan directory and emit 'add' events for existing files */ private async scanDirectory(dirPath: string, depth: number): Promise { if (depth > this.options.depth) { return; } try { const entries = await fs.promises.readdir(dirPath, { withFileTypes: true }); for (const entry of entries) { const fullPath = path.join(dirPath, entry.name); const stats = await this.statSafe(fullPath); if (!stats) { continue; } if (entry.isDirectory()) { this.watchedFiles.add(fullPath); this.events$.next({ type: 'addDir', path: fullPath, stats }); await this.scanDirectory(fullPath, depth + 1); } else if (entry.isFile()) { this.watchedFiles.add(fullPath); this.events$.next({ type: 'add', path: fullPath, stats }); } } } catch (error: any) { if (error.code !== 'ENOENT' && error.code !== 'EACCES') { this.events$.next({ type: 'error', path: dirPath, error }); } } } /** * Safely stat a path, returning null if it doesn't exist */ private async statSafe(filePath: string): Promise { try { if (this.options.followSymlinks) { return await fs.promises.stat(filePath); } else { return await fs.promises.lstat(filePath); } } catch { return null; } } /** * Check if a path was known to be a directory (for proper unlink event type) */ private isKnownDirectory(filePath: string): boolean { // Check if any watched files are children of this path for (const watched of this.watchedFiles) { if (watched.startsWith(filePath + path.sep)) { return true; } } return false; } /** * Throttle duplicate events */ private shouldEmit(filePath: string, eventType: string): boolean { const key = `${filePath}:${eventType}`; const now = Date.now(); const lastEmit = this.recentEvents.get(key); if (lastEmit && now - lastEmit < this.throttleMs) { return false; } this.recentEvents.set(key, now); // Clean up old entries periodically if (this.recentEvents.size > 1000) { const cutoff = now - this.throttleMs * 2; for (const [k, time] of this.recentEvents) { if (time < cutoff) { this.recentEvents.delete(k); } } } return true; } }