Compare commits

..

16 Commits

Author SHA1 Message Date
a91fac450a 5.16.0
Some checks failed
Default (tags) / security (push) Successful in 38s
Default (tags) / test (push) Failing after 3m29s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2025-04-25 09:35:51 +00:00
5cb043009c feat(watcher): Enhance change stream watchers with buffering and EventEmitter support; update dependency versions 2025-04-25 09:35:51 +00:00
4a1f11b885 5.15.1
Some checks failed
Default (tags) / security (push) Successful in 38s
Default (tags) / test (push) Successful in 3m7s
Default (tags) / release (push) Failing after 50s
Default (tags) / metadata (push) Successful in 56s
2025-04-24 11:34:49 +00:00
43f9033ccc fix(cursor): Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers 2025-04-24 11:34:49 +00:00
e7c0951786 5.15.0
Some checks failed
Default (tags) / security (push) Successful in 39s
Default (tags) / test (push) Successful in 3m10s
Default (tags) / release (push) Failing after 51s
Default (tags) / metadata (push) Successful in 57s
2025-04-24 11:08:19 +00:00
efc107907c feat(svDb): Enhance svDb decorator to support custom serialization and deserialization options 2025-04-24 11:08:19 +00:00
2b8b0e5bdd 5.14.1
Some checks failed
Default (tags) / security (push) Successful in 38s
Default (tags) / test (push) Successful in 3m3s
Default (tags) / release (push) Failing after 51s
Default (tags) / metadata (push) Successful in 58s
2025-04-23 17:28:49 +00:00
3ae2a7fcf5 fix(db operations): Update transaction API to consistently pass optional session parameters across database operations 2025-04-23 17:28:49 +00:00
0806d3749b 5.14.0
Some checks failed
Default (tags) / security (push) Successful in 37s
Default (tags) / test (push) Successful in 3m7s
Default (tags) / release (push) Failing after 50s
Default (tags) / metadata (push) Successful in 57s
2025-04-23 09:03:15 +00:00
f5d5e20a97 feat(doc): Implement support for beforeSave, afterSave, beforeDelete, and afterDelete lifecycle hooks in document save and delete operations to allow custom logic execution during these critical moments. 2025-04-23 09:03:15 +00:00
db2767010d 5.13.1
Some checks failed
Default (tags) / security (push) Successful in 33s
Default (tags) / test (push) Successful in 3m2s
Default (tags) / release (push) Failing after 50s
Default (tags) / metadata (push) Successful in 59s
2025-04-22 20:42:11 +00:00
e2dc094afd fix(search): Improve search query parsing for implicit AND queries by preserving quoted substrings and better handling free terms, quoted phrases, and field:value tokens. 2025-04-22 20:42:11 +00:00
39d2957b7d 5.13.0
Some checks failed
Default (tags) / security (push) Successful in 39s
Default (tags) / test (push) Successful in 3m5s
Default (tags) / release (push) Failing after 52s
Default (tags) / metadata (push) Successful in 57s
2025-04-22 20:34:23 +00:00
490524516e feat(search): Improve search query handling and update documentation 2025-04-22 20:34:23 +00:00
ccd4b9e1ec 5.12.2
Some checks failed
Default (tags) / security (push) Successful in 39s
Default (tags) / test (push) Successful in 3m7s
Default (tags) / release (push) Failing after 52s
Default (tags) / metadata (push) Successful in 1m2s
2025-04-22 20:09:21 +00:00
9c6d6d9f2c fix(search): Fix handling of quoted wildcard patterns in field-specific search queries and add tests for location-based wildcard phrase searches 2025-04-22 20:09:21 +00:00
13 changed files with 1160 additions and 282 deletions

View File

@ -1,5 +1,64 @@
# Changelog # Changelog
## 2025-04-25 - 5.16.0 - feat(watcher)
Enhance change stream watchers with buffering and EventEmitter support; update dependency versions
- Bumped smartmongo from ^2.0.11 to ^2.0.12 and smartrx from ^3.0.7 to ^3.0.10
- Upgraded @tsclass/tsclass to ^9.0.0 and mongodb to ^6.16.0
- Refactored the watch API to accept additional options (bufferTimeMs, fullDocument) for improved change stream handling
- Modified SmartdataDbWatcher to extend EventEmitter and support event notifications
## 2025-04-24 - 5.15.1 - fix(cursor)
Improve cursor usage documentation and refactor getCursor API to support native cursor modifiers
- Updated examples in readme.md to demonstrate manual iteration using cursor.next() and proper cursor closing.
- Refactored the getCursor method in classes.doc.ts to accept session and modifier options, consolidating cursor handling.
- Added new tests in test/test.cursor.ts to verify cursor operations, including limits, sorting, and skipping.
## 2025-04-24 - 5.15.0 - feat(svDb)
Enhance svDb decorator to support custom serialization and deserialization options
- Added an optional options parameter to the svDb decorator to accept serialize/deserialize functions
- Updated instance creation logic (updateFromDb) to apply custom deserialization if provided
- Updated createSavableObject to use custom serialization when available
## 2025-04-23 - 5.14.1 - fix(db operations)
Update transaction API to consistently pass optional session parameters across database operations
- Revised transaction support in readme to use startSession without await and showcased session usage in getInstance and save calls
- Updated methods in classes.collection.ts to accept an optional session parameter for findOne, getCursor, findAll, insert, update, delete, and getCount
- Enhanced SmartDataDbDoc save and delete methods to propagate session parameters
- Improved overall consistency of transactional APIs across the library
## 2025-04-23 - 5.14.0 - feat(doc)
Implement support for beforeSave, afterSave, beforeDelete, and afterDelete lifecycle hooks in document save and delete operations to allow custom logic execution during these critical moments.
- Calls beforeSave hook if defined before performing insert or update.
- Calls afterSave hook after a document is saved.
- Calls beforeDelete hook before deletion and afterDelete hook afterward.
- Ensures _updatedAt timestamp is refreshed during save operations.
## 2025-04-22 - 5.13.1 - fix(search)
Improve search query parsing for implicit AND queries by preserving quoted substrings and better handling free terms, quoted phrases, and field:value tokens.
- Replace previous implicit AND logic with tokenization that preserves quoted substrings
- Support both free term and field:value tokens with wildcards inside quotes
- Ensure errors are thrown for non-searchable fields in field-specific queries
## 2025-04-22 - 5.13.0 - feat(search)
Improve search query handling and update documentation
- Added 'codex.md' providing a high-level project overview and detailed search API documentation.
- Enhanced search parsing in SmartDataDbDoc to support combined free-term and quoted field phrase queries.
- Introduced a new fallback branch in the search method to handle free term with quoted field input.
- Updated tests in test/test.search.ts to cover new combined query scenarios and ensure robust behavior.
## 2025-04-22 - 5.12.2 - fix(search)
Fix handling of quoted wildcard patterns in field-specific search queries and add tests for location-based wildcard phrase searches
- Strip surrounding quotes from wildcard patterns in field queries to correctly transform them to regex
- Introduce new tests in test/test.search.ts to validate exact quoted and unquoted wildcard searches on a location field
## 2025-04-22 - 5.12.1 - fix(search) ## 2025-04-22 - 5.12.1 - fix(search)
Improve implicit AND logic for mixed free term and field queries in search and enhance wildcard field handling. Improve implicit AND logic for mixed free term and field queries in search and enhance wildcard field handling.

77
codex.md Normal file
View File

@ -0,0 +1,77 @@
# SmartData Project Overview
This document provides a high-level overview of the SmartData library (`@push.rocks/smartdata`), its architecture, core components, and key features—including recent enhancements to the search API.
## 1. Project Purpose
- A TypeScriptfirst wrapper around MongoDB that supplies:
- Stronglytyped document & collection classes
- Decoratorbased schema definition (no external schema files)
- Advanced search capabilities with Lucenestyle queries
- Builtin support for realtime data sync, distributed coordination, and keyvalue EasyStore
## 2. Core Concepts & Components
- **SmartDataDb**: Manages the MongoDB connection, pooling, and initialization of collections.
- **SmartDataDbDoc**: Base class for all document models; provides CRUD, upsert, and cursor APIs.
- **Decorators**:
- `@Collection`: Associates a class with a MongoDB collection
- `@svDb()`: Marks a field as persisted to the DB
- `@unI()`: Marks a field as a unique index
- `@index()`: Adds a regular index
- `@searchable()`: Marks a field for inclusion in text searches or regex queries
- **SmartdataCollection**: Wraps a MongoDB collection; autocreates indexes based on decorators.
- **Lucene Adapter**: Parses a Lucene query string into an AST and transforms it to a MongoDB filter object.
- **EasyStore**: A simple, schemaless keyvalue store built on top of MongoDB for sharing ephemeral data.
- **Distributed Coordinator**: Leader election and taskdistribution API for building resilient, multiinstance systems.
- **Watcher**: Listens to change streams for realtime updates and integrates with RxJS.
## 3. Search API
SmartData provides a unified `.search(query[, opts])` method on all models with `@searchable()` fields:
- **Supported Syntax**:
1. Exact field:value (e.g. `field:Value`)
2. Quoted phrases (e.g. `"exact phrase"` or `'exact phrase'`)
3. Wildcards: `*` (zero or more chars) and `?` (single char)
4. Boolean operators: `AND`, `OR`, `NOT`
5. Grouping: parenthesis `(A OR B) AND C`
6. Range queries: `[num TO num]`, `{num TO num}`
7. Multiterm unquoted: terms ANDd across all searchable fields
8. Empty query returns all documents
- **Fallback Mechanisms**:
1. Text index based `$text` search (if supported)
2. Fieldscoped and multifield regex queries
3. Inmemory filtering for complex or unsupported cases
### New Security & Extensibility Hooks
The `.search(query, opts?)` signature now accepts a `SearchOptions<T>` object:
```ts
interface SearchOptions<T> {
filter?: Record<string, any>; // Additional MongoDB filter ANDmerged
validate?: (doc: T) => boolean; // Postfetch hook to drop results
}
```
- **filter**: Enforces mandatory constraints (e.g. multitenant isolation) directly in the Mongo query.
- **validate**: An async function that runs after fetching; return `false` to exclude a document.
## 4. Testing Strategy
- Unit tests in `test/test.search.ts` cover basic search functionality and new options:
- Exact, wildcard, phrase, boolean and grouping cases
- Implicit AND and mixed freeterm + field searches
- Edge cases (nonsearchable fields, quoted wildcards, no matches)
- `filter` and `validate` tests ensure security hooks work as intended
- Advanced search scenarios are covered in `test/test.search.advanced.ts`.
## 5. Usage Example
```ts
// Basic search
const prods = await Product.search('wireless earbuds');
// Scoped search (only your organizations items)
const myItems = await Product.search('book', { filter: { ownerId } });
// Postsearch validation (only cheap items)
const cheapItems = await Product.search('', { validate: p => p.price < 50 });
```
---
Last updated: 2025-04-22

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdata", "name": "@push.rocks/smartdata",
"version": "5.12.1", "version": "5.16.0",
"private": false, "private": false,
"description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.", "description": "An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -26,23 +26,23 @@
"@push.rocks/lik": "^6.0.14", "@push.rocks/lik": "^6.0.14",
"@push.rocks/smartdelay": "^3.0.1", "@push.rocks/smartdelay": "^3.0.1",
"@push.rocks/smartlog": "^3.0.2", "@push.rocks/smartlog": "^3.0.2",
"@push.rocks/smartmongo": "^2.0.11", "@push.rocks/smartmongo": "^2.0.12",
"@push.rocks/smartpromise": "^4.0.2", "@push.rocks/smartpromise": "^4.0.2",
"@push.rocks/smartrx": "^3.0.7", "@push.rocks/smartrx": "^3.0.10",
"@push.rocks/smartstring": "^4.0.15", "@push.rocks/smartstring": "^4.0.15",
"@push.rocks/smarttime": "^4.0.6", "@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.8", "@push.rocks/smartunique": "^3.0.8",
"@push.rocks/taskbuffer": "^3.1.7", "@push.rocks/taskbuffer": "^3.1.7",
"@tsclass/tsclass": "^8.2.0", "@tsclass/tsclass": "^9.0.0",
"mongodb": "^6.15.0" "mongodb": "^6.16.0"
}, },
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^2.3.2", "@git.zone/tsbuild": "^2.3.2",
"@git.zone/tsrun": "^1.2.44", "@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.77", "@git.zone/tstest": "^1.0.77",
"@push.rocks/qenv": "^6.0.5", "@push.rocks/qenv": "^6.0.5",
"@push.rocks/tapbundle": "^5.6.2", "@push.rocks/tapbundle": "^5.6.3",
"@types/node": "^22.14.0" "@types/node": "^22.15.2"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

647
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -133,31 +133,34 @@ const user = await User.getInstance({ username: 'myUsername' });
// Fetch multiple users that match criteria // Fetch multiple users that match criteria
const users = await User.getInstances({ email: 'myEmail@example.com' }); const users = await User.getInstances({ email: 'myEmail@example.com' });
// Using a cursor for large collections // Obtain a cursor for large result sets
const cursor = await User.getCursor({ active: true }); const cursor = await User.getCursor({ active: true });
// Process documents one at a time (memory efficient) // Stream each document efficiently
await cursor.forEach(async (user, index) => { await cursor.forEach(async (user) => {
// Process each user with its position console.log(`Processing user: ${user.username}`);
console.log(`Processing user ${index}: ${user.username}`);
}); });
// Chain cursor methods like in the MongoDB native driver // Manually iterate using next()
const paginatedCursor = await User.getCursor({ active: true }) let nextUser;
.limit(10) // Limit results while ((nextUser = await cursor.next())) {
.skip(20) // Skip first 20 results console.log(`Next user: ${nextUser.username}`);
.sort({ createdAt: -1 }); // Sort by creation date descending }
// Convert cursor to array (when you know the result set is small) // Convert to array when the result set is small
const userArray = await paginatedCursor.toArray(); const userArray = await cursor.toArray();
// Other cursor operations // Close the cursor to free resources
const nextUser = await cursor.next(); // Get the next document
const hasMoreUsers = await cursor.hasNext(); // Check if more documents exist
const count = await cursor.count(); // Get the count of documents in the cursor
// Always close cursors when done with them
await cursor.close(); await cursor.close();
// For native cursor modifiers (sort, skip, limit), use getCursor with modifier option:
const paginatedCursor = await User.getCursor(
{ active: true },
{ modifier: (c) => c.sort({ createdAt: -1 }).skip(20).limit(10) }
);
await paginatedCursor.forEach((user) => {
console.log(`Paginated user: ${user.username}`);
});
``` ```
#### Update #### Update
@ -409,19 +412,23 @@ class Product extends SmartDataDbDoc<Product, Product> {
### Transaction Support ### Transaction Support
Use MongoDB transactions for atomic operations: Use MongoDB transactions for atomic operations. SmartData now exposes `startSession()` and accepts an optional session in all fetch and write APIs:
```typescript ```typescript
const session = await db.startSession(); // start a client session (no await)
const session = db.startSession();
try { try {
// wrap operations in a transaction
await session.withTransaction(async () => { await session.withTransaction(async () => {
const user = await User.getInstance({ id: 'user-id' }, { session }); // pass session as second arg to getInstance
const user = await User.getInstance({ id: 'user-id' }, session);
user.balance -= 100; user.balance -= 100;
// pass session in save opts
await user.save({ session }); await user.save({ session });
const recipient = await User.getInstance({ id: 'recipient-id' }, { session }); const recipient = await User.getInstance({ id: 'recipient-id' }, session);
recipient.balance += 100; recipient.balance += 100;
await user.save({ session }); await recipient.save({ session });
}); });
} finally { } finally {
await session.endSession(); await session.endSession();
@ -518,6 +525,11 @@ class Order extends SmartDataDbDoc<Order, Order> {
throw new Error('Order cannot be deleted'); throw new Error('Order cannot be deleted');
} }
} }
// Called after deleting the document
async afterDelete() {
// Cleanup or audit actions
await auditLogDeletion(this.id);
}
} }
``` ```

97
test/test.cursor.ts Normal file
View File

@ -0,0 +1,97 @@
import { tap, expect } from '@push.rocks/tapbundle';
import * as smartmongo from '@push.rocks/smartmongo';
import { smartunique } from '../ts/plugins.js';
import * as smartdata from '../ts/index.js';
// Set up database connection
let smartmongoInstance: smartmongo.SmartMongo;
let testDb: smartdata.SmartdataDb;
// Define a simple document model for cursor tests
@smartdata.Collection(() => testDb)
class CursorTest extends smartdata.SmartDataDbDoc<CursorTest, CursorTest> {
@smartdata.unI()
public id: string = smartunique.shortId();
@smartdata.svDb()
public name: string;
@smartdata.svDb()
public order: number;
constructor(name: string, order: number) {
super();
this.name = name;
this.order = order;
}
}
// Initialize the in-memory MongoDB and SmartdataDB
tap.test('cursor init: start Mongo and SmartdataDb', async () => {
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
testDb = new smartdata.SmartdataDb(
await smartmongoInstance.getMongoDescriptor(),
);
await testDb.init();
});
// Insert sample documents
tap.test('cursor insert: save 5 test documents', async () => {
for (let i = 1; i <= 5; i++) {
const doc = new CursorTest(`item${i}`, i);
await doc.save();
}
const count = await CursorTest.getCount({});
expect(count).toEqual(5);
});
// Test that toArray returns all documents
tap.test('cursor toArray: retrieves all documents', async () => {
const cursor = await CursorTest.getCursor({});
const all = await cursor.toArray();
expect(all.length).toEqual(5);
});
// Test iteration via forEach
tap.test('cursor forEach: iterates through all documents', async () => {
const names: string[] = [];
const cursor = await CursorTest.getCursor({});
await cursor.forEach(async (item) => {
names.push(item.name);
});
expect(names.length).toEqual(5);
expect(names).toContain('item3');
});
// Test native cursor modifiers: limit
tap.test('cursor modifier limit: only two documents', async () => {
const cursor = await CursorTest.getCursor({}, { modifier: (c) => c.limit(2) });
const limited = await cursor.toArray();
expect(limited.length).toEqual(2);
});
// Test native cursor modifiers: sort and skip
tap.test('cursor modifier sort & skip: returns correct order', async () => {
const cursor = await CursorTest.getCursor({}, {
modifier: (c) => c.sort({ order: -1 }).skip(1),
});
const results = await cursor.toArray();
// Skipped the first (order 5), next should be 4,3,2,1
expect(results.length).toEqual(4);
expect(results[0].order).toEqual(4);
});
// Cleanup: drop database, close connections, stop Mongo
tap.test('cursor cleanup: drop DB and stop', async () => {
await testDb.mongoDb.dropDatabase();
await testDb.close();
if (smartmongoInstance) {
await smartmongoInstance.stopAndDumpToDir(
`.nogit/dbdump/test.cursor.ts`,
);
}
// Ensure process exits after cleanup
setTimeout(() => process.exit(), 2000);
});
export default tap.start();

View File

@ -9,6 +9,8 @@ import { searchable } from '../ts/classes.doc.js';
// Set up database connection // Set up database connection
let smartmongoInstance: smartmongo.SmartMongo; let smartmongoInstance: smartmongo.SmartMongo;
let testDb: smartdata.SmartdataDb; let testDb: smartdata.SmartdataDb;
// Class for location-based wildcard/phrase tests
let LocationDoc: any;
// Define a test class with searchable fields using the standard SmartDataDbDoc // Define a test class with searchable fields using the standard SmartDataDbDoc
@smartdata.Collection(() => testDb) @smartdata.Collection(() => testDb)
@ -290,6 +292,76 @@ tap.test('should apply validate hook to post-filter results', async () => {
expensive.forEach((p) => expect(p.price).toBeGreaterThan(500)); expensive.forEach((p) => expect(p.price).toBeGreaterThan(500));
}); });
// Tests for quoted and wildcard field-specific phrases
tap.test('setup location test products', async () => {
@smartdata.Collection(() => testDb)
class LD extends smartdata.SmartDataDbDoc<LD, LD> {
@smartdata.unI() public id: string = smartunique.shortId();
@smartdata.svDb() @searchable() public location: string;
constructor(loc: string) { super(); this.location = loc; }
}
// Assign to outer variable for subsequent tests
LocationDoc = LD;
const locations = ['Berlin', 'Frankfurt am Main', 'Frankfurt am Oder', 'London'];
for (const loc of locations) {
await new LocationDoc(loc).save();
}
});
tap.test('should search exact quoted field phrase', async () => {
const results = await (LocationDoc as any).search('location:"Frankfurt am Main"');
expect(results.length).toEqual(1);
expect(results[0].location).toEqual('Frankfurt am Main');
});
tap.test('should search wildcard quoted field phrase', async () => {
const results = await (LocationDoc as any).search('location:"Frankfurt am *"');
const names = results.map((d: any) => d.location).sort();
expect(names).toEqual(['Frankfurt am Main', 'Frankfurt am Oder']);
});
tap.test('should search unquoted wildcard field', async () => {
const results = await (LocationDoc as any).search('location:Frankfurt*');
const names = results.map((d: any) => d.location).sort();
expect(names).toEqual(['Frankfurt am Main', 'Frankfurt am Oder']);
});
// Combined free-term + field phrase/wildcard tests
let CombinedDoc: any;
tap.test('setup combined docs for free-term and location tests', async () => {
@smartdata.Collection(() => testDb)
class CD extends smartdata.SmartDataDbDoc<CD, CD> {
@smartdata.unI() public id: string = smartunique.shortId();
@smartdata.svDb() @searchable() public name: string;
@smartdata.svDb() @searchable() public location: string;
constructor(name: string, location: string) { super(); this.name = name; this.location = location; }
}
CombinedDoc = CD;
const docs = [
new CombinedDoc('TypeScript', 'Berlin'),
new CombinedDoc('TypeScript', 'Frankfurt am Main'),
new CombinedDoc('TypeScript', 'Frankfurt am Oder'),
new CombinedDoc('JavaScript', 'Berlin'),
];
for (const d of docs) await d.save();
});
tap.test('should search free term and exact quoted field phrase', async () => {
const res = await CombinedDoc.search('TypeScript location:"Berlin"');
expect(res.length).toEqual(1);
expect(res[0].location).toEqual('Berlin');
});
tap.test('should not match free term with non-matching quoted field phrase', async () => {
const res = await CombinedDoc.search('TypeScript location:"Frankfurt d"');
expect(res.length).toEqual(0);
});
tap.test('should search free term with quoted wildcard field phrase', async () => {
const res = await CombinedDoc.search('TypeScript location:"Frankfurt am *"');
const locs = res.map((r: any) => r.location).sort();
expect(locs).toEqual(['Frankfurt am Main', 'Frankfurt am Oder']);
});
// Quoted exact field phrase without wildcard should return no matches if no exact match
tap.test('should not match location:"Frankfurt d"', async () => {
const results = await (LocationDoc as any).search('location:"Frankfurt d"');
expect(results.length).toEqual(0);
});
// Combined free-term and field wildcard tests // Combined free-term and field wildcard tests
tap.test('should combine free term and wildcard field search', async () => { tap.test('should combine free term and wildcard field search', async () => {
const results = await Product.search('book category:Book*'); const results = await Product.search('book category:Book*');

View File

@ -60,6 +60,43 @@ tap.test('should watch a collection', async (toolsArg) => {
await done.promise; await done.promise;
}); });
// ======= New tests for EventEmitter and buffering support =======
tap.test('should emit change via EventEmitter', async (tools) => {
const done = tools.defer();
const watcher = await House.watch({});
watcher.on('change', async (houseArg) => {
// Expect a House instance
expect(houseArg).toBeDefined();
// Clean up
await watcher.stop();
done.resolve();
});
// Trigger an insert to generate a change event
const h = new House();
await h.save();
await done.promise;
});
tap.test('should buffer change events when bufferTimeMs is set', async (tools) => {
const done = tools.defer();
// bufferTimeMs collects events into arrays every 50ms
const watcher = await House.watch({}, { bufferTimeMs: 50 });
let received: House[];
watcher.changeSubject.subscribe(async (batch: House[]) => {
if (batch && batch.length > 0) {
received = batch;
await watcher.stop();
done.resolve();
}
});
// Rapidly insert multiple docs
const docs = [new House(), new House(), new House()];
for (const doc of docs) await doc.save();
await done.promise;
// All inserts should be in one buffered batch
expect(received.length).toEqual(docs.length);
});
// ======================================= // =======================================
// close the database connection // close the database connection
// ======================================= // =======================================

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdata', name: '@push.rocks/smartdata',
version: '5.12.1', version: '5.16.0',
description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.' description: 'An advanced library for NoSQL data organization and manipulation using TypeScript with support for MongoDB, data validation, collections, and custom data types.'
} }

View File

@ -222,53 +222,74 @@ export class SmartdataCollection<T> {
/** /**
* finds an object in the DbCollection * finds an object in the DbCollection
*/ */
public async findOne(filterObject: any): Promise<any> { public async findOne(
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
const cursor = this.mongoDbCollection.find(filterObject); // Use MongoDB driver's findOne with optional session
const result = await cursor.next(); return this.mongoDbCollection.findOne(filterObject, { session: opts?.session });
cursor.close();
return result;
} }
public async getCursor( public async getCursor(
filterObjectArg: any, filterObjectArg: any,
dbDocArg: typeof SmartDataDbDoc, dbDocArg: typeof SmartDataDbDoc,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<SmartdataDbCursor<any>> { ): Promise<SmartdataDbCursor<any>> {
await this.init(); await this.init();
const cursor = this.mongoDbCollection.find(filterObjectArg); const cursor = this.mongoDbCollection.find(filterObjectArg, { session: opts?.session });
return new SmartdataDbCursor(cursor, dbDocArg); return new SmartdataDbCursor(cursor, dbDocArg);
} }
/** /**
* finds an object in the DbCollection * finds an object in the DbCollection
*/ */
public async findAll(filterObject: any): Promise<any[]> { public async findAll(
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any[]> {
await this.init(); await this.init();
const cursor = this.mongoDbCollection.find(filterObject); const cursor = this.mongoDbCollection.find(filterObject, { session: opts?.session });
const result = await cursor.toArray(); const result = await cursor.toArray();
cursor.close(); cursor.close();
return result; return result;
} }
/** /**
* watches the collection while applying a filter * Watches the collection, returning a SmartdataDbWatcher with RxJS and EventEmitter support.
* @param filterObject match filter for change stream
* @param opts optional MongoDB ChangeStreamOptions & { bufferTimeMs } to buffer events
* @param smartdataDbDocArg document class for instance creation
*/ */
public async watch( public async watch(
filterObject: any, filterObject: any,
smartdataDbDocArg: typeof SmartDataDbDoc, opts: (plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number }) = {},
smartdataDbDocArg?: typeof SmartDataDbDoc,
): Promise<SmartdataDbWatcher> { ): Promise<SmartdataDbWatcher> {
await this.init(); await this.init();
// Extract bufferTimeMs from options
const { bufferTimeMs, fullDocument, ...otherOptions } = opts || {};
// Determine fullDocument behavior: default to 'updateLookup'
const changeStreamOptions: plugins.mongodb.ChangeStreamOptions = {
...otherOptions,
fullDocument:
fullDocument === undefined
? 'updateLookup'
: fullDocument === true
? 'updateLookup'
: fullDocument,
} as any;
// Build pipeline with match if provided
const pipeline = filterObject ? [{ $match: filterObject }] : [];
const changeStream = this.mongoDbCollection.watch( const changeStream = this.mongoDbCollection.watch(
[ pipeline,
{ changeStreamOptions,
$match: filterObject, );
}, const smartdataWatcher = new SmartdataDbWatcher(
], changeStream,
{ smartdataDbDocArg,
fullDocument: 'updateLookup', { bufferTimeMs },
},
); );
const smartdataWatcher = new SmartdataDbWatcher(changeStream, smartdataDbDocArg);
await smartdataWatcher.readyDeferred.promise; await smartdataWatcher.readyDeferred.promise;
return smartdataWatcher; return smartdataWatcher;
} }
@ -276,7 +297,10 @@ export class SmartdataCollection<T> {
/** /**
* create an object in the database * create an object in the database
*/ */
public async insert(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> { public async insert(
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
this.markUniqueIndexes(dbDocArg.uniqueIndexes); this.markUniqueIndexes(dbDocArg.uniqueIndexes);
@ -287,14 +311,17 @@ export class SmartdataCollection<T> {
} }
const saveableObject = await dbDocArg.createSavableObject(); const saveableObject = await dbDocArg.createSavableObject();
const result = await this.mongoDbCollection.insertOne(saveableObject); const result = await this.mongoDbCollection.insertOne(saveableObject, { session: opts?.session });
return result; return result;
} }
/** /**
* inserts object into the DbCollection * inserts object into the DbCollection
*/ */
public async update(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> { public async update(
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
const identifiableObject = await dbDocArg.createIdentifiableObject(); const identifiableObject = await dbDocArg.createIdentifiableObject();
@ -309,21 +336,27 @@ export class SmartdataCollection<T> {
const result = await this.mongoDbCollection.updateOne( const result = await this.mongoDbCollection.updateOne(
identifiableObject, identifiableObject,
{ $set: updateableObject }, { $set: updateableObject },
{ upsert: true }, { upsert: true, session: opts?.session },
); );
return result; return result;
} }
public async delete(dbDocArg: T & SmartDataDbDoc<T, unknown>): Promise<any> { public async delete(
dbDocArg: T & SmartDataDbDoc<T, unknown>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<any> {
await this.init(); await this.init();
await this.checkDoc(dbDocArg); await this.checkDoc(dbDocArg);
const identifiableObject = await dbDocArg.createIdentifiableObject(); const identifiableObject = await dbDocArg.createIdentifiableObject();
await this.mongoDbCollection.deleteOne(identifiableObject); await this.mongoDbCollection.deleteOne(identifiableObject, { session: opts?.session });
} }
public async getCount(filterObject: any) { public async getCount(
filterObject: any,
opts?: { session?: plugins.mongodb.ClientSession }
) {
await this.init(); await this.init();
return this.mongoDbCollection.countDocuments(filterObject); return this.mongoDbCollection.countDocuments(filterObject, { session: opts?.session });
} }
/** /**

View File

@ -63,6 +63,12 @@ export class SmartdataDb {
this.status = 'disconnected'; this.status = 'disconnected';
logger.log('info', `disconnected from database ${this.smartdataOptions.mongoDbName}`); logger.log('info', `disconnected from database ${this.smartdataOptions.mongoDbName}`);
} }
/**
* Start a MongoDB client session for transactions
*/
public startSession(): plugins.mongodb.ClientSession {
return this.mongoDbClient.startSession();
}
// handle table to class distribution // handle table to class distribution

View File

@ -11,8 +11,18 @@ import { SmartdataLuceneAdapter } from './classes.lucene.adapter.js';
* - validate: post-fetch validator, return true to keep a doc * - validate: post-fetch validator, return true to keep a doc
*/ */
export interface SearchOptions<T> { export interface SearchOptions<T> {
/**
* Additional MongoDB filter to ANDmerge into the query
*/
filter?: Record<string, any>; filter?: Record<string, any>;
/**
* Postfetch validator; return true to keep each doc
*/
validate?: (doc: T) => Promise<boolean> | boolean; validate?: (doc: T) => Promise<boolean> | boolean;
/**
* Optional MongoDB session for transactional operations
*/
session?: plugins.mongodb.ClientSession;
} }
export type TDocCreation = 'db' | 'new' | 'mixed'; export type TDocCreation = 'db' | 'new' | 'mixed';
@ -29,16 +39,34 @@ export function globalSvDb() {
}; };
} }
/**
* Options for custom serialization/deserialization of a field.
*/
export interface SvDbOptions {
/** Function to serialize the field value before saving to DB */
serialize?: (value: any) => any;
/** Function to deserialize the field value after reading from DB */
deserialize?: (value: any) => any;
}
/** /**
* saveable - saveable decorator to be used on class properties * saveable - saveable decorator to be used on class properties
*/ */
export function svDb() { export function svDb(options?: SvDbOptions) {
return (target: SmartDataDbDoc<unknown, unknown>, key: string) => { return (target: SmartDataDbDoc<unknown, unknown>, key: string) => {
console.log(`called svDb() on >${target.constructor.name}.${key}<`); console.log(`called svDb() on >${target.constructor.name}.${key}<`);
if (!target.saveableProperties) { if (!target.saveableProperties) {
target.saveableProperties = []; target.saveableProperties = [];
} }
target.saveableProperties.push(key); target.saveableProperties.push(key);
// attach custom serializer/deserializer options to the class constructor
const ctor = target.constructor as any;
if (!ctor._svDbOptions) {
ctor._svDbOptions = {};
}
if (options) {
ctor._svDbOptions[key] = options;
}
}; };
} }
@ -179,7 +207,12 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
const newInstance = new this(); const newInstance = new this();
(newInstance as any).creationStatus = 'db'; (newInstance as any).creationStatus = 'db';
for (const key of Object.keys(mongoDbNativeDocArg)) { for (const key of Object.keys(mongoDbNativeDocArg)) {
newInstance[key] = mongoDbNativeDocArg[key]; const rawValue = mongoDbNativeDocArg[key];
const optionsMap = (this as any)._svDbOptions || {};
const opts = optionsMap[key];
newInstance[key] = opts && typeof opts.deserialize === 'function'
? opts.deserialize(rawValue)
: rawValue;
} }
return newInstance; return newInstance;
} }
@ -193,8 +226,13 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public static async getInstances<T>( public static async getInstances<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<T[]> { ): Promise<T[]> {
const foundDocs = await (this as any).collection.findAll(convertFilterForMongoDb(filterArg)); // Pass session through to findAll for transactional queries
const foundDocs = await (this as any).collection.findAll(
convertFilterForMongoDb(filterArg),
{ session: opts?.session },
);
const returnArray = []; const returnArray = [];
for (const foundDoc of foundDocs) { for (const foundDoc of foundDocs) {
const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc); const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc);
@ -212,8 +250,13 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public static async getInstance<T>( public static async getInstance<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
opts?: { session?: plugins.mongodb.ClientSession }
): Promise<T> { ): Promise<T> {
const foundDoc = await (this as any).collection.findOne(convertFilterForMongoDb(filterArg)); // Retrieve one document, with optional session for transactions
const foundDoc = await (this as any).collection.findOne(
convertFilterForMongoDb(filterArg),
{ session: opts?.session },
);
if (foundDoc) { if (foundDoc) {
const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc); const newInstance: T = (this as any).createInstanceFromMongoDbNativeDoc(foundDoc);
return newInstance; return newInstance;
@ -233,33 +276,27 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
} }
/** /**
* get cursor * Get a cursor for streaming results, with optional session and native cursor modifiers.
* @returns * @param filterArg Partial filter to apply
* @param opts Optional session and modifier for the raw MongoDB cursor
*/ */
public static async getCursor<T>( public static async getCursor<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
) { opts?: {
const collection: SmartdataCollection<T> = (this as any).collection; session?: plugins.mongodb.ClientSession;
const cursor: SmartdataDbCursor<T> = await collection.getCursor( modifier?: (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>;
convertFilterForMongoDb(filterArg), }
this as any as typeof SmartDataDbDoc,
);
return cursor;
}
public static async getCursorExtended<T>(
this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
modifierFunction = (cursorArg: plugins.mongodb.FindCursor<plugins.mongodb.WithId<plugins.mongodb.BSON.Document>>) => cursorArg,
): Promise<SmartdataDbCursor<T>> { ): Promise<SmartdataDbCursor<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const { session, modifier } = opts || {};
await collection.init(); await collection.init();
let cursor: plugins.mongodb.FindCursor<any> = collection.mongoDbCollection.find( let rawCursor: plugins.mongodb.FindCursor<any> =
convertFilterForMongoDb(filterArg), collection.mongoDbCollection.find(convertFilterForMongoDb(filterArg), { session });
); if (modifier) {
cursor = modifierFunction(cursor); rawCursor = modifier(rawCursor);
return new SmartdataDbCursor<T>(cursor, this as any as typeof SmartDataDbDoc); }
return new SmartdataDbCursor<T>(rawCursor, this as any as typeof SmartDataDbDoc);
} }
/** /**
@ -268,13 +305,20 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
* @param filterArg * @param filterArg
* @param forEachFunction * @param forEachFunction
*/ */
/**
* Watch the collection for changes, with optional buffering and change stream options.
* @param filterArg MongoDB filter to select which changes to observe
* @param opts optional ChangeStreamOptions plus bufferTimeMs
*/
public static async watch<T>( public static async watch<T>(
this: plugins.tsclass.typeFest.Class<T>, this: plugins.tsclass.typeFest.Class<T>,
filterArg: plugins.tsclass.typeFest.PartialDeep<T>, filterArg: plugins.tsclass.typeFest.PartialDeep<T>,
) { opts?: plugins.mongodb.ChangeStreamOptions & { bufferTimeMs?: number },
): Promise<SmartdataDbWatcher<T>> {
const collection: SmartdataCollection<T> = (this as any).collection; const collection: SmartdataCollection<T> = (this as any).collection;
const watcher: SmartdataDbWatcher<T> = await collection.watch( const watcher: SmartdataDbWatcher<T> = await collection.watch(
convertFilterForMongoDb(filterArg), convertFilterForMongoDb(filterArg),
opts || {},
this as any, this as any,
); );
return watcher; return watcher;
@ -339,7 +383,9 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
if (opts?.filter) { if (opts?.filter) {
mongoFilter = { $and: [mongoFilter, opts.filter] }; mongoFilter = { $and: [mongoFilter, opts.filter] };
} }
let docs: T[] = await (this as any).getInstances(mongoFilter); // Fetch with optional session for transactions
// Fetch within optional session
let docs: T[] = await (this as any).getInstances(mongoFilter, { session: opts?.session });
if (opts?.validate) { if (opts?.validate) {
const out: T[] = []; const out: T[] = [];
for (const d of docs) { for (const d of docs) {
@ -397,7 +443,12 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
const wildcardField = q.match(/^(\w+):(.+[*?].*)$/); const wildcardField = q.match(/^(\w+):(.+[*?].*)$/);
if (wildcardField) { if (wildcardField) {
const field = wildcardField[1]; const field = wildcardField[1];
const pattern = wildcardField[2]; // Support quoted wildcard patterns: strip surrounding quotes
let pattern = wildcardField[2];
if ((pattern.startsWith('"') && pattern.endsWith('"')) ||
(pattern.startsWith("'") && pattern.endsWith("'"))) {
pattern = pattern.slice(1, -1);
}
if (!searchableFields.includes(field)) { if (!searchableFields.includes(field)) {
throw new Error(`Field '${field}' is not searchable for class ${this.name}`); throw new Error(`Field '${field}' is not searchable for class ${this.name}`);
} }
@ -414,37 +465,52 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
const orConds = searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } })); const orConds = searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } }));
return await (this as any).execQuery({ $or: orConds }, opts); return await (this as any).execQuery({ $or: orConds }, opts);
} }
// implicit AND: combine free terms and field:value terms (with or without wildcards) // implicit AND for multiple tokens: free terms, quoted phrases, and field:values
const parts = q.split(/\s+/); {
const hasColon = parts.some((t) => t.includes(':')); // Split query into tokens, preserving quoted substrings
if ( const rawTokens = q.match(/(?:[^\s"']+|"[^"]*"|'[^']*')+/g) || [];
parts.length > 1 && hasColon && // Only apply when more than one token and no boolean operators or grouping
!q.includes(' AND ') && !q.includes(' OR ') && !q.includes(' NOT ') && if (
!q.includes('(') && !q.includes(')') && rawTokens.length > 1 &&
!q.includes('[') && !q.includes(']') !/(\bAND\b|\bOR\b|\bNOT\b|\(|\))/i.test(q) &&
) { !/\[|\]/.test(q)
const andConds = parts.map((term) => { ) {
const m = term.match(/^(\w+):(.+)$/); const andConds: any[] = [];
if (m) { for (let token of rawTokens) {
const field = m[1]; // field:value token
const value = m[2]; const fv = token.match(/^(\w+):(.+)$/);
if (!searchableFields.includes(field)) { if (fv) {
throw new Error(`Field '${field}' is not searchable for class ${this.name}`); const field = fv[1];
let value = fv[2];
if (!searchableFields.includes(field)) {
throw new Error(`Field '${field}' is not searchable for class ${this.name}`);
}
// Strip surrounding quotes if present
if ((value.startsWith('"') && value.endsWith('"')) || (value.startsWith("'") && value.endsWith("'"))) {
value = value.slice(1, -1);
}
// Wildcard search?
if (value.includes('*') || value.includes('?')) {
const escaped = value.replace(/([.+^${}()|[\\]\\])/g, '\\$1');
const pattern = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
andConds.push({ [field]: { $regex: pattern, $options: 'i' } });
} else {
andConds.push({ [field]: value });
}
} else if ((token.startsWith('"') && token.endsWith('"')) || (token.startsWith("'") && token.endsWith("'"))) {
// Quoted free phrase across all fields
const phrase = token.slice(1, -1);
const parts = phrase.split(/\s+/).map((t) => escapeForRegex(t));
const pattern = parts.join('\\s+');
andConds.push({ $or: searchableFields.map((f) => ({ [f]: { $regex: pattern, $options: 'i' } })) });
} else {
// Free term across all fields
const esc = escapeForRegex(token);
andConds.push({ $or: searchableFields.map((f) => ({ [f]: { $regex: esc, $options: 'i' } })) });
} }
if (value.includes('*') || value.includes('?')) {
// wildcard field search
const escaped = value.replace(/([.+^${}()|[\\]\\])/g, '\\$1');
const pattern = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
return { [field]: { $regex: pattern, $options: 'i' } };
}
// exact field:value
return { [field]: value };
} }
// free term -> regex across all searchable fields return await (this as any).execQuery({ $and: andConds }, opts);
const esc = escapeForRegex(term); }
return { $or: searchableFields.map((f) => ({ [f]: { $regex: esc, $options: 'i' } })) };
});
return await (this as any).execQuery({ $and: andConds }, opts);
} }
// detect advanced Lucene syntax: field:value, wildcards, boolean, grouping // detect advanced Lucene syntax: field:value, wildcards, boolean, grouping
const luceneSyntax = /(\w+:[^\s]+)|\*|\?|\bAND\b|\bOR\b|\bNOT\b|\(|\)/; const luceneSyntax = /(\w+:[^\s]+)|\*|\?|\bAND\b|\bOR\b|\bNOT\b|\(|\)/;
@ -526,35 +592,52 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
constructor() {} constructor() {}
/** /**
* saves this instance but not any connected items * saves this instance (optionally within a transaction)
* may lead to data inconsistencies, but is faster
*/ */
public async save() { public async save(opts?: { session?: plugins.mongodb.ClientSession }) {
// allow hook before saving
if (typeof (this as any).beforeSave === 'function') {
await (this as any).beforeSave();
}
// tslint:disable-next-line: no-this-assignment // tslint:disable-next-line: no-this-assignment
const self: any = this; const self: any = this;
let dbResult: any; let dbResult: any;
// update timestamp
this._updatedAt = new Date().toISOString(); this._updatedAt = new Date().toISOString();
// perform insert or update
switch (this.creationStatus) { switch (this.creationStatus) {
case 'db': case 'db':
dbResult = await this.collection.update(self); dbResult = await this.collection.update(self, { session: opts?.session });
break; break;
case 'new': case 'new':
dbResult = await this.collection.insert(self); dbResult = await this.collection.insert(self, { session: opts?.session });
this.creationStatus = 'db'; this.creationStatus = 'db';
break; break;
default: default:
console.error('neither new nor in db?'); console.error('neither new nor in db?');
} }
// allow hook after saving
if (typeof (this as any).afterSave === 'function') {
await (this as any).afterSave();
}
return dbResult; return dbResult;
} }
/** /**
* deletes a document from the database * deletes a document from the database (optionally within a transaction)
*/ */
public async delete() { public async delete(opts?: { session?: plugins.mongodb.ClientSession }) {
await this.collection.delete(this); // allow hook before deleting
if (typeof (this as any).beforeDelete === 'function') {
await (this as any).beforeDelete();
}
// perform deletion
const result = await this.collection.delete(this, { session: opts?.session });
// allow hook after delete
if (typeof (this as any).afterDelete === 'function') {
await (this as any).afterDelete();
}
return result;
} }
/** /**
@ -581,7 +664,12 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public async updateFromDb() { public async updateFromDb() {
const mongoDbNativeDoc = await this.collection.findOne(await this.createIdentifiableObject()); const mongoDbNativeDoc = await this.collection.findOne(await this.createIdentifiableObject());
for (const key of Object.keys(mongoDbNativeDoc)) { for (const key of Object.keys(mongoDbNativeDoc)) {
this[key] = mongoDbNativeDoc[key]; const rawValue = mongoDbNativeDoc[key];
const optionsMap = (this.constructor as any)._svDbOptions || {};
const opts = optionsMap[key];
this[key] = opts && typeof opts.deserialize === 'function'
? opts.deserialize(rawValue)
: rawValue;
} }
} }
@ -591,8 +679,14 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
public async createSavableObject(): Promise<TImplements> { public async createSavableObject(): Promise<TImplements> {
const saveableObject: unknown = {}; // is not exposed to outside, so any is ok here const saveableObject: unknown = {}; // is not exposed to outside, so any is ok here
const saveableProperties = [...this.globalSaveableProperties, ...this.saveableProperties]; const saveableProperties = [...this.globalSaveableProperties, ...this.saveableProperties];
// apply custom serialization if configured
const optionsMap = (this.constructor as any)._svDbOptions || {};
for (const propertyNameString of saveableProperties) { for (const propertyNameString of saveableProperties) {
saveableObject[propertyNameString] = this[propertyNameString]; const rawValue = (this as any)[propertyNameString];
const opts = optionsMap[propertyNameString];
(saveableObject as any)[propertyNameString] = opts && typeof opts.serialize === 'function'
? opts.serialize(rawValue)
: rawValue;
} }
return saveableObject as TImplements; return saveableObject as TImplements;
} }

View File

@ -1,37 +1,73 @@
import { SmartDataDbDoc } from './classes.doc.js'; import { SmartDataDbDoc } from './classes.doc.js';
import * as plugins from './plugins.js'; import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
/** /**
* a wrapper for the native mongodb cursor. Exposes better * a wrapper for the native mongodb cursor. Exposes better
*/ */
export class SmartdataDbWatcher<T = any> { /**
* Wraps a MongoDB ChangeStream with RxJS and EventEmitter support.
*/
export class SmartdataDbWatcher<T = any> extends EventEmitter {
// STATIC // STATIC
public readyDeferred = plugins.smartpromise.defer(); public readyDeferred = plugins.smartpromise.defer();
// INSTANCE // INSTANCE
private changeStream: plugins.mongodb.ChangeStream<T>; private changeStream: plugins.mongodb.ChangeStream<T>;
private rawSubject: plugins.smartrx.rxjs.Subject<T>;
public changeSubject = new plugins.smartrx.rxjs.Subject<T>(); /** Emits change documents (or arrays of documents if buffered) */
public changeSubject: any;
/**
* @param changeStreamArg native MongoDB ChangeStream
* @param smartdataDbDocArg document class for instance creation
* @param opts.bufferTimeMs optional milliseconds to buffer events via RxJS
*/
constructor( constructor(
changeStreamArg: plugins.mongodb.ChangeStream<T>, changeStreamArg: plugins.mongodb.ChangeStream<T>,
smartdataDbDocArg: typeof SmartDataDbDoc, smartdataDbDocArg: typeof SmartDataDbDoc,
opts?: { bufferTimeMs?: number },
) { ) {
super();
this.rawSubject = new plugins.smartrx.rxjs.Subject<T>();
// Apply buffering if requested
if (opts && opts.bufferTimeMs) {
this.changeSubject = this.rawSubject.pipe(plugins.smartrx.rxjs.ops.bufferTime(opts.bufferTimeMs));
} else {
this.changeSubject = this.rawSubject;
}
this.changeStream = changeStreamArg; this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: any) => { this.changeStream.on('change', async (item: any) => {
if (!item.fullDocument) { let docInstance: T = null;
this.changeSubject.next(null); if (item.fullDocument) {
return; docInstance = smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(
item.fullDocument
) as any as T;
} }
this.changeSubject.next( // Notify subscribers
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T, this.rawSubject.next(docInstance);
); this.emit('change', docInstance);
}); });
// Signal readiness after one tick
plugins.smartdelay.delayFor(0).then(() => { plugins.smartdelay.delayFor(0).then(() => {
this.readyDeferred.resolve(); this.readyDeferred.resolve();
}); });
} }
public async close() { /**
* Close the change stream, complete the RxJS subject, and remove listeners.
*/
public async close(): Promise<void> {
// Close MongoDB ChangeStream
await this.changeStream.close(); await this.changeStream.close();
// Complete the subject to teardown any buffering operators
this.rawSubject.complete();
// Remove all EventEmitter listeners
this.removeAllListeners();
}
/**
* Alias for close(), matching README usage
*/
public async stop(): Promise<void> {
return this.close();
} }
} }