From 097ea96e99ea8e7bc26688003c612c8733d86b24 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 8 Dec 2025 16:06:18 +0000 Subject: [PATCH] BREAKING CHANGE(watchers): Replace polling-based write stabilization with debounce-based event coalescing and simplify watcher options --- changelog.md | 11 +++ package.json | 2 +- ts/00_commitinfo_data.ts | 2 +- ts/smartwatch.classes.smartwatch.ts | 4 +- ts/utils/write-stabilizer.ts | 109 ---------------------- ts/watchers/index.ts | 4 +- ts/watchers/interfaces.ts | 8 +- ts/watchers/watcher.deno.ts | 136 ++++++++-------------------- ts/watchers/watcher.node.ts | 118 +++++++++++------------- 9 files changed, 106 insertions(+), 288 deletions(-) delete mode 100644 ts/utils/write-stabilizer.ts diff --git a/changelog.md b/changelog.md index 9e2e6c2..8be5226 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2025-12-08 - 6.0.0 - BREAKING CHANGE(watchers) +Replace polling-based write stabilization with debounce-based event coalescing and simplify watcher options + +- Remove polling-based WriteStabilizer (ts/utils/write-stabilizer.ts) and related waitForWriteFinish logic +- Introduce debounce-based coalescing (debounceMs) for Node and Deno watchers (ts/watchers/watcher.node.ts, ts/watchers/watcher.deno.ts) +- Change IWatcherOptions: remove stabilityThreshold/pollInterval/maxWaitTime and add debounceMs +- Default watcher options updated to use debounceMs = 100 +- Node/Deno watchers now skip editor temp files earlier, debounce duplicate events, and emit events directly (no size polling) +- Updated default watcher creation in Smartwatch to pass debounceMs +- Update package.json build script to run 'tsbuild tsfolders' + ## 2025-12-08 - 5.1.0 - feat(watchers) Improve write stabilization and ignore temporary editor files diff --git a/package.json b/package.json index f2def02..a709246 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "scripts": { "test": "(npm run prepareTest && tstest test/)", "prepareTest": "(rm -f ./test/assets/hi.txt)", - "build": "tsbuild", + "build": "tsbuild tsfolders", "buildDocs": "tsdoc" }, "repository": { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index baf6412..8773374 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartwatch', - version: '5.1.0', + version: '6.0.0', description: 'A cross-runtime file watcher with glob pattern support for Node.js, Deno, and Bun.' } diff --git a/ts/smartwatch.classes.smartwatch.ts b/ts/smartwatch.classes.smartwatch.ts index 0bfddd2..891833a 100644 --- a/ts/smartwatch.classes.smartwatch.ts +++ b/ts/smartwatch.classes.smartwatch.ts @@ -132,9 +132,7 @@ export class Smartwatch { basePaths: watchPaths, depth: 4, followSymlinks: false, - stabilityThreshold: 100, - pollInterval: 100, - maxWaitTime: 1000 + debounceMs: 100 }); // Subscribe to watcher events and dispatch to appropriate subjects diff --git a/ts/utils/write-stabilizer.ts b/ts/utils/write-stabilizer.ts deleted file mode 100644 index 2aef7cd..0000000 --- a/ts/utils/write-stabilizer.ts +++ /dev/null @@ -1,109 +0,0 @@ -import * as fs from 'fs'; - -interface IPendingWrite { - lastSize: number; - lastChange: number; - startTime: number; - timeoutId: ReturnType; - resolve: (stats: fs.Stats) => void; - reject: (error: Error) => void; -} - -/** - * Implements awaitWriteFinish functionality by polling file size until stable. - * This replaces chokidar's built-in write stabilization. - */ -export class WriteStabilizer { - private pendingWrites = new Map(); - - constructor( - private stabilityThreshold: number = 100, - private pollInterval: number = 100, - private maxWaitTime: number = 1000 - ) {} - - /** - * Wait for a file write to complete by polling until size is stable - */ - async waitForWriteFinish(filePath: string): Promise { - // Cancel any existing pending check for this file - this.cancel(filePath); - - return new Promise((resolve, reject) => { - const startTime = Date.now(); - - const poll = async () => { - try { - const stats = await fs.promises.stat(filePath); - const pending = this.pendingWrites.get(filePath); - - if (!pending) { - // Was cancelled - return; - } - - const now = Date.now(); - - // Check if we've exceeded max wait time - emit with current stats - if (now - pending.startTime >= this.maxWaitTime) { - this.pendingWrites.delete(filePath); - resolve(stats); - return; - } - - if (stats.size !== pending.lastSize) { - // Size changed - file is still being written, reset timer - pending.lastSize = stats.size; - pending.lastChange = now; - pending.timeoutId = setTimeout(poll, this.pollInterval); - } else if (now - pending.lastChange >= this.stabilityThreshold) { - // Size has been stable for the threshold duration - this.pendingWrites.delete(filePath); - resolve(stats); - } else { - // Size is the same but not yet past threshold - pending.timeoutId = setTimeout(poll, this.pollInterval); - } - } catch (error: any) { - this.pendingWrites.delete(filePath); - if (error.code === 'ENOENT') { - // File was deleted during polling - reject(new Error(`File was deleted: ${filePath}`)); - } else { - reject(error); - } - } - }; - - this.pendingWrites.set(filePath, { - lastSize: -1, - lastChange: startTime, - startTime: startTime, - timeoutId: setTimeout(poll, this.pollInterval), - resolve, - reject - }); - }); - } - - /** - * Cancel any pending write stabilization for a file - */ - cancel(filePath: string): void { - const pending = this.pendingWrites.get(filePath); - if (pending) { - clearTimeout(pending.timeoutId); - this.pendingWrites.delete(filePath); - } - } - - /** - * Cancel all pending write stabilizations - */ - cancelAll(): void { - for (const [filePath, pending] of this.pendingWrites) { - clearTimeout(pending.timeoutId); - } - this.pendingWrites.clear(); - } -} diff --git a/ts/watchers/index.ts b/ts/watchers/index.ts index 7e737cd..1679719 100644 --- a/ts/watchers/index.ts +++ b/ts/watchers/index.ts @@ -28,7 +28,5 @@ export const defaultWatcherOptions: IWatcherOptions = { basePaths: [], depth: 4, followSymlinks: false, - stabilityThreshold: 100, - pollInterval: 100, - maxWaitTime: 1000 + debounceMs: 100 }; diff --git a/ts/watchers/interfaces.ts b/ts/watchers/interfaces.ts index 6c29d9d..cbb47ae 100644 --- a/ts/watchers/interfaces.ts +++ b/ts/watchers/interfaces.ts @@ -26,12 +26,8 @@ export interface IWatcherOptions { depth: number; /** Whether to follow symbolic links */ followSymlinks: boolean; - /** Stability threshold for write detection (ms) */ - stabilityThreshold: number; - /** Poll interval for write detection (ms) */ - pollInterval: number; - /** Maximum time to wait for write stabilization (ms) */ - maxWaitTime: number; + /** Debounce time in ms - events for the same file within this window are coalesced */ + debounceMs: number; } /** diff --git a/ts/watchers/watcher.deno.ts b/ts/watchers/watcher.deno.ts index 363044d..27f6ad2 100644 --- a/ts/watchers/watcher.deno.ts +++ b/ts/watchers/watcher.deno.ts @@ -65,10 +65,9 @@ export class DenoWatcher implements IWatcher { private watcher: ReturnType | null = null; private watchedFiles: Set = new Set(); private _isWatching = false; - private abortController: AbortController | null = null; - private recentEvents: Map = new Map(); - private throttleMs = 50; - private pendingWrites: Map> = new Map(); + + // Debounce: pending emits per file path + private pendingEmits: Map> = new Map(); public readonly events$ = new smartrx.rxjs.Subject(); @@ -97,8 +96,6 @@ export class DenoWatcher implements IWatcher { } try { - this.abortController = new AbortController(); - // Start watching all base paths this.watcher = Deno.watchFs(this.options.basePaths, { recursive: true }); this._isWatching = true; @@ -122,11 +119,11 @@ export class DenoWatcher implements IWatcher { async stop(): Promise { this._isWatching = false; - // Cancel all pending write stabilizations - for (const timeout of this.pendingWrites.values()) { + // Cancel all pending debounced emits + for (const timeout of this.pendingEmits.values()) { clearTimeout(timeout); } - this.pendingWrites.clear(); + this.pendingEmits.clear(); // Close the watcher if (this.watcher) { @@ -135,7 +132,6 @@ export class DenoWatcher implements IWatcher { } this.watchedFiles.clear(); - this.recentEvents.clear(); } /** @@ -153,7 +149,7 @@ export class DenoWatcher implements IWatcher { } for (const filePath of event.paths) { - await this.handleDenoEvent(event.kind, filePath); + this.handleDenoEvent(event.kind, filePath); } } } catch (error: any) { @@ -164,12 +160,12 @@ export class DenoWatcher implements IWatcher { } /** - * Handle a Deno file system event + * Handle a Deno file system event - debounce and normalize */ - private async handleDenoEvent( + private handleDenoEvent( kind: 'create' | 'modify' | 'remove' | 'access' | 'any' | 'other', filePath: string - ): Promise { + ): void { // Ignore 'access' events (just reading the file) if (kind === 'access') { return; @@ -180,14 +176,30 @@ export class DenoWatcher implements IWatcher { return; } - // Throttle duplicate events - if (!this.shouldEmit(filePath, kind)) { - return; + // Debounce: cancel any pending emit for this file + const existing = this.pendingEmits.get(filePath); + if (existing) { + clearTimeout(existing); } + // Schedule debounced emit + const timeout = setTimeout(() => { + this.pendingEmits.delete(filePath); + this.emitFileEvent(filePath, kind); + }, this.options.debounceMs); + + this.pendingEmits.set(filePath, timeout); + } + + /** + * Emit the actual file event after debounce + */ + private async emitFileEvent( + filePath: string, + kind: 'create' | 'modify' | 'remove' | 'access' | 'any' | 'other' + ): Promise { try { if (kind === 'create') { - // Create events (atomic saves) don't need stabilization - file is already complete const stats = await this.statSafe(filePath); if (stats) { this.watchedFiles.add(filePath); @@ -195,16 +207,9 @@ export class DenoWatcher implements IWatcher { this.events$.next({ type: eventType, path: filePath, stats }); } } else if (kind === 'modify') { - // Modify events are in-place writes - use stabilization const stats = await this.statSafe(filePath); if (stats && !stats.isDirectory()) { - // Wait for write to stabilize - await this.waitForWriteFinish(filePath); - const finalStats = await this.statSafe(filePath); - - if (finalStats) { - this.events$.next({ type: 'change', path: filePath, stats: finalStats }); - } + this.events$.next({ type: 'change', path: filePath, stats }); } } else if (kind === 'remove') { const wasDirectory = this.isKnownDirectory(filePath); @@ -219,52 +224,6 @@ export class DenoWatcher implements IWatcher { } } - /** - * Wait for file write to complete (polling-based) - */ - private async waitForWriteFinish(filePath: string): Promise { - return new Promise((resolve) => { - let lastSize = -1; - let lastChange = Date.now(); - const startTime = Date.now(); - - const poll = async () => { - try { - const stats = await this.statSafe(filePath); - if (!stats) { - resolve(); - return; - } - - const now = Date.now(); - - // Check if we've exceeded max wait time - resolve immediately - if (now - startTime >= this.options.maxWaitTime) { - this.pendingWrites.delete(filePath); - resolve(); - return; - } - - if (stats.size !== lastSize) { - lastSize = stats.size; - lastChange = now; - this.pendingWrites.set(filePath, setTimeout(poll, this.options.pollInterval)); - } else if (now - lastChange >= this.options.stabilityThreshold) { - this.pendingWrites.delete(filePath); - resolve(); - } else { - this.pendingWrites.set(filePath, setTimeout(poll, this.options.pollInterval)); - } - } catch { - this.pendingWrites.delete(filePath); - resolve(); - } - }; - - this.pendingWrites.set(filePath, setTimeout(poll, this.options.pollInterval)); - }); - } - /** * Scan directory and emit 'add' events for existing files */ @@ -276,6 +235,12 @@ export class DenoWatcher implements IWatcher { try { for await (const entry of Deno.readDir(dirPath)) { const fullPath = `${dirPath}/${entry.name}`; + + // Skip temp files during initial scan too + if (this.isTemporaryFile(fullPath)) { + continue; + } + const stats = await this.statSafe(fullPath); if (!stats) { @@ -322,31 +287,4 @@ export class DenoWatcher implements IWatcher { } return false; } - - /** - * Throttle duplicate events - */ - private shouldEmit(filePath: string, eventType: string): boolean { - const key = `${filePath}:${eventType}`; - const now = Date.now(); - const lastEmit = this.recentEvents.get(key); - - if (lastEmit && now - lastEmit < this.throttleMs) { - return false; - } - - this.recentEvents.set(key, now); - - // Clean up old entries periodically - if (this.recentEvents.size > 1000) { - const cutoff = now - this.throttleMs * 2; - for (const [k, time] of this.recentEvents) { - if (time < cutoff) { - this.recentEvents.delete(k); - } - } - } - - return true; - } } diff --git a/ts/watchers/watcher.node.ts b/ts/watchers/watcher.node.ts index 8076e51..24347b6 100644 --- a/ts/watchers/watcher.node.ts +++ b/ts/watchers/watcher.node.ts @@ -2,7 +2,6 @@ import * as fs from 'fs'; import * as path from 'path'; import * as smartrx from '@push.rocks/smartrx'; import type { IWatcher, IWatcherOptions, IWatchEvent, TWatchEventType } from './interfaces.js'; -import { WriteStabilizer } from '../utils/write-stabilizer.js'; /** * Node.js/Bun file watcher using native fs.watch API @@ -11,19 +10,13 @@ export class NodeWatcher implements IWatcher { private watchers: Map = new Map(); private watchedFiles: Set = new Set(); private _isWatching = false; - private writeStabilizer: WriteStabilizer; - private recentEvents: Map = new Map(); - private throttleMs = 50; + + // Debounce: pending emits per file path + private pendingEmits: Map = new Map(); public readonly events$ = new smartrx.rxjs.Subject(); - constructor(private options: IWatcherOptions) { - this.writeStabilizer = new WriteStabilizer( - options.stabilityThreshold, - options.pollInterval, - options.maxWaitTime - ); - } + constructor(private options: IWatcherOptions) {} /** * Check if a file is a temporary file created by editors @@ -70,15 +63,19 @@ export class NodeWatcher implements IWatcher { async stop(): Promise { this._isWatching = false; - this.writeStabilizer.cancelAll(); + + // Cancel all pending debounced emits + for (const timeout of this.pendingEmits.values()) { + clearTimeout(timeout); + } + this.pendingEmits.clear(); // Close all watchers - for (const [watchPath, watcher] of this.watchers) { + for (const [, watcher] of this.watchers) { watcher.close(); } this.watchers.clear(); this.watchedFiles.clear(); - this.recentEvents.clear(); } /** @@ -119,13 +116,13 @@ export class NodeWatcher implements IWatcher { } /** - * Handle raw fs.watch events and normalize them + * Handle raw fs.watch events - debounce and normalize them */ - private async handleFsEvent( + private handleFsEvent( basePath: string, filename: string, eventType: 'rename' | 'change' | string - ): Promise { + ): void { const fullPath = path.join(basePath, filename); // Skip temporary files created by editors (atomic saves) @@ -133,11 +130,28 @@ export class NodeWatcher implements IWatcher { return; } - // Throttle duplicate events - if (!this.shouldEmit(fullPath, eventType)) { - return; + // Debounce: cancel any pending emit for this file + const existing = this.pendingEmits.get(fullPath); + if (existing) { + clearTimeout(existing); } + // Schedule debounced emit + const timeout = setTimeout(() => { + this.pendingEmits.delete(fullPath); + this.emitFileEvent(fullPath, eventType); + }, this.options.debounceMs); + + this.pendingEmits.set(fullPath, timeout); + } + + /** + * Emit the actual file event after debounce + */ + private async emitFileEvent( + fullPath: string, + eventType: 'rename' | 'change' | string + ): Promise { try { const stats = await this.statSafe(fullPath); @@ -151,7 +165,6 @@ export class NodeWatcher implements IWatcher { this.events$.next({ type: 'addDir', path: fullPath, stats }); } } else { - // Rename events (atomic saves) don't need stabilization - file is already complete const wasWatched = this.watchedFiles.has(fullPath); this.watchedFiles.add(fullPath); this.events$.next({ @@ -172,26 +185,20 @@ export class NodeWatcher implements IWatcher { } } } else if (eventType === 'change') { - // File was modified in-place - use stabilization for streaming writes + // File was modified if (stats && !stats.isDirectory()) { - try { - const stableStats = await this.writeStabilizer.waitForWriteFinish(fullPath); - // Check if this is a new file (not yet in watchedFiles) - const wasWatched = this.watchedFiles.has(fullPath); - if (!wasWatched) { - // This is actually an 'add' - file wasn't being watched before - this.watchedFiles.add(fullPath); - this.events$.next({ type: 'add', path: fullPath, stats: stableStats }); - } else { - this.events$.next({ type: 'change', path: fullPath, stats: stableStats }); - } - } catch { - // File was deleted during write - if (this.watchedFiles.has(fullPath)) { - this.watchedFiles.delete(fullPath); - this.events$.next({ type: 'unlink', path: fullPath }); - } + const wasWatched = this.watchedFiles.has(fullPath); + if (!wasWatched) { + // This is actually an 'add' - file wasn't being watched before + this.watchedFiles.add(fullPath); + this.events$.next({ type: 'add', path: fullPath, stats }); + } else { + this.events$.next({ type: 'change', path: fullPath, stats }); } + } else if (!stats && this.watchedFiles.has(fullPath)) { + // File was deleted + this.watchedFiles.delete(fullPath); + this.events$.next({ type: 'unlink', path: fullPath }); } } } catch (error: any) { @@ -212,6 +219,12 @@ export class NodeWatcher implements IWatcher { for (const entry of entries) { const fullPath = path.join(dirPath, entry.name); + + // Skip temp files during initial scan too + if (this.isTemporaryFile(fullPath)) { + continue; + } + const stats = await this.statSafe(fullPath); if (!stats) { @@ -261,31 +274,4 @@ export class NodeWatcher implements IWatcher { } return false; } - - /** - * Throttle duplicate events - */ - private shouldEmit(filePath: string, eventType: string): boolean { - const key = `${filePath}:${eventType}`; - const now = Date.now(); - const lastEmit = this.recentEvents.get(key); - - if (lastEmit && now - lastEmit < this.throttleMs) { - return false; - } - - this.recentEvents.set(key, now); - - // Clean up old entries periodically - if (this.recentEvents.size > 1000) { - const cutoff = now - this.throttleMs * 2; - for (const [k, time] of this.recentEvents) { - if (time < cutoff) { - this.recentEvents.delete(k); - } - } - } - - return true; - } }