Compare commits

..

4 Commits

Author SHA1 Message Date
a91fac450a 5.16.0
Some checks failed
Default (tags) / security (push) Successful in 38s
Default (tags) / test (push) Failing after 3m29s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2025-04-25 09:35:51 +00:00
5cb043009c feat(watcher): Enhance change stream watchers with buffering and EventEmitter support; update dependency versions 2025-04-25 09:35:51 +00:00
4a1f11b885 5.15.1
Some checks failed
Default (tags) / security (push) Successful in 38s
Default (tags) / test (push) Successful in 3m7s
Default (tags) / release (push) Failing after 50s
Default (tags) / metadata (push) Successful in 56s
2025-04-24 11:34:49 +00:00
43f9033ccc fix(cursor): Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers 2025-04-24 11:34:49 +00:00
10 changed files with 774 additions and 219 deletions

View File

@ -1,5 +1,20 @@
# Changelog # Changelog
## 2025-04-25 - 5.16.0 - feat(watcher)
Enhance change stream watchers with buffering and EventEmitter support; update dependency versions
- Bumped smartmongo from ^2.0.11 to ^2.0.12 and smartrx from ^3.0.7 to ^3.0.10
- Upgraded @tsclass/tsclass to ^9.0.0 and mongodb to ^6.16.0
- Refactored the watch API to accept additional options (bufferTimeMs, fullDocument) for improved change stream handling
- Modified SmartdataDbWatcher to extend EventEmitter and support event notifications
## 2025-04-24 - 5.15.1 - fix(cursor)
Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers
- Updated examples in readme.md to demonstrate manual iteration using cursor.next() and proper cursor closing.
- Refactored the getCursor method in classes.doc.ts to accept session and modifier options, consolidating cursor handling.
- Added new tests in test/test.cursor.ts to verify cursor operations, including limits, sorting, and skipping.
## 2025-04-24 - 5.15.0 - feat(svDb) ## 2025-04-24 - 5.15.0 - feat(svDb)
Enhance svDb decorator to support custom serialization and deserialization options Enhance svDb decorator to support custom serialization and deserialization options

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdata", "name": "@push.rocks/smartdata",
"version": "5.15.0", "version": "5.16.0",
"private": false, "private": false,
"description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.", "description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -26,23 +26,23 @@
"@push.rocks/lik": "^6.0.14", "@push.rocks/lik": "^6.0.14",
"@push.rocks/smartdelay": "^3.0.1", "@push.rocks/smartdelay": "^3.0.1",
"@push.rocks/smartlog": "^3.0.2", "@push.rocks/smartlog": "^3.0.2",
"@push.rocks/smartmongo": "^2.0.11", "@push.rocks/smartmongo": "^2.0.12",
"@push.rocks/smartpromise": "^4.0.2", "@push.rocks/smartpromise": "^4.0.2",
"@push.rocks/smartrx": "^3.0.7", "@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smartstring": "^4.0.15", "@push.rocks/smartstring": "^4.0.15",
"@push.rocks/smarttime": "^4.0.6", "@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.8", "@push.rocks/smartunique": "^3.0.8",
"@push.rocks/taskbuffer": "^3.1.7", "@push.rocks/taskbuffer": "^3.1.7",
"@tsclass/tsclass": "^8.2.0", "@tsclass/tsclass": "^9.0.0",
"mongodb": "^6.15.0" "mongodb": "^6.16.0"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^2.3.2", "@git.zone/tsbuild": "^2.3.2",
"@git.zone/tsrun": "^1.2.44", "@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.77", "@git.zone/tstest": "^1.0.77",
"@push.rocks/qenv": "^6.0.5", "@push.rocks/qenv": "^6.0.5",
"@push.rocks/tapbundle": "^5.6.2", "@push.rocks/tapbundle": "^5.6.3",
"@types/node": "^22.14.0" "@types/node": "^22.15.2"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

647
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -133,31 +133,34 @@ const user = await User.getInstance({ username: 'myUsername' });
// Fetch multiple users that match criteria // Fetch multiple users that match criteria
const users = await User.getInstances({ email: 'myEmail@example.com' }); const users = await User.getInstances({ email: 'myEmail@example.com' });
// Using a cursor for large collections // Obtain a cursor for large result sets
const cursor = await User.getCursor({ active: true }); const cursor = await User.getCursor({ active: true });
// Process documents one at a time (memory efficient) // Stream each document efficiently
await cursor.forEach(async (user, index) => { await cursor.forEach(async (user) => {
// Process each user with its position console.log(`Processing user: ${user.username}`);
console.log(`Processing user ${index}: ${user.username}`);
}); });
// Chain cursor methods like in the MongoDB native driver // Manually iterate using next()
const paginatedCursor = await User.getCursor({ active: true }) let nextUser;
.limit(10) // Limit results while ((nextUser = await cursor.next())) {
.skip(20) // Skip first 20 results console.log(`Next user: ${nextUser.username}`);
.sort({ createdAt: -1 }); // Sort by creation date descending }
// Convert cursor to array (when you know the result set is small) // Convert to array when the result set is small
const userArray = await paginatedCursor.toArray(); const userArray = await cursor.toArray();
// Other cursor operations // Close the cursor to free resources
const nextUser = await cursor.next(); // Get the next document
const hasMoreUsers = await cursor.hasNext(); // Check if more documents exist
const count = await cursor.count(); // Get the count of documents in the cursor
// Always close cursors when done with them
await cursor.close(); await cursor.close();
// For native cursor modifiers (sort, skip, limit), use getCursor with modifier option:
const paginatedCursor = await User.getCursor(
{ active: true },
{ modifier: (c) => c.sort({ createdAt: -1 }).skip(20).limit(10) }
);
await paginatedCursor.forEach((user) => {
console.log(`Paginated user: ${user.username}`);
});
``` ```
#### Update #### Update

97
test/test.cursor.ts Normal file
View File

@ -0,0 +1,97 @@
import { tap, expect } from '@push.rocks/tapbundle';
import * as smartmongo from '@push.rocks/smartmongo';
import { smartunique } from '../ts/plugins.js';
import * as smartdata from '../ts/index.js';
// Set up database connection
let smartmongoInstance: smartmongo.SmartMongo;
let testDb: smartdata.SmartdataDb;
// Define a simple document model for cursor tests
@smartdata.Collection(() => testDb)
class CursorTest extends smartdata.SmartDataDbDoc<CursorTest, CursorTest> {
@smartdata.unI()
public id: string = smartunique.shortId();
@smartdata.svDb()
public name: string;
@smartdata.svDb()
public order: number;
constructor(name: string, order: number) {
super();
this.name = name;
this.order = order;
}
}
// Initialize the in-memory MongoDB and SmartdataDB
tap.test('cursor init: start Mongo and SmartdataDb', async () => {
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
testDb = new smartdata.SmartdataDb(
await smartmongoInstance.getMongoDescriptor(),
);
await testDb.init();
});
// Insert sample documents
tap.test('cursor insert: save 5 test documents', async () => {
for (let i = 1; i <= 5; i++) {
const doc = new CursorTest(`item${i}`, i);
await doc.save();
}
const count = await CursorTest.getCount({});
expect(count).toEqual(5);
});
// Test that toArray returns all documents
tap.test('cursor toArray: retrieves all documents', async () => {
const cursor = await CursorTest.getCursor({});
const all = await cursor.toArray();
expect(all.length).toEqual(5);
});
// Test iteration via forEach
tap.test('cursor forEach: iterates through all documents', async () => {
const names: string[] = [];
const cursor = await CursorTest.getCursor({});
await cursor.forEach(async (item) => {
names.push(item.name);
});
expect(names.length).toEqual(5);
expect(names).toContain('item3');
});
// Test native cursor modifiers: limit
tap.test('cursor modifier limit: only two documents', async () => {
const cursor = await CursorTest.getCursor({}, { modifier: (c) => c.limit(2) });
const limited = await cursor.toArray();
expect(limited.length).toEqual(2);
});
// Test native cursor modifiers: sort and skip
tap.test('cursor modifier sort & skip: returns correct order', async () => {
const cursor = await CursorTest.getCursor({}, {
modifier: (c) => c.sort({ order: -1 }).skip(1),
});
const results = await cursor.toArray();
// Skipped the first (order 5), next should be 4,3,2,1
expect(results.length).toEqual(4);
expect(results[0].order).toEqual(4);
});
// Cleanup: drop database, close connections, stop Mongo
tap.test('cursor cleanup: drop DB and stop', async () => {
await testDb.mongoDb.dropDatabase();
await testDb.close();
if (smartmongoInstance) {
await smartmongoInstance.stopAndDumpToDir(
`.nogit/dbdump/test.cursor.ts`,
);
}
// Ensure process exits after cleanup
setTimeout(() => process.exit(), 2000);
});
export default tap.start();

View File

@ -60,6 +60,43 @@ tap.test('should watch a collection', async (toolsArg) => {
await done.promise; await done.promise;
}); });
// ======= New tests for EventEmitter and buffering support =======
tap.test('should emit change via EventEmitter', async (tools) => {
const done = tools.defer();
const watcher = await House.watch({});
watcher.on('change', async (houseArg) => {
// Expect a House instance
expect(houseArg).toBeDefined();
// Clean up
await watcher.stop();
done.resolve();
});
// Trigger an insert to generate a change event
const h = new House();
await h.save();
await done.promise;
});
tap.test('should buffer change events when bufferTimeMs is set', async (tools) => {
const done = tools.defer();
// bufferTimeMs collects events into arrays every 50ms
const watcher = await House.watch({}, { bufferTimeMs: 50 });
let received: House[];
watcher.changeSubject.subscribe(async (batch: House[]) => {
if (batch && batch.length > 0) {
received = batch;
await watcher.stop();
done.resolve();
}
});
// Rapidly insert multiple docs
const docs = [new House(), new House(), new House()];
for (const doc of docs) await doc.save();
await done.promise;
// All inserts should be in one buffered batch
expect(received.length).toEqual(docs.length);
});
// ======================================= // =======================================
// close the database connection // close the database connection
// ======================================= // =======================================

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdata', name: '@push.rocks/smartdata',
version: '5.15.0', version: '5.16.0',
description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.' description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.'
} }

View File

@ -256,24 +256,40 @@ export class SmartdataCollection<T> {
} }
/** /**
* watches the collection while applying a filter * Watches the collection, returning a SmartdataDbWatcher with RxJS and EventEmitter support.
* @param filterObject match filter for change stream
* @param opts optional MongoDB ChangeStreamOptions & { bufferTimeMs } to buffer events
* @param smartdataDbDocArg document class for instance creation
*/ */
public async watch( public async watch(
filterObject: any, filterObject: any,
smartdataDbDocArg: typeof SmartDataDbDoc, opts: (plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }) = {},
smartdataDbDocArg?: typeof SmartDataDbDoc,
): Promise<SmartdataDbWatcher> { ): Promise<SmartdataDbWatcher> {
await this.init(); await this.init();
// Extract bufferTimeMs from options
const { bufferTimeMs, fullDocument, ...otherOptions } = opts || {};
// Determine fullDocument behavior: default to 'updateLookup'
const changeStreamOptions: plugins.mongodb.ChangeStreamOptions = {
...otherOptions,
fullDocument:
fullDocument === undefined
? 'updateLookup'
: fullDocument === true
? 'updateLookup'
: fullDocument,
} as any;
// Build pipeline with match if provided
const pipeline = filterObject ? [{ $match: filterObject }] : [];
const changeStream = this.mongoDbCollection.watch( const changeStream = this.mongoDbCollection.watch(
[ pipeline,
{ changeStreamOptions,
$match: filterObject, );
}, const smartdataWatcher = new SmartdataDbWatcher(
], changeStream,
{ smartdataDbDocArg,
fullDocument: 'updateLookup', { bufferTimeMs },
},
); );
const smartdataWatcher = new SmartdataDbWatcher(changeStream, smartdataDbDocArg);
await smartdataWatcher.readyDeferred.promise; await smartdataWatcher.readyDeferred.promise;
return smartdataWatcher; return smartdataWatcher;
} }

View File

@ -276,38 +276,27 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
} }
/** /**
* get cursor * Get a cursor for streaming results, with optional session and native cursor modifiers.
* @returns * @param filterArg Partial filter to apply
*/ * @param opts Optional session and modifier for the raw MongoDB cursor
/**
* Get a cursor for streaming results, with optional session
*/ */
public static async getCursor<T>( public static async getCursor<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { session?: plugins.mongodb.ClientSession } opts?: {
) { session?: plugins.mongodb.ClientSession;
const collection: SmartdataCollection<T> = (this as any).collection; modifier?: (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>;
const cursor: SmartdataDbCursor<T> = await collection.getCursor(
convertFilterForMongoDb(filterArg),
this as any as typeof SmartDataDbDoc,
{ session: opts?.session },
);
return cursor;
} }
public static async getCursorExtended<T>(
this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
modifierFunction = (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => cursorArg,
): Promise<SmartdataDbCursor<T>> { ): Promise<SmartdataDbCursor<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const { session, modifier } = opts || {};
await collection.init(); await collection.init();
let cursor: plugins.mongodb.FindCursor<any> = collection.mongoDbCollection.find( let rawCursor: plugins.mongodb.FindCursor<any> =
convertFilterForMongoDb(filterArg), collection.mongoDbCollection.find(convertFilterForMongoDb(filterArg), { session });
); if (modifier) {
cursor = modifierFunction(cursor); rawCursor = modifier(rawCursor);
return new SmartdataDbCursor<T>(cursor, this as any as typeof SmartDataDbDoc); }
return new SmartdataDbCursor<T>(rawCursor, this as any as typeof SmartDataDbDoc);
} }
/** /**
@ -316,13 +305,20 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
* @param filterArg * @param filterArg
* @param forEachFunction * @param forEachFunction
*/ */
/**
* Watch the collection for changes, with optional buffering and change stream options.
* @param filterArg MongoDB filter to select which changes to observe
* @param opts optional ChangeStreamOptions plus bufferTimeMs
*/
public static async watch<T>( public static async watch<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
) { opts?: plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number },
): Promise<SmartdataDbWatcher<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const watcher: SmartdataDbWatcher<T> = await collection.watch( const watcher: SmartdataDbWatcher<T> = await collection.watch(
convertFilterForMongoDb(filterArg), convertFilterForMongoDb(filterArg),
opts || {},
this as any, this as any,
); );
return watcher; return watcher;

View File

@ -1,37 +1,73 @@
import { SmartDataDbDoc } from './classes.doc.js'; import { SmartDataDbDoc } from './classes.doc.js';
import * as plugins from './plugins.js'; import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
/** /**
* a wrapper for the native mongodb cursor. Exposes better * a wrapper for the native mongodb cursor. Exposes better
*/ */
export class SmartdataDbWatcher<T = any> { /**
* Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
*/
export class SmartdataDbWatcher<T = any> extends EventEmitter {
// STATIC // STATIC
public readyDeferred = plugins.smartpromise.defer(); public readyDeferred = plugins.smartpromise.defer();
// INSTANCE // INSTANCE
private changeStream: plugins.mongodb.ChangeStream<T>; private changeStream: plugins.mongodb.ChangeStream<T>;
private rawSubject: plugins.smartrx.rxjs.Subject<T>;
public changeSubject = new plugins.smartrx.rxjs.Subject<T>(); /** Emits change documents (or arrays of documents if buffered) */
public changeSubject: any;
/**
* @param changeStreamArg native MongoDB ChangeStream
* @param smartdataDbDocArg document class for instance creation
* @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS
*/
constructor( constructor(
changeStreamArg: plugins.mongodb.ChangeStream<T>, changeStreamArg: plugins.mongodb.ChangeStream<T>,
smartdataDbDocArg: typeof SmartDataDbDoc, smartdataDbDocArg: typeof SmartDataDbDoc,
opts?: { bufferTimeMs?: number },
) { ) {
super();
this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();
// Apply buffering if requested
if (opts && opts.bufferTimeMs) {
this.changeSubject = this.rawSubject.pipe(plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs));
} else {
this.changeSubject = this.rawSubject;
}
this.changeStream = changeStreamArg; this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: any) => { this.changeStream.on('change', async (item: any) => {
if (!item.fullDocument) { let docInstance: T = null;
this.changeSubject.next(null); if (item.fullDocument) {
return; docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(
item.fullDocument
) as any as T;
} }
this.changeSubject.next( // Notify subscribers
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T, this.rawSubject.next(docInstance);
); this.emit('change', docInstance);
}); });
// Signal readiness after one tick
plugins.smartdelay.delayFor(0).then(() => { plugins.smartdelay.delayFor(0).then(() => {
this.readyDeferred.resolve(); this.readyDeferred.resolve();
}); });
} }
public async close() { /**
* Close the change stream, complete the RxJS subject, and remove listeners.
*/
public async close(): Promise<void> {
// Close MongoDB ChangeStream
await this.changeStream.close(); await this.changeStream.close();
// Complete the subject to teardown any buffering operators
this.rawSubject.complete();
// Remove all EventEmitter listeners
this.removeAllListeners();
}
/**
* Alias for close(), matching README usage
*/
public async stop(): Promise<void> {
return this.close();
} }
} }