// classes.watcher.ts import * as plugins from './plugins.js'; import * as interfaces from './interfaces.js'; import type { Bucket } from './classes.bucket.js'; import { EventEmitter } from 'node:events'; /** * BucketWatcher monitors an S3 bucket for changes (add/modify/delete) * using a polling-based approach. Designed to follow the SmartdataDbWatcher pattern. * * @example * ```typescript * const watcher = bucket.createWatcher({ prefix: 'uploads/', pollIntervalMs: 3000 }); * * // RxJS Observable pattern * watcher.changeSubject.subscribe((change) => { * console.log('Change:', change); * }); * * // EventEmitter pattern * watcher.on('change', (change) => console.log(change)); * watcher.on('error', (err) => console.error(err)); * * await watcher.start(); * await watcher.readyDeferred.promise; // Wait for initial state * * // Later... * await watcher.stop(); * ``` */ export class BucketWatcher extends EventEmitter { /** Deferred that resolves when initial state is built and watcher is ready */ public readyDeferred = plugins.smartpromise.defer(); /** Observable for receiving change events (supports RxJS operators) */ public changeSubject: plugins.smartrx.rxjs.Observable; // Internal subjects and state private rawSubject: plugins.smartrx.rxjs.Subject; private previousState: Map; private pollIntervalId: ReturnType | null = null; private isPolling = false; private isStopped = false; // Configuration private readonly bucketRef: Bucket; private readonly prefix: string; private readonly pollIntervalMs: number; private readonly bufferTimeMs?: number; private readonly includeInitial: boolean; private readonly pageSize: number; constructor(bucketRef: Bucket, options: interfaces.IBucketWatcherOptions = {}) { super(); this.bucketRef = bucketRef; this.prefix = options.prefix ?? ''; this.pollIntervalMs = options.pollIntervalMs ?? 5000; this.bufferTimeMs = options.bufferTimeMs; this.includeInitial = options.includeInitial ?? false; this.pageSize = options.pageSize ?? 1000; // Initialize state tracking this.previousState = new Map(); // Initialize raw subject for emitting changes this.rawSubject = new plugins.smartrx.rxjs.Subject(); // Configure the public observable with optional buffering if (this.bufferTimeMs && this.bufferTimeMs > 0) { this.changeSubject = this.rawSubject.pipe( plugins.smartrx.rxjs.ops.bufferTime(this.bufferTimeMs), plugins.smartrx.rxjs.ops.filter((events: interfaces.IS3ChangeEvent[]) => events.length > 0) ); } else { this.changeSubject = this.rawSubject.asObservable(); } } /** * Start watching the bucket for changes */ public async start(): Promise { if (this.pollIntervalId !== null) { console.log('BucketWatcher is already running'); return; } this.isStopped = false; // Build initial state await this.buildInitialState(); // Emit initial state as 'add' events if configured if (this.includeInitial) { for (const state of this.previousState.values()) { this.emitChange({ type: 'add', key: state.key, size: state.size, etag: state.etag, lastModified: state.lastModified, bucket: this.bucketRef.name, }); } } // Mark as ready this.readyDeferred.resolve(); // Start polling loop this.pollIntervalId = setInterval(() => { this.poll().catch((err) => { this.emit('error', err); }); }, this.pollIntervalMs); } /** * Stop watching the bucket */ public async stop(): Promise { this.isStopped = true; if (this.pollIntervalId !== null) { clearInterval(this.pollIntervalId); this.pollIntervalId = null; } // Wait for any in-progress poll to complete while (this.isPolling) { await new Promise((resolve) => setTimeout(resolve, 50)); } this.rawSubject.complete(); } /** * Alias for stop() - for consistency with other APIs */ public async close(): Promise { return this.stop(); } /** * Build the initial state by listing all objects with metadata */ private async buildInitialState(): Promise { this.previousState.clear(); for await (const obj of this.listObjectsWithMetadata()) { if (obj.Key) { this.previousState.set(obj.Key, { key: obj.Key, etag: obj.ETag ?? '', size: obj.Size ?? 0, lastModified: obj.LastModified ?? new Date(0), }); } } } /** * Poll for changes by comparing current state against previous state */ private async poll(): Promise { // Guard against overlapping polls if (this.isPolling || this.isStopped) { return; } this.isPolling = true; try { // Build current state const currentState = new Map(); for await (const obj of this.listObjectsWithMetadata()) { if (this.isStopped) { break; } if (obj.Key) { currentState.set(obj.Key, { key: obj.Key, etag: obj.ETag ?? '', size: obj.Size ?? 0, lastModified: obj.LastModified ?? new Date(0), }); } } if (!this.isStopped) { this.detectChanges(currentState); this.previousState = currentState; } } catch (err) { this.emit('error', err); } finally { this.isPolling = false; } } /** * Detect changes between current and previous state */ private detectChanges(currentState: Map): void { // Detect added and modified objects for (const [key, current] of currentState) { const previous = this.previousState.get(key); if (!previous) { // New object - emit 'add' event this.emitChange({ type: 'add', key: current.key, size: current.size, etag: current.etag, lastModified: current.lastModified, bucket: this.bucketRef.name, }); } else if ( previous.etag !== current.etag || previous.size !== current.size || previous.lastModified.getTime() !== current.lastModified.getTime() ) { // Object modified - emit 'modify' event this.emitChange({ type: 'modify', key: current.key, size: current.size, etag: current.etag, lastModified: current.lastModified, bucket: this.bucketRef.name, }); } } // Detect deleted objects for (const [key, previous] of this.previousState) { if (!currentState.has(key)) { // Object deleted - emit 'delete' event this.emitChange({ type: 'delete', key: previous.key, bucket: this.bucketRef.name, }); } } } /** * Emit a change event via both RxJS Subject and EventEmitter */ private emitChange(event: interfaces.IS3ChangeEvent): void { this.rawSubject.next(event); this.emit('change', event); } /** * List objects with full metadata (ETag, Size, LastModified) * This is a private method that yields full _Object data, not just keys */ private async *listObjectsWithMetadata(): AsyncIterableIterator { let continuationToken: string | undefined; do { if (this.isStopped) { return; } const command = new plugins.s3.ListObjectsV2Command({ Bucket: this.bucketRef.name, Prefix: this.prefix, MaxKeys: this.pageSize, ContinuationToken: continuationToken, }); const response = await this.bucketRef.smartbucketRef.s3Client.send(command); for (const obj of response.Contents || []) { yield obj; } continuationToken = response.NextContinuationToken; } while (continuationToken); } }