Compare commits

..

No commits in common. "master" and "v5.13.0" have entirely different histories.

11 changed files with 305 additions and 990 deletions

View File

@ -1,50 +1,5 @@
# Changelog # Changelog
## 2025-04-25 - 5.16.0 - feat(watcher)
Enhance change stream watchers with buffering and EventEmitter support; update dependency versions
- Bumped smartmongo from ^2.0.11 to ^2.0.12 and smartrx from ^3.0.7 to ^3.0.10
- Upgraded @tsclass/tsclass to ^9.0.0 and mongodb to ^6.16.0
- Refactored the watch API to accept additional options (bufferTimeMs, fullDocument) for improved change stream handling
- Modified SmartdataDbWatcher to extend EventEmitter and support event notifications
## 2025-04-24 - 5.15.1 - fix(cursor)
Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers
- Updated examples in readme.md to demonstrate manual iteration using cursor.next() and proper cursor closing.
- Refactored the getCursor method in classes.doc.ts to accept session and modifier options, consolidating cursor handling.
- Added new tests in test/test.cursor.ts to verify cursor operations, including limits, sorting, and skipping.
## 2025-04-24 - 5.15.0 - feat(svDb)
Enhance svDb decorator to support custom serialization and deserialization options
- Added an optional options parameter to the svDb decorator to accept serialize/deserialize functions
- Updated instance creation logic (updateFromDb) to apply custom deserialization if provided
- Updated createSavableObject to use custom serialization when available
## 2025-04-23 - 5.14.1 - fix(db operations)
Update transaction API to consistently pass optional session parameters across database operations
- Revised transaction support in readme to use startSession without await and showcased session usage in getInstance and save calls
- Updated methods in classes.collection.ts to accept an optional session parameter for findOne, getCursor, findAll, insert, update, delete, and getCount
- Enhanced SmartDataDbDoc save and delete methods to propagate session parameters
- Improved overall consistency of transactional APIs across the library
## 2025-04-23 - 5.14.0 - feat(doc)
Implement support for beforeSave, afterSave, beforeDelete, and afterDelete lifecycle hooks in document save and delete operations to allow custom logic execution during these critical moments.
- Calls beforeSave hook if defined before performing insert or update.
- Calls afterSave hook after a document is saved.
- Calls beforeDelete hook before deletion and afterDelete hook afterward.
- Ensures _updatedAt timestamp is refreshed during save operations.
## 2025-04-22 - 5.13.1 - fix(search)
Improve search query parsing for implicit AND queries by preserving quoted substrings and better handling free terms, quoted phrases, and field:value tokens.
- Replace previous implicit AND logic with tokenization that preserves quoted substrings
- Support both free term and field:value tokens with wildcards inside quotes
- Ensure errors are thrown for non-searchable fields in field-specific queries
## 2025-04-22 - 5.13.0 - feat(search) ## 2025-04-22 - 5.13.0 - feat(search)
Improve search query handling and update documentation Improve search query handling and update documentation

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdata", "name": "@push.rocks/smartdata",
"version": "5.16.0", "version": "5.13.0",
"private": false, "private": false,
"description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.", "description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -26,23 +26,23 @@
"@push.rocks/lik": "^6.0.14", "@push.rocks/lik": "^6.0.14",
"@push.rocks/smartdelay": "^3.0.1", "@push.rocks/smartdelay": "^3.0.1",
"@push.rocks/smartlog": "^3.0.2", "@push.rocks/smartlog": "^3.0.2",
"@push.rocks/smartmongo": "^2.0.12", "@push.rocks/smartmongo": "^2.0.11",
"@push.rocks/smartpromise": "^4.0.2", "@push.rocks/smartpromise": "^4.0.2",
"@push.rocks/smartrx": "^3.0.10", "@push.rocks/smartrx": "^3.0.7",
"@push.rocks/smartstring": "^4.0.15", "@push.rocks/smartstring": "^4.0.15",
"@push.rocks/smarttime": "^4.0.6", "@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.8", "@push.rocks/smartunique": "^3.0.8",
"@push.rocks/taskbuffer": "^3.1.7", "@push.rocks/taskbuffer": "^3.1.7",
"@tsclass/tsclass": "^9.0.0", "@tsclass/tsclass": "^8.2.0",
"mongodb": "^6.16.0" "mongodb": "^6.15.0"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^2.3.2", "@git.zone/tsbuild": "^2.3.2",
"@git.zone/tsrun": "^1.2.44", "@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.77", "@git.zone/tstest": "^1.0.77",
"@push.rocks/qenv": "^6.0.5", "@push.rocks/qenv": "^6.0.5",
"@push.rocks/tapbundle": "^5.6.3", "@push.rocks/tapbundle": "^5.6.2",
"@types/node": "^22.15.2" "@types/node": "^22.14.0"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

647
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -133,34 +133,31 @@ const user = await User.getInstance({ username: 'myUsername' });
// Fetch multiple users that match criteria // Fetch multiple users that match criteria
const users = await User.getInstances({ email: 'myEmail@example.com' }); const users = await User.getInstances({ email: 'myEmail@example.com' });
// Obtain a cursor for large result sets // Using a cursor for large collections
const cursor = await User.getCursor({ active: true }); const cursor = await User.getCursor({ active: true });
// Stream each document efficiently // Process documents one at a time (memory efficient)
await cursor.forEach(async (user) => { await cursor.forEach(async (user, index) => {
console.log(`Processing user: ${user.username}`); // Process each user with its position
console.log(`Processing user ${index}: ${user.username}`);
}); });
// Manually iterate using next() // Chain cursor methods like in the MongoDB native driver
let nextUser; const paginatedCursor = await User.getCursor({ active: true })
while ((nextUser = await cursor.next())) { .limit(10) // Limit results
console.log(`Next user: ${nextUser.username}`); .skip(20) // Skip first 20 results
} .sort({ createdAt: -1 }); // Sort by creation date descending
// Convert to array when the result set is small // Convert cursor to array (when you know the result set is small)
const userArray = await cursor.toArray(); const userArray = await paginatedCursor.toArray();
// Close the cursor to free resources // Other cursor operations
const nextUser = await cursor.next(); // Get the next document
const hasMoreUsers = await cursor.hasNext(); // Check if more documents exist
const count = await cursor.count(); // Get the count of documents in the cursor
// Always close cursors when done with them
await cursor.close(); await cursor.close();
// For native cursor modifiers (sort, skip, limit), use getCursor with modifier option:
const paginatedCursor = await User.getCursor(
{ active: true },
{ modifier: (c) => c.sort({ createdAt: -1 }).skip(20).limit(10) }
);
await paginatedCursor.forEach((user) => {
console.log(`Paginated user: ${user.username}`);
});
``` ```
#### Update #### Update
@ -412,23 +409,19 @@ class Product extends SmartDataDbDoc<Product, Product> {
### Transaction Support ### Transaction Support
Use MongoDB transactions for atomic operations. SmartData now exposes `startSession()` and accepts an optional session in all fetch and write APIs: Use MongoDB transactions for atomic operations:
```typescript ```typescript
// start a client session (no await) const session = await db.startSession();
const session = db.startSession();
try { try {
// wrap operations in a transaction
await session.withTransaction(async () => { await session.withTransaction(async () => {
// pass session as second arg to getInstance const user = await User.getInstance({ id: 'user-id' }, { session });
const user = await User.getInstance({ id: 'user-id' }, session);
user.balance -= 100; user.balance -= 100;
// pass session in save opts
await user.save({ session }); await user.save({ session });
const recipient = await User.getInstance({ id: 'recipient-id' }, session); const recipient = await User.getInstance({ id: 'recipient-id' }, { session });
recipient.balance += 100; recipient.balance += 100;
await recipient.save({ session }); await user.save({ session });
}); });
} finally { } finally {
await session.endSession(); await session.endSession();
@ -525,11 +518,6 @@ class Order extends SmartDataDbDoc<Order, Order> {
throw new Error('Order cannot be deleted'); throw new Error('Order cannot be deleted');
} }
} }
// Called after deleting the document
async afterDelete() {
// Cleanup or audit actions
await auditLogDeletion(this.id);
}
} }
``` ```

View File

@ -1,97 +0,0 @@
import { tap, expect } from '@push.rocks/tapbundle';
import * as smartmongo from '@push.rocks/smartmongo';
import { smartunique } from '../ts/plugins.js';
import * as smartdata from '../ts/index.js';
// Set up database connection
let smartmongoInstance: smartmongo.SmartMongo;
let testDb: smartdata.SmartdataDb;
// Define a simple document model for cursor tests
@smartdata.Collection(() => testDb)
class CursorTest extends smartdata.SmartDataDbDoc<CursorTest, CursorTest> {
@smartdata.unI()
public id: string = smartunique.shortId();
@smartdata.svDb()
public name: string;
@smartdata.svDb()
public order: number;
constructor(name: string, order: number) {
super();
this.name = name;
this.order = order;
}
}
// Initialize the in-memory MongoDB and SmartdataDB
tap.test('cursor init: start Mongo and SmartdataDb', async () => {
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
testDb = new smartdata.SmartdataDb(
await smartmongoInstance.getMongoDescriptor(),
);
await testDb.init();
});
// Insert sample documents
tap.test('cursor insert: save 5 test documents', async () => {
for (let i = 1; i <= 5; i++) {
const doc = new CursorTest(`item${i}`, i);
await doc.save();
}
const count = await CursorTest.getCount({});
expect(count).toEqual(5);
});
// Test that toArray returns all documents
tap.test('cursor toArray: retrieves all documents', async () => {
const cursor = await CursorTest.getCursor({});
const all = await cursor.toArray();
expect(all.length).toEqual(5);
});
// Test iteration via forEach
tap.test('cursor forEach: iterates through all documents', async () => {
const names: string[] = [];
const cursor = await CursorTest.getCursor({});
await cursor.forEach(async (item) => {
names.push(item.name);
});
expect(names.length).toEqual(5);
expect(names).toContain('item3');
});
// Test native cursor modifiers: limit
tap.test('cursor modifier limit: only two documents', async () => {
const cursor = await CursorTest.getCursor({}, { modifier: (c) => c.limit(2) });
const limited = await cursor.toArray();
expect(limited.length).toEqual(2);
});
// Test native cursor modifiers: sort and skip
tap.test('cursor modifier sort & skip: returns correct order', async () => {
const cursor = await CursorTest.getCursor({}, {
modifier: (c) => c.sort({ order: -1 }).skip(1),
});
const results = await cursor.toArray();
// Skipped the first (order 5), next should be 4,3,2,1
expect(results.length).toEqual(4);
expect(results[0].order).toEqual(4);
});
// Cleanup: drop database, close connections, stop Mongo
tap.test('cursor cleanup: drop DB and stop', async () => {
await testDb.mongoDb.dropDatabase();
await testDb.close();
if (smartmongoInstance) {
await smartmongoInstance.stopAndDumpToDir(
`.nogit/dbdump/test.cursor.ts`,
);
}
// Ensure process exits after cleanup
setTimeout(() => process.exit(), 2000);
});
export default tap.start();

View File

@ -60,43 +60,6 @@ tap.test('should watch a collection', async (toolsArg) => {
await done.promise; await done.promise;
}); });
// ======= New tests for EventEmitter and buffering support =======
tap.test('should emit change via EventEmitter', async (tools) => {
const done = tools.defer();
const watcher = await House.watch({});
watcher.on('change', async (houseArg) => {
// Expect a House instance
expect(houseArg).toBeDefined();
// Clean up
await watcher.stop();
done.resolve();
});
// Trigger an insert to generate a change event
const h = new House();
await h.save();
await done.promise;
});
tap.test('should buffer change events when bufferTimeMs is set', async (tools) => {
const done = tools.defer();
// bufferTimeMs collects events into arrays every 50ms
const watcher = await House.watch({}, { bufferTimeMs: 50 });
let received: House[];
watcher.changeSubject.subscribe(async (batch: House[]) => {
if (batch && batch.length > 0) {
received = batch;
await watcher.stop();
done.resolve();
}
});
// Rapidly insert multiple docs
const docs = [new House(), new House(), new House()];
for (const doc of docs) await doc.save();
await done.promise;
// All inserts should be in one buffered batch
expect(received.length).toEqual(docs.length);
});
// ======================================= // =======================================
// close the database connection // close the database connection
// ======================================= // =======================================

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdata', name: '@push.rocks/smartdata',
version: '5.16.0', version: '5.13.0',
description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.' description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.'
} }

View File

@ -222,74 +222,53 @@ export class SmartdataCollection<T> {
/** /**
* finds an object in the DbCollection * finds an object in the DbCollection
*/ */
public async findOne( public async findOne(filterObject: any): Promise<any> {
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
// Use MongoDB driver's findOne with optional session const cursor = this.mongoDbCollection.find(filterObject);
return this.mongoDbCollection.findOne(filterObject, { session: opts?.session }); const result = await cursor.next();
cursor.close();
return result;
} }
public async getCursor( public async getCursor(
filterObjectArg: any, filterObjectArg: any,
dbDocArg: typeof SmartDataDbDoc, dbDocArg: typeof SmartDataDbDoc,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<SmartdataDbCursor<any>> { ): Promise<SmartdataDbCursor<any>> {
await this.init(); await this.init();
const cursor = this.mongoDbCollection.find(filterObjectArg, { session: opts?.session }); const cursor = this.mongoDbCollection.find(filterObjectArg);
return new SmartdataDbCursor(cursor, dbDocArg); return new SmartdataDbCursor(cursor, dbDocArg);
} }
/** /**
* finds an object in the DbCollection * finds an object in the DbCollection
*/ */
public async findAll( public async findAll(filterObject: any): Promise<any[]> {
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any[]> {
await this.init(); await this.init();
const cursor = this.mongoDbCollection.find(filterObject, { session: opts?.session }); const cursor = this.mongoDbCollection.find(filterObject);
const result = await cursor.toArray(); const result = await cursor.toArray();
cursor.close(); cursor.close();
return result; return result;
} }
/** /**
* Watches the collection, returning a SmartdataDbWatcher with RxJS and EventEmitter support. * watches the collection while applying a filter
* @param filterObject match filter for change stream
* @param opts optional MongoDB ChangeStreamOptions & { bufferTimeMs } to buffer events
* @param smartdataDbDocArg document class for instance creation
*/ */
public async watch( public async watch(
filterObject: any, filterObject: any,
opts: (plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }) = {}, smartdataDbDocArg: typeof SmartDataDbDoc,
smartdataDbDocArg?: typeof SmartDataDbDoc,
): Promise<SmartdataDbWatcher> { ): Promise<SmartdataDbWatcher> {
await this.init(); await this.init();
// Extract bufferTimeMs from options
const { bufferTimeMs, fullDocument, ...otherOptions } = opts || {};
// Determine fullDocument behavior: default to 'updateLookup'
const changeStreamOptions: plugins.mongodb.ChangeStreamOptions = {
...otherOptions,
fullDocument:
fullDocument === undefined
? 'updateLookup'
: fullDocument === true
? 'updateLookup'
: fullDocument,
} as any;
// Build pipeline with match if provided
const pipeline = filterObject ? [{ $match: filterObject }] : [];
const changeStream = this.mongoDbCollection.watch( const changeStream = this.mongoDbCollection.watch(
pipeline, [
changeStreamOptions, {
); $match: filterObject,
const smartdataWatcher = new SmartdataDbWatcher( },
changeStream, ],
smartdataDbDocArg, {
{ bufferTimeMs }, fullDocument: 'updateLookup',
},
); );
const smartdataWatcher = new SmartdataDbWatcher(changeStream, smartdataDbDocArg);
await smartdataWatcher.readyDeferred.promise; await smartdataWatcher.readyDeferred.promise;
return smartdataWatcher; return smartdataWatcher;
} }
@ -297,10 +276,7 @@ export class SmartdataCollection<T> {
/** /**
* create an object in the database * create an object in the database
*/ */
public async insert( public async insert(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> {
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
this.markUniqueIndexes(dbDocArg.uniqueIndexes); this.markUniqueIndexes(dbDocArg.uniqueIndexes);
@ -311,17 +287,14 @@ export class SmartdataCollection<T> {
} }
const saveableObject = await dbDocArg.createSavableObject(); const saveableObject = await dbDocArg.createSavableObject();
const result = await this.mongoDbCollection.insertOne(saveableObject, { session: opts?.session }); const result = await this.mongoDbCollection.insertOne(saveableObject);
return result; return result;
} }
/** /**
* inserts object into the DbCollection * inserts object into the DbCollection
*/ */
public async update( public async update(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> {
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
const identifiableObject = await dbDocArg.createIdentifiableObject(); const identifiableObject = await dbDocArg.createIdentifiableObject();
@ -336,27 +309,21 @@ export class SmartdataCollection<T> {
const result = await this.mongoDbCollection.updateOne( const result = await this.mongoDbCollection.updateOne(
identifiableObject, identifiableObject,
{ $set: updateableObject }, { $set: updateableObject },
{ upsert: true, session: opts?.session }, { upsert: true },
); );
return result; return result;
} }
public async delete( public async delete(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> {
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
const identifiableObject = await dbDocArg.createIdentifiableObject(); const identifiableObject = await dbDocArg.createIdentifiableObject();
await this.mongoDbCollection.deleteOne(identifiableObject, { session: opts?.session }); await this.mongoDbCollection.deleteOne(identifiableObject);
} }
public async getCount( public async getCount(filterObject: any) {
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
) {
await this.init(); await this.init();
return this.mongoDbCollection.countDocuments(filterObject, { session: opts?.session }); return this.mongoDbCollection.countDocuments(filterObject);
} }
/** /**

View File

@ -63,12 +63,6 @@ export class SmartdataDb {
this.status = 'disconnected'; this.status = 'disconnected';
logger.log('info', `disconnected from database ${this.smartdataOptions.mongoDbName}`); logger.log('info', `disconnected from database ${this.smartdataOptions.mongoDbName}`);
} }
/**
* Start a MongoDB client session for transactions
*/
public startSession(): plugins.mongodb.ClientSession {
return this.mongoDbClient.startSession();
}
// handle table to class distribution // handle table to class distribution

View File

@ -11,18 +11,8 @@ import { SmartdataLuceneAdapter } from './classes.lucene.adapter.js';
* - validate: post-fetch validator, return true to keep a doc * - validate: post-fetch validator, return true to keep a doc
*/ */
export interface SearchOptions<T> { export interface SearchOptions<T> {
/**
* Additional MongoDB filter to ANDmerge into the query
*/
filter?: Record<string, any>; filter?: Record<string, any>;
/**
* Postfetch validator; return true to keep each doc
*/
validate?: (doc: T) => Promise<boolean> | boolean; validate?: (doc: T) => Promise<boolean> | boolean;
/**
* Optional MongoDB session for transactional operations
*/
session?: plugins.mongodb.ClientSession;
} }
export type TDocCreation = 'db' | 'new' | 'mixed'; export type TDocCreation = 'db' | 'new' | 'mixed';
@ -39,34 +29,16 @@ export function globalSvDb() {
}; };
} }
/**
* Options for custom serialization/deserialization of a field.
*/
export interface SvDbOptions {
/** Function to serialize the field value before saving to DB */
serialize?: (value: any) => any;
/** Function to deserialize the field value after reading from DB */
deserialize?: (value: any) => any;
}
/** /**
* saveable - saveable decorator to be used on class properties * saveable - saveable decorator to be used on class properties
*/ */
export function svDb(options?: SvDbOptions) { export function svDb() {
return (target: SmartDataDbDoc<unknown, unknown>, key: string) => { return (target: SmartDataDbDoc<unknown, unknown>, key: string) => {
console.log(`called svDb() on >${target.constructor.name}.${key}<`); console.log(`called svDb() on >${target.constructor.name}.${key}<`);
if (!target.saveableProperties) { if (!target.saveableProperties) {
target.saveableProperties = []; target.saveableProperties = [];
} }
target.saveableProperties.push(key); target.saveableProperties.push(key);
// attach custom serializer/deserializer options to the class constructor
const ctor = target.constructor as any;
if (!ctor._svDbOptions) {
ctor._svDbOptions = {};
}
if (options) {
ctor._svDbOptions[key] = options;
}
}; };
} }
@ -207,12 +179,7 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
const newInstance = new this(); const newInstance = new this();
(newInstance as any).creationStatus = 'db'; (newInstance as any).creationStatus = 'db';
for (const key of Object.keys(mongoDbNativeDocArg)) { for (const key of Object.keys(mongoDbNativeDocArg)) {
const rawValue = mongoDbNativeDocArg[key]; newInstance[key] = mongoDbNativeDocArg[key];
const optionsMap = (this as any)._svDbOptions || {};
const opts = optionsMap[key];
newInstance[key] = opts && typeof opts.deserialize === 'function'
? opts.deserialize(rawValue)
: rawValue;
} }
return newInstance; return newInstance;
} }
@ -226,13 +193,8 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public static async getInstances<T>( public static async getInstances<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<T[]> { ): Promise<T[]> {
// Pass session through to findAll for transactional queries const foundDocs = await (this as any).collection.findAll(convertFilterForMongoDb(filterArg));
const foundDocs = await (this as any).collection.findAll(
convertFilterForMongoDb(filterArg),
{ session: opts?.session },
);
const returnArray = []; const returnArray = [];
for (const foundDoc of foundDocs) { for (const foundDoc of foundDocs) {
const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc); const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc);
@ -250,13 +212,8 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public static async getInstance<T>( public static async getInstance<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<T> { ): Promise<T> {
// Retrieve one document, with optional session for transactions const foundDoc = await (this as any).collection.findOne(convertFilterForMongoDb(filterArg));
const foundDoc = await (this as any).collection.findOne(
convertFilterForMongoDb(filterArg),
{ session: opts?.session },
);
if (foundDoc) { if (foundDoc) {
const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc); const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc);
return newInstance; return newInstance;
@ -276,27 +233,33 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
} }
/** /**
* Get a cursor for streaming results, with optional session and native cursor modifiers. * get cursor
* @param filterArg Partial filter to apply * @returns
* @param opts Optional session and modifier for the raw MongoDB cursor
*/ */
public static async getCursor<T>( public static async getCursor<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { ) {
session?: plugins.mongodb.ClientSession; const collection: SmartdataCollection<T> = (this as any).collection;
modifier?: (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>; const cursor: SmartdataDbCursor<T> = await collection.getCursor(
} convertFilterForMongoDb(filterArg),
this as any as typeof SmartDataDbDoc,
);
return cursor;
}
public static async getCursorExtended<T>(
this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
modifierFunction = (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => cursorArg,
): Promise<SmartdataDbCursor<T>> { ): Promise<SmartdataDbCursor<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const { session, modifier } = opts || {};
await collection.init(); await collection.init();
let rawCursor: plugins.mongodb.FindCursor<any> = let cursor: plugins.mongodb.FindCursor<any> = collection.mongoDbCollection.find(
collection.mongoDbCollection.find(convertFilterForMongoDb(filterArg), { session }); convertFilterForMongoDb(filterArg),
if (modifier) { );
rawCursor = modifier(rawCursor); cursor = modifierFunction(cursor);
} return new SmartdataDbCursor<T>(cursor, this as any as typeof SmartDataDbDoc);
return new SmartdataDbCursor<T>(rawCursor, this as any as typeof SmartDataDbDoc);
} }
/** /**
@ -305,20 +268,13 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
* @param filterArg * @param filterArg
* @param forEachFunction * @param forEachFunction
*/ */
/**
* Watch the collection for changes, with optional buffering and change stream options.
* @param filterArg MongoDB filter to select which changes to observe
* @param opts optional ChangeStreamOptions plus bufferTimeMs
*/
public static async watch<T>( public static async watch<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }, ) {
): Promise<SmartdataDbWatcher<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const watcher: SmartdataDbWatcher<T> = await collection.watch( const watcher: SmartdataDbWatcher<T> = await collection.watch(
convertFilterForMongoDb(filterArg), convertFilterForMongoDb(filterArg),
opts || {},
this as any, this as any,
); );
return watcher; return watcher;
@ -383,9 +339,7 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
if (opts?.filter) { if (opts?.filter) {
mongoFilter = { $and: [mongoFilter, opts.filter] }; mongoFilter = { $and: [mongoFilter, opts.filter] };
} }
// Fetch with optional session for transactions let docs: T[] = await (this as any).getInstances(mongoFilter);
// Fetch within optional session
let docs: T[] = await (this as any).getInstances(mongoFilter, { session: opts?.session });
if (opts?.validate) { if (opts?.validate) {
const out: T[] = []; const out: T[] = [];
for (const d of docs) { for (const d of docs) {
@ -465,52 +419,62 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
const orConds = searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } })); const orConds = searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } }));
return await (this as any).execQuery({ $or: orConds }, opts); return await (this as any).execQuery({ $or: orConds }, opts);
} }
// implicit AND for multiple tokens: free terms, quoted phrases, and field:values // implicit AND: combine free terms and field:value terms (with or without wildcards)
{ const parts = q.split(/\s+/);
// Split query into tokens, preserving quoted substrings const hasColon = parts.some((t) => t.includes(':'));
const rawTokens = q.match(/(?:[^\s"']+|"[^"]*"|'[^']*')+/g) || []; if (
// Only apply when more than one token and no boolean operators or grouping parts.length > 1 && hasColon &&
if ( !q.includes(' AND ') && !q.includes(' OR ') && !q.includes(' NOT ') &&
rawTokens.length > 1 && !q.includes('(') && !q.includes(')') &&
!/(\bAND\b|\bOR\b|\bNOT\b|\(|\))/i.test(q) && !q.includes('[') && !q.includes(']') &&
!/\[|\]/.test(q) !q.includes('"') && !q.includes("'")
) { ) {
const andConds: any[] = []; const andConds = parts.map((term) => {
for (let token of rawTokens) { const m = term.match(/^(\w+):(.+)$/);
// field:value token if (m) {
const fv = token.match(/^(\w+):(.+)$/); const field = m[1];
if (fv) { const value = m[2];
const field = fv[1]; if (!searchableFields.includes(field)) {
let value = fv[2]; throw new Error(`Field '${field}' is not searchable for class ${this.name}`);
if (!searchableFields.includes(field)) {
throw new Error(`Field '${field}' is not searchable for class ${this.name}`);
}
// Strip surrounding quotes if present
if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith("'") && value.endsWith("'"))) {
value = value.slice(1, -1);
}
// Wildcard search?
if (value.includes('*') || value.includes('?')) {
const escaped = value.replace(/([.+^${}()|[\\]\\])/g, '\\$1');
const pattern = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
andConds.push({ [field]: { $regex: pattern, $options: 'i' } });
} else {
andConds.push({ [field]: value });
}
} else if ((token.startsWith('"') && token.endsWith('"')) || (token.startsWith("'") && token.endsWith("'"))) {
// Quoted free phrase across all fields
const phrase = token.slice(1, -1);
const parts = phrase.split(/\s+/).map((t) => escapeForRegex(t));
const pattern = parts.join('\\s+');
andConds.push({ $or: searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } })) });
} else {
// Free term across all fields
const esc = escapeForRegex(token);
andConds.push({ $or: searchableFields.map((f) => ({ [f]: { $regex: esc, $options: 'i' } })) });
} }
if (value.includes('*') || value.includes('?')) {
// wildcard field search
const escaped = value.replace(/([.+^${}()|[\\]\\])/g, '\\$1');
const pattern = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
return { [field]: { $regex: pattern, $options: 'i' } };
}
// exact field:value
return { [field]: value };
} }
return await (this as any).execQuery({ $and: andConds }, opts); // free term -> regex across all searchable fields
const esc = escapeForRegex(term);
return { $or: searchableFields.map((f) => ({ [f]: { $regex: esc, $options: 'i' } })) };
});
return await (this as any).execQuery({ $and: andConds }, opts);
}
// free term and quoted field phrase (exact or wildcard), e.g. 'term field:"phrase"' or 'term field:"ph*se"'
const freeWithQuotedField = q.match(/^(\S+)\s+(\w+):"(.+)"$/);
if (freeWithQuotedField) {
const freeTerm = freeWithQuotedField[1];
const field = freeWithQuotedField[2];
let phrase = freeWithQuotedField[3];
if (!searchableFields.includes(field)) {
throw new Error(`Field '${field}' is not searchable for class ${this.name}`);
} }
// free term condition across all searchable fields
const freeEsc = escapeForRegex(freeTerm);
const freeCond = { $or: searchableFields.map((f) => ({ [f]: { $regex: freeEsc, $options: 'i' } })) };
// field condition: exact match or wildcard pattern
let fieldCond;
if (phrase.includes('*') || phrase.includes('?')) {
const escaped = phrase.replace(/([.+^${}()|[\\]\\])/g, '\\$1');
const pattern = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
fieldCond = { [field]: { $regex: pattern, $options: 'i' } };
} else {
fieldCond = { [field]: phrase };
}
return await (this as any).execQuery({ $and: [freeCond, fieldCond] }, opts);
} }
// detect advanced Lucene syntax: field:value, wildcards, boolean, grouping // detect advanced Lucene syntax: field:value, wildcards, boolean, grouping
const luceneSyntax = /(\w+:[^\s]+)|\*|\?|\bAND\b|\bOR\b|\bNOT\b|\(|\)/; const luceneSyntax = /(\w+:[^\s]+)|\*|\?|\bAND\b|\bOR\b|\bNOT\b|\(|\)/;
@ -592,52 +556,35 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
constructor() {} constructor() {}
/** /**
* saves this instance (optionally within a transaction) * saves this instance but not any connected items
* may lead to data inconsistencies, but is faster
*/ */
public async save(opts?: { session?: plugins.mongodb.ClientSession }) { public async save() {
// allow hook before saving
if (typeof (this as any).beforeSave === 'function') {
await (this as any).beforeSave();
}
// tslint:disable-next-line: no-this-assignment // tslint:disable-next-line: no-this-assignment
const self: any = this; const self: any = this;
let dbResult: any; let dbResult: any;
// update timestamp
this._updatedAt = new Date().toISOString(); this._updatedAt = new Date().toISOString();
// perform insert or update
switch (this.creationStatus) { switch (this.creationStatus) {
case 'db': case 'db':
dbResult = await this.collection.update(self, { session: opts?.session }); dbResult = await this.collection.update(self);
break; break;
case 'new': case 'new':
dbResult = await this.collection.insert(self, { session: opts?.session }); dbResult = await this.collection.insert(self);
this.creationStatus = 'db'; this.creationStatus = 'db';
break; break;
default: default:
console.error('neither new nor in db?'); console.error('neither new nor in db?');
} }
// allow hook after saving
if (typeof (this as any).afterSave === 'function') {
await (this as any).afterSave();
}
return dbResult; return dbResult;
} }
/** /**
* deletes a document from the database (optionally within a transaction) * deletes a document from the database
*/ */
public async delete(opts?: { session?: plugins.mongodb.ClientSession }) { public async delete() {
// allow hook before deleting await this.collection.delete(this);
if (typeof (this as any).beforeDelete === 'function') {
await (this as any).beforeDelete();
}
// perform deletion
const result = await this.collection.delete(this, { session: opts?.session });
// allow hook after delete
if (typeof (this as any).afterDelete === 'function') {
await (this as any).afterDelete();
}
return result;
} }
/** /**
@ -664,12 +611,7 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public async updateFromDb() { public async updateFromDb() {
const mongoDbNativeDoc = await this.collection.findOne(await this.createIdentifiableObject()); const mongoDbNativeDoc = await this.collection.findOne(await this.createIdentifiableObject());
for (const key of Object.keys(mongoDbNativeDoc)) { for (const key of Object.keys(mongoDbNativeDoc)) {
const rawValue = mongoDbNativeDoc[key]; this[key] = mongoDbNativeDoc[key];
const optionsMap = (this.constructor as any)._svDbOptions || {};
const opts = optionsMap[key];
this[key] = opts && typeof opts.deserialize === 'function'
? opts.deserialize(rawValue)
: rawValue;
} }
} }
@ -679,14 +621,8 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public async createSavableObject(): Promise<TImplements> { public async createSavableObject(): Promise<TImplements> {
const saveableObject: unknown = {}; // is not exposed to outside, so any is ok here const saveableObject: unknown = {}; // is not exposed to outside, so any is ok here
const saveableProperties = [...this.globalSaveableProperties, ...this.saveableProperties]; const saveableProperties = [...this.globalSaveableProperties, ...this.saveableProperties];
// apply custom serialization if configured
const optionsMap = (this.constructor as any)._svDbOptions || {};
for (const propertyNameString of saveableProperties) { for (const propertyNameString of saveableProperties) {
const rawValue = (this as any)[propertyNameString]; saveableObject[propertyNameString] = this[propertyNameString];
const opts = optionsMap[propertyNameString];
(saveableObject as any)[propertyNameString] = opts && typeof opts.serialize === 'function'
? opts.serialize(rawValue)
: rawValue;
} }
return saveableObject as TImplements; return saveableObject as TImplements;
} }

View File

@ -1,73 +1,37 @@
import { SmartDataDbDoc } from './classes.doc.js'; import { SmartDataDbDoc } from './classes.doc.js';
import * as plugins from './plugins.js'; import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
/** /**
* a wrapper for the native mongodb cursor. Exposes better * a wrapper for the native mongodb cursor. Exposes better
*/ */
/** export class SmartdataDbWatcher<T = any> {
* Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
*/
export class SmartdataDbWatcher<T = any> extends EventEmitter {
// STATIC // STATIC
public readyDeferred = plugins.smartpromise.defer(); public readyDeferred = plugins.smartpromise.defer();
// INSTANCE // INSTANCE
private changeStream: plugins.mongodb.ChangeStream<T>; private changeStream: plugins.mongodb.ChangeStream<T>;
private rawSubject: plugins.smartrx.rxjs.Subject<T>;
/** Emits change documents (or arrays of documents if buffered) */ public changeSubject = new plugins.smartrx.rxjs.Subject<T>();
public changeSubject: any;
/**
* @param changeStreamArg native MongoDB ChangeStream
* @param smartdataDbDocArg document class for instance creation
* @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS
*/
constructor( constructor(
changeStreamArg: plugins.mongodb.ChangeStream<T>, changeStreamArg: plugins.mongodb.ChangeStream<T>,
smartdataDbDocArg: typeof SmartDataDbDoc, smartdataDbDocArg: typeof SmartDataDbDoc,
opts?: { bufferTimeMs?: number },
) { ) {
super();
this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();
// Apply buffering if requested
if (opts && opts.bufferTimeMs) {
this.changeSubject = this.rawSubject.pipe(plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs));
} else {
this.changeSubject = this.rawSubject;
}
this.changeStream = changeStreamArg; this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: any) => { this.changeStream.on('change', async (item: any) => {
let docInstance: T = null; if (!item.fullDocument) {
if (item.fullDocument) { this.changeSubject.next(null);
docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc( return;
item.fullDocument
) as any as T;
} }
// Notify subscribers this.changeSubject.next(
this.rawSubject.next(docInstance); smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T,
this.emit('change', docInstance); );
}); });
// Signal readiness after one tick
plugins.smartdelay.delayFor(0).then(() => { plugins.smartdelay.delayFor(0).then(() => {
this.readyDeferred.resolve(); this.readyDeferred.resolve();
}); });
} }
/** public async close() {
* Close the change stream, complete the RxJS subject, and remove listeners.
*/
public async close(): Promise<void> {
// Close MongoDB ChangeStream
await this.changeStream.close(); await this.changeStream.close();
// Complete the subject to teardown any buffering operators
this.rawSubject.complete();
// Remove all EventEmitter listeners
this.removeAllListeners();
}
/**
* Alias for close(), matching README usage
*/
public async stop(): Promise<void> {
return this.close();
} }
} }