Compare commits
No commits in common. "master" and "v5.15.0" have entirely different histories.
15
changelog.md
15
changelog.md
@ -1,20 +1,5 @@
|
||||
# Changelog
|
||||
|
||||
## 2025-04-25 - 5.16.0 - feat(watcher)
|
||||
Enhance change stream watchers with buffering and EventEmitter support; update dependency versions
|
||||
|
||||
- Bumped smartmongo from ^2.0.11 to ^2.0.12 and smartrx from ^3.0.7 to ^3.0.10
|
||||
- Upgraded @tsclass/tsclass to ^9.0.0 and mongodb to ^6.16.0
|
||||
- Refactored the watch API to accept additional options (bufferTimeMs, fullDocument) for improved change stream handling
|
||||
- Modified SmartdataDbWatcher to extend EventEmitter and support event notifications
|
||||
|
||||
## 2025-04-24 - 5.15.1 - fix(cursor)
|
||||
Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers
|
||||
|
||||
- Updated examples in readme.md to demonstrate manual iteration using cursor.next() and proper cursor closing.
|
||||
- Refactored the getCursor method in classes.doc.ts to accept session and modifier options, consolidating cursor handling.
|
||||
- Added new tests in test/test.cursor.ts to verify cursor operations, including limits, sorting, and skipping.
|
||||
|
||||
## 2025-04-24 - 5.15.0 - feat(svDb)
|
||||
Enhance svDb decorator to support custom serialization and deserialization options
|
||||
|
||||
|
14
package.json
14
package.json
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartdata",
|
||||
"version": "5.16.0",
|
||||
"version": "5.15.0",
|
||||
"private": false,
|
||||
"description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.",
|
||||
"main": "dist_ts/index.js",
|
||||
@ -26,23 +26,23 @@
|
||||
"@push.rocks/lik": "^6.0.14",
|
||||
"@push.rocks/smartdelay": "^3.0.1",
|
||||
"@push.rocks/smartlog": "^3.0.2",
|
||||
"@push.rocks/smartmongo": "^2.0.12",
|
||||
"@push.rocks/smartmongo": "^2.0.11",
|
||||
"@push.rocks/smartpromise": "^4.0.2",
|
||||
"@push.rocks/smartrx": "^3.0.10",
|
||||
"@push.rocks/smartrx": "^3.0.7",
|
||||
"@push.rocks/smartstring": "^4.0.15",
|
||||
"@push.rocks/smarttime": "^4.0.6",
|
||||
"@push.rocks/smartunique": "^3.0.8",
|
||||
"@push.rocks/taskbuffer": "^3.1.7",
|
||||
"@tsclass/tsclass": "^9.0.0",
|
||||
"mongodb": "^6.16.0"
|
||||
"@tsclass/tsclass": "^8.2.0",
|
||||
"mongodb": "^6.15.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^2.3.2",
|
||||
"@git.zone/tsrun": "^1.2.44",
|
||||
"@git.zone/tstest": "^1.0.77",
|
||||
"@push.rocks/qenv": "^6.0.5",
|
||||
"@push.rocks/tapbundle": "^5.6.3",
|
||||
"@types/node": "^22.15.2"
|
||||
"@push.rocks/tapbundle": "^5.6.2",
|
||||
"@types/node": "^22.14.0"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
|
647
pnpm-lock.yaml
generated
647
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
39
readme.md
39
readme.md
@ -133,34 +133,31 @@ const user = await User.getInstance({ username: 'myUsername' });
|
||||
// Fetch multiple users that match criteria
|
||||
const users = await User.getInstances({ email: 'myEmail@example.com' });
|
||||
|
||||
// Obtain a cursor for large result sets
|
||||
// Using a cursor for large collections
|
||||
const cursor = await User.getCursor({ active: true });
|
||||
|
||||
// Stream each document efficiently
|
||||
await cursor.forEach(async (user) => {
|
||||
console.log(`Processing user: ${user.username}`);
|
||||
// Process documents one at a time (memory efficient)
|
||||
await cursor.forEach(async (user, index) => {
|
||||
// Process each user with its position
|
||||
console.log(`Processing user ${index}: ${user.username}`);
|
||||
});
|
||||
|
||||
// Manually iterate using next()
|
||||
let nextUser;
|
||||
while ((nextUser = await cursor.next())) {
|
||||
console.log(`Next user: ${nextUser.username}`);
|
||||
}
|
||||
// Chain cursor methods like in the MongoDB native driver
|
||||
const paginatedCursor = await User.getCursor({ active: true })
|
||||
.limit(10) // Limit results
|
||||
.skip(20) // Skip first 20 results
|
||||
.sort({ createdAt: -1 }); // Sort by creation date descending
|
||||
|
||||
// Convert to array when the result set is small
|
||||
const userArray = await cursor.toArray();
|
||||
// Convert cursor to array (when you know the result set is small)
|
||||
const userArray = await paginatedCursor.toArray();
|
||||
|
||||
// Close the cursor to free resources
|
||||
// Other cursor operations
|
||||
const nextUser = await cursor.next(); // Get the next document
|
||||
const hasMoreUsers = await cursor.hasNext(); // Check if more documents exist
|
||||
const count = await cursor.count(); // Get the count of documents in the cursor
|
||||
|
||||
// Always close cursors when done with them
|
||||
await cursor.close();
|
||||
|
||||
// For native cursor modifiers (sort, skip, limit), use getCursor with modifier option:
|
||||
const paginatedCursor = await User.getCursor(
|
||||
{ active: true },
|
||||
{ modifier: (c) => c.sort({ createdAt: -1 }).skip(20).limit(10) }
|
||||
);
|
||||
await paginatedCursor.forEach((user) => {
|
||||
console.log(`Paginated user: ${user.username}`);
|
||||
});
|
||||
```
|
||||
|
||||
#### Update
|
||||
|
@ -1,97 +0,0 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import * as smartmongo from '@push.rocks/smartmongo';
|
||||
import { smartunique } from '../ts/plugins.js';
|
||||
import * as smartdata from '../ts/index.js';
|
||||
|
||||
// Set up database connection
|
||||
let smartmongoInstance: smartmongo.SmartMongo;
|
||||
let testDb: smartdata.SmartdataDb;
|
||||
|
||||
// Define a simple document model for cursor tests
|
||||
@smartdata.Collection(() => testDb)
|
||||
class CursorTest extends smartdata.SmartDataDbDoc<CursorTest, CursorTest> {
|
||||
@smartdata.unI()
|
||||
public id: string = smartunique.shortId();
|
||||
|
||||
@smartdata.svDb()
|
||||
public name: string;
|
||||
|
||||
@smartdata.svDb()
|
||||
public order: number;
|
||||
|
||||
constructor(name: string, order: number) {
|
||||
super();
|
||||
this.name = name;
|
||||
this.order = order;
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize the in-memory MongoDB and SmartdataDB
|
||||
tap.test('cursor init: start Mongo and SmartdataDb', async () => {
|
||||
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
|
||||
testDb = new smartdata.SmartdataDb(
|
||||
await smartmongoInstance.getMongoDescriptor(),
|
||||
);
|
||||
await testDb.init();
|
||||
});
|
||||
|
||||
// Insert sample documents
|
||||
tap.test('cursor insert: save 5 test documents', async () => {
|
||||
for (let i = 1; i <= 5; i++) {
|
||||
const doc = new CursorTest(`item${i}`, i);
|
||||
await doc.save();
|
||||
}
|
||||
const count = await CursorTest.getCount({});
|
||||
expect(count).toEqual(5);
|
||||
});
|
||||
|
||||
// Test that toArray returns all documents
|
||||
tap.test('cursor toArray: retrieves all documents', async () => {
|
||||
const cursor = await CursorTest.getCursor({});
|
||||
const all = await cursor.toArray();
|
||||
expect(all.length).toEqual(5);
|
||||
});
|
||||
|
||||
// Test iteration via forEach
|
||||
tap.test('cursor forEach: iterates through all documents', async () => {
|
||||
const names: string[] = [];
|
||||
const cursor = await CursorTest.getCursor({});
|
||||
await cursor.forEach(async (item) => {
|
||||
names.push(item.name);
|
||||
});
|
||||
expect(names.length).toEqual(5);
|
||||
expect(names).toContain('item3');
|
||||
});
|
||||
|
||||
// Test native cursor modifiers: limit
|
||||
tap.test('cursor modifier limit: only two documents', async () => {
|
||||
const cursor = await CursorTest.getCursor({}, { modifier: (c) => c.limit(2) });
|
||||
const limited = await cursor.toArray();
|
||||
expect(limited.length).toEqual(2);
|
||||
});
|
||||
|
||||
// Test native cursor modifiers: sort and skip
|
||||
tap.test('cursor modifier sort & skip: returns correct order', async () => {
|
||||
const cursor = await CursorTest.getCursor({}, {
|
||||
modifier: (c) => c.sort({ order: -1 }).skip(1),
|
||||
});
|
||||
const results = await cursor.toArray();
|
||||
// Skipped the first (order 5), next should be 4,3,2,1
|
||||
expect(results.length).toEqual(4);
|
||||
expect(results[0].order).toEqual(4);
|
||||
});
|
||||
|
||||
// Cleanup: drop database, close connections, stop Mongo
|
||||
tap.test('cursor cleanup: drop DB and stop', async () => {
|
||||
await testDb.mongoDb.dropDatabase();
|
||||
await testDb.close();
|
||||
if (smartmongoInstance) {
|
||||
await smartmongoInstance.stopAndDumpToDir(
|
||||
`.nogit/dbdump/test.cursor.ts`,
|
||||
);
|
||||
}
|
||||
// Ensure process exits after cleanup
|
||||
setTimeout(() => process.exit(), 2000);
|
||||
});
|
||||
|
||||
export default tap.start();
|
@ -60,43 +60,6 @@ tap.test('should watch a collection', async (toolsArg) => {
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// ======= New tests for EventEmitter and buffering support =======
|
||||
tap.test('should emit change via EventEmitter', async (tools) => {
|
||||
const done = tools.defer();
|
||||
const watcher = await House.watch({});
|
||||
watcher.on('change', async (houseArg) => {
|
||||
// Expect a House instance
|
||||
expect(houseArg).toBeDefined();
|
||||
// Clean up
|
||||
await watcher.stop();
|
||||
done.resolve();
|
||||
});
|
||||
// Trigger an insert to generate a change event
|
||||
const h = new House();
|
||||
await h.save();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('should buffer change events when bufferTimeMs is set', async (tools) => {
|
||||
const done = tools.defer();
|
||||
// bufferTimeMs collects events into arrays every 50ms
|
||||
const watcher = await House.watch({}, { bufferTimeMs: 50 });
|
||||
let received: House[];
|
||||
watcher.changeSubject.subscribe(async (batch: House[]) => {
|
||||
if (batch && batch.length > 0) {
|
||||
received = batch;
|
||||
await watcher.stop();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
// Rapidly insert multiple docs
|
||||
const docs = [new House(), new House(), new House()];
|
||||
for (const doc of docs) await doc.save();
|
||||
await done.promise;
|
||||
// All inserts should be in one buffered batch
|
||||
expect(received.length).toEqual(docs.length);
|
||||
});
|
||||
|
||||
// =======================================
|
||||
// close the database connection
|
||||
// =======================================
|
||||
|
@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartdata',
|
||||
version: '5.16.0',
|
||||
version: '5.15.0',
|
||||
description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.'
|
||||
}
|
||||
|
@ -256,40 +256,24 @@ export class SmartdataCollection<T> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Watches the collection, returning a SmartdataDbWatcher with RxJS and EventEmitter support.
|
||||
* @param filterObject match filter for change stream
|
||||
* @param opts optional MongoDB ChangeStreamOptions & { bufferTimeMs } to buffer events
|
||||
* @param smartdataDbDocArg document class for instance creation
|
||||
* watches the collection while applying a filter
|
||||
*/
|
||||
public async watch(
|
||||
filterObject: any,
|
||||
opts: (plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }) = {},
|
||||
smartdataDbDocArg?: typeof SmartDataDbDoc,
|
||||
smartdataDbDocArg: typeof SmartDataDbDoc,
|
||||
): Promise<SmartdataDbWatcher> {
|
||||
await this.init();
|
||||
// Extract bufferTimeMs from options
|
||||
const { bufferTimeMs, fullDocument, ...otherOptions } = opts || {};
|
||||
// Determine fullDocument behavior: default to 'updateLookup'
|
||||
const changeStreamOptions: plugins.mongodb.ChangeStreamOptions = {
|
||||
...otherOptions,
|
||||
fullDocument:
|
||||
fullDocument === undefined
|
||||
? 'updateLookup'
|
||||
: fullDocument === true
|
||||
? 'updateLookup'
|
||||
: fullDocument,
|
||||
} as any;
|
||||
// Build pipeline with match if provided
|
||||
const pipeline = filterObject ? [{ $match: filterObject }] : [];
|
||||
const changeStream = this.mongoDbCollection.watch(
|
||||
pipeline,
|
||||
changeStreamOptions,
|
||||
);
|
||||
const smartdataWatcher = new SmartdataDbWatcher(
|
||||
changeStream,
|
||||
smartdataDbDocArg,
|
||||
{ bufferTimeMs },
|
||||
[
|
||||
{
|
||||
$match: filterObject,
|
||||
},
|
||||
],
|
||||
{
|
||||
fullDocument: 'updateLookup',
|
||||
},
|
||||
);
|
||||
const smartdataWatcher = new SmartdataDbWatcher(changeStream, smartdataDbDocArg);
|
||||
await smartdataWatcher.readyDeferred.promise;
|
||||
return smartdataWatcher;
|
||||
}
|
||||
|
@ -276,27 +276,38 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a cursor for streaming results, with optional session and native cursor modifiers.
|
||||
* @param filterArg Partial filter to apply
|
||||
* @param opts Optional session and modifier for the raw MongoDB cursor
|
||||
* get cursor
|
||||
* @returns
|
||||
*/
|
||||
/**
|
||||
* Get a cursor for streaming results, with optional session
|
||||
*/
|
||||
public static async getCursor<T>(
|
||||
this: plugins.tsclass.typeFest.Class<T>,
|
||||
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
|
||||
opts?: {
|
||||
session?: plugins.mongodb.ClientSession;
|
||||
modifier?: (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>;
|
||||
}
|
||||
opts?: { session?: plugins.mongodb.ClientSession }
|
||||
) {
|
||||
const collection: SmartdataCollection<T> = (this as any).collection;
|
||||
const cursor: SmartdataDbCursor<T> = await collection.getCursor(
|
||||
convertFilterForMongoDb(filterArg),
|
||||
this as any as typeof SmartDataDbDoc,
|
||||
{ session: opts?.session },
|
||||
);
|
||||
return cursor;
|
||||
}
|
||||
|
||||
public static async getCursorExtended<T>(
|
||||
this: plugins.tsclass.typeFest.Class<T>,
|
||||
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
|
||||
modifierFunction = (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => cursorArg,
|
||||
): Promise<SmartdataDbCursor<T>> {
|
||||
const collection: SmartdataCollection<T> = (this as any).collection;
|
||||
const { session, modifier } = opts || {};
|
||||
await collection.init();
|
||||
let rawCursor: plugins.mongodb.FindCursor<any> =
|
||||
collection.mongoDbCollection.find(convertFilterForMongoDb(filterArg), { session });
|
||||
if (modifier) {
|
||||
rawCursor = modifier(rawCursor);
|
||||
}
|
||||
return new SmartdataDbCursor<T>(rawCursor, this as any as typeof SmartDataDbDoc);
|
||||
let cursor: plugins.mongodb.FindCursor<any> = collection.mongoDbCollection.find(
|
||||
convertFilterForMongoDb(filterArg),
|
||||
);
|
||||
cursor = modifierFunction(cursor);
|
||||
return new SmartdataDbCursor<T>(cursor, this as any as typeof SmartDataDbDoc);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -305,20 +316,13 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
|
||||
* @param filterArg
|
||||
* @param forEachFunction
|
||||
*/
|
||||
/**
|
||||
* Watch the collection for changes, with optional buffering and change stream options.
|
||||
* @param filterArg MongoDB filter to select which changes to observe
|
||||
* @param opts optional ChangeStreamOptions plus bufferTimeMs
|
||||
*/
|
||||
public static async watch<T>(
|
||||
this: plugins.tsclass.typeFest.Class<T>,
|
||||
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
|
||||
opts?: plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number },
|
||||
): Promise<SmartdataDbWatcher<T>> {
|
||||
) {
|
||||
const collection: SmartdataCollection<T> = (this as any).collection;
|
||||
const watcher: SmartdataDbWatcher<T> = await collection.watch(
|
||||
convertFilterForMongoDb(filterArg),
|
||||
opts || {},
|
||||
this as any,
|
||||
);
|
||||
return watcher;
|
||||
|
@ -1,73 +1,37 @@
|
||||
import { SmartDataDbDoc } from './classes.doc.js';
|
||||
import * as plugins from './plugins.js';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
/**
|
||||
* a wrapper for the native mongodb cursor. Exposes better
|
||||
*/
|
||||
/**
|
||||
* Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
|
||||
*/
|
||||
export class SmartdataDbWatcher<T = any> extends EventEmitter {
|
||||
export class SmartdataDbWatcher<T = any> {
|
||||
// STATIC
|
||||
public readyDeferred = plugins.smartpromise.defer();
|
||||
|
||||
// INSTANCE
|
||||
private changeStream: plugins.mongodb.ChangeStream<T>;
|
||||
private rawSubject: plugins.smartrx.rxjs.Subject<T>;
|
||||
/** Emits change documents (or arrays of documents if buffered) */
|
||||
public changeSubject: any;
|
||||
/**
|
||||
* @param changeStreamArg native MongoDB ChangeStream
|
||||
* @param smartdataDbDocArg document class for instance creation
|
||||
* @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS
|
||||
*/
|
||||
|
||||
public changeSubject = new plugins.smartrx.rxjs.Subject<T>();
|
||||
constructor(
|
||||
changeStreamArg: plugins.mongodb.ChangeStream<T>,
|
||||
smartdataDbDocArg: typeof SmartDataDbDoc,
|
||||
opts?: { bufferTimeMs?: number },
|
||||
) {
|
||||
super();
|
||||
this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();
|
||||
// Apply buffering if requested
|
||||
if (opts && opts.bufferTimeMs) {
|
||||
this.changeSubject = this.rawSubject.pipe(plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs));
|
||||
} else {
|
||||
this.changeSubject = this.rawSubject;
|
||||
}
|
||||
this.changeStream = changeStreamArg;
|
||||
this.changeStream.on('change', async (item: any) => {
|
||||
let docInstance: T = null;
|
||||
if (item.fullDocument) {
|
||||
docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(
|
||||
item.fullDocument
|
||||
) as any as T;
|
||||
if (!item.fullDocument) {
|
||||
this.changeSubject.next(null);
|
||||
return;
|
||||
}
|
||||
// Notify subscribers
|
||||
this.rawSubject.next(docInstance);
|
||||
this.emit('change', docInstance);
|
||||
this.changeSubject.next(
|
||||
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T,
|
||||
);
|
||||
});
|
||||
// Signal readiness after one tick
|
||||
plugins.smartdelay.delayFor(0).then(() => {
|
||||
this.readyDeferred.resolve();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the change stream, complete the RxJS subject, and remove listeners.
|
||||
*/
|
||||
public async close(): Promise<void> {
|
||||
// Close MongoDB ChangeStream
|
||||
public async close() {
|
||||
await this.changeStream.close();
|
||||
// Complete the subject to teardown any buffering operators
|
||||
this.rawSubject.complete();
|
||||
// Remove all EventEmitter listeners
|
||||
this.removeAllListeners();
|
||||
}
|
||||
/**
|
||||
* Alias for close(), matching README usage
|
||||
*/
|
||||
public async stop(): Promise<void> {
|
||||
return this.close();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user