feat(watcher): Enhance change stream watchers with buffering and EventEmitter support; update dependency versions

This commit is contained in:
Philipp Kunz 2025-04-25 09:35:51 +00:00
parent 4a1f11b885
commit 5cb043009c
8 changed files with 634 additions and 175 deletions

View File

@ -1,5 +1,13 @@
# Changelog
## 2025-04-25 - 5.16.0 - feat(watcher)
Enhance change stream watchers with buffering and EventEmitter support; update dependency versions
- Bumped smartmongo from ^2.0.11 to ^2.0.12 and smartrx from ^3.0.7 to ^3.0.10
- Upgraded @tsclass/tsclass to ^9.0.0 and mongodb to ^6.16.0
- Refactored the watch API to accept additional options (bufferTimeMs, fullDocument) for improved change stream handling
- Modified SmartdataDbWatcher to extend EventEmitter and support event notifications
## 2025-04-24 - 5.15.1 - fix(cursor)
Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers

View File

@ -26,23 +26,23 @@
"@push.rocks/lik": "^6.0.14",
"@push.rocks/smartdelay": "^3.0.1",
"@push.rocks/smartlog": "^3.0.2",
"@push.rocks/smartmongo": "^2.0.11",
"@push.rocks/smartmongo": "^2.0.12",
"@push.rocks/smartpromise": "^4.0.2",
"@push.rocks/smartrx": "^3.0.7",
"@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smartstring": "^4.0.15",
"@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.8",
"@push.rocks/taskbuffer": "^3.1.7",
"@tsclass/tsclass": "^8.2.0",
"mongodb": "^6.15.0"
"@tsclass/tsclass": "^9.0.0",
"mongodb": "^6.16.0"
},
"devDependencies": {
"@git.zone/tsbuild": "^2.3.2",
"@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.77",
"@push.rocks/qenv": "^6.0.5",
"@push.rocks/tapbundle": "^5.6.2",
"@types/node": "^22.14.0"
"@push.rocks/tapbundle": "^5.6.3",
"@types/node": "^22.15.2"
},
"files": [
"ts/**/*",

647
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -60,6 +60,43 @@ tap.test('should watch a collection', async (toolsArg) => {
await done.promise;
});
// ======= New tests for EventEmitter and buffering support =======
tap.test('should emit change via EventEmitter', async (tools) => {
const done = tools.defer();
const watcher = await House.watch({});
watcher.on('change', async (houseArg) => {
// Expect a House instance
expect(houseArg).toBeDefined();
// Clean up
await watcher.stop();
done.resolve();
});
// Trigger an insert to generate a change event
const h = new House();
await h.save();
await done.promise;
});
tap.test('should buffer change events when bufferTimeMs is set', async (tools) => {
const done = tools.defer();
// bufferTimeMs collects events into arrays every 50ms
const watcher = await House.watch({}, { bufferTimeMs: 50 });
let received: House[];
watcher.changeSubject.subscribe(async (batch: House[]) => {
if (batch && batch.length > 0) {
received = batch;
await watcher.stop();
done.resolve();
}
});
// Rapidly insert multiple docs
const docs = [new House(), new House(), new House()];
for (const doc of docs) await doc.save();
await done.promise;
// All inserts should be in one buffered batch
expect(received.length).toEqual(docs.length);
});
// =======================================
// close the database connection
// =======================================

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartdata',
version: '5.15.1',
version: '5.16.0',
description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.'
}

View File

@ -256,24 +256,40 @@ export class SmartdataCollection<T> {
}
/**
* watches the collection while applying a filter
* Watches the collection, returning a SmartdataDbWatcher with RxJS and EventEmitter support.
* @param filterObject match filter for change stream
* @param opts optional MongoDB ChangeStreamOptions & { bufferTimeMs } to buffer events
* @param smartdataDbDocArg document class for instance creation
*/
public async watch(
filterObject: any,
smartdataDbDocArg: typeof SmartDataDbDoc,
opts: (plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }) = {},
smartdataDbDocArg?: typeof SmartDataDbDoc,
): Promise<SmartdataDbWatcher> {
await this.init();
// Extract bufferTimeMs from options
const { bufferTimeMs, fullDocument, ...otherOptions } = opts || {};
// Determine fullDocument behavior: default to 'updateLookup'
const changeStreamOptions: plugins.mongodb.ChangeStreamOptions = {
...otherOptions,
fullDocument:
fullDocument === undefined
? 'updateLookup'
: fullDocument === true
? 'updateLookup'
: fullDocument,
} as any;
// Build pipeline with match if provided
const pipeline = filterObject ? [{ $match: filterObject }] : [];
const changeStream = this.mongoDbCollection.watch(
[
{
$match: filterObject,
},
],
{
fullDocument: 'updateLookup',
},
pipeline,
changeStreamOptions,
);
const smartdataWatcher = new SmartdataDbWatcher(
changeStream,
smartdataDbDocArg,
{ bufferTimeMs },
);
const smartdataWatcher = new SmartdataDbWatcher(changeStream, smartdataDbDocArg);
await smartdataWatcher.readyDeferred.promise;
return smartdataWatcher;
}

View File

@ -305,13 +305,20 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
* @param filterArg
* @param forEachFunction
*/
/**
* Watch the collection for changes, with optional buffering and change stream options.
* @param filterArg MongoDB filter to select which changes to observe
* @param opts optional ChangeStreamOptions plus bufferTimeMs
*/
public static async watch<T>(
this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
) {
opts?: plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number },
): Promise<SmartdataDbWatcher<T>> {
const collection: SmartdataCollection<T> = (this as any).collection;
const watcher: SmartdataDbWatcher<T> = await collection.watch(
convertFilterForMongoDb(filterArg),
opts || {},
this as any,
);
return watcher;

View File

@ -1,37 +1,73 @@
import { SmartDataDbDoc } from './classes.doc.js';
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
/**
* a wrapper for the native mongodb cursor. Exposes better
*/
export class SmartdataDbWatcher<T = any> {
/**
* Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
*/
export class SmartdataDbWatcher<T = any> extends EventEmitter {
// STATIC
public readyDeferred = plugins.smartpromise.defer();
// INSTANCE
private changeStream: plugins.mongodb.ChangeStream<T>;
public changeSubject = new plugins.smartrx.rxjs.Subject<T>();
private rawSubject: plugins.smartrx.rxjs.Subject<T>;
/** Emits change documents (or arrays of documents if buffered) */
public changeSubject: any;
/**
* @param changeStreamArg native MongoDB ChangeStream
* @param smartdataDbDocArg document class for instance creation
* @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS
*/
constructor(
changeStreamArg: plugins.mongodb.ChangeStream<T>,
smartdataDbDocArg: typeof SmartDataDbDoc,
opts?: { bufferTimeMs?: number },
) {
super();
this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();
// Apply buffering if requested
if (opts && opts.bufferTimeMs) {
this.changeSubject = this.rawSubject.pipe(plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs));
} else {
this.changeSubject = this.rawSubject;
}
this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: any) => {
if (!item.fullDocument) {
this.changeSubject.next(null);
return;
let docInstance: T = null;
if (item.fullDocument) {
docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(
item.fullDocument
) as any as T;
}
this.changeSubject.next(
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T,
);
// Notify subscribers
this.rawSubject.next(docInstance);
this.emit('change', docInstance);
});
// Signal readiness after one tick
plugins.smartdelay.delayFor(0).then(() => {
this.readyDeferred.resolve();
});
}
public async close() {
/**
* Close the change stream, complete the RxJS subject, and remove listeners.
*/
public async close(): Promise<void> {
// Close MongoDB ChangeStream
await this.changeStream.close();
// Complete the subject to teardown any buffering operators
this.rawSubject.complete();
// Remove all EventEmitter listeners
this.removeAllListeners();
}
/**
* Alias for close(), matching README usage
*/
public async stop(): Promise<void> {
return this.close();
}
}