import { SmartDataDbDoc } from './classes.doc.js';
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';

/**
 * Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
 *
 * Every `change` event of the underlying stream is converted into a document
 * instance (or `null` when the event carries no `fullDocument`) and published
 * twice: on `changeSubject` and as a `'change'` event on this emitter.
 */
export class SmartdataDbWatcher<T = any> extends EventEmitter {
  // INSTANCE
  /** Resolved one tick after construction; await `readyDeferred.promise` before relying on the watcher. */
  public readyDeferred = plugins.smartpromise.defer();

  private changeStream: plugins.mongodb.ChangeStream<T>;
  private rawSubject: plugins.smartrx.rxjs.Subject<T>;

  /** Emits change documents (or arrays of documents if buffered) */
  public changeSubject: any;

  /**
   * @param changeStreamArg native MongoDB ChangeStream
   * @param smartdataDbDocArg document class for instance creation
   * @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS;
   *   a missing or falsy value (including 0) disables buffering
   */
  constructor(
    changeStreamArg: plugins.mongodb.ChangeStream<T>,
    smartdataDbDocArg: typeof SmartDataDbDoc,
    opts?: { bufferTimeMs?: number },
  ) {
    super();
    this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();

    // Apply buffering if requested; otherwise expose the raw subject directly.
    if (opts?.bufferTimeMs) {
      this.changeSubject = this.rawSubject.pipe(
        plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs),
      );
    } else {
      this.changeSubject = this.rawSubject;
    }

    this.changeStream = changeStreamArg;
    this.changeStream.on('change', async (item: any) => {
      // Events without a fullDocument are still forwarded, as null.
      let docInstance: T = null;
      if (item.fullDocument) {
        docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(
          item.fullDocument,
        ) as any as T;
      }

      // Notify subscribers: RxJS first, then EventEmitter listeners.
      this.rawSubject.next(docInstance);
      this.emit('change', docInstance);
    });

    // Signal readiness after one tick (fire-and-forget by design).
    void plugins.smartdelay.delayFor(0).then(() => {
      this.readyDeferred.resolve();
    });
  }

  /**
   * Close the change stream, complete the RxJS subject, and remove listeners.
   *
   * Teardown of the subject and the listeners happens even when closing the
   * underlying ChangeStream rejects; the rejection is still propagated to the
   * caller.
   */
  public async close(): Promise<void> {
    try {
      // Close MongoDB ChangeStream
      await this.changeStream.close();
    } finally {
      // Complete the subject to teardown any buffering operators
      this.rawSubject.complete();
      // Remove all EventEmitter listeners
      this.removeAllListeners();
    }
  }

  /**
   * Alias for close(), matching README usage
   */
  public async stop(): Promise<void> {
    return this.close();
  }
}