From bd1764159e4a5196c8aa6938467cf38514a1a70d Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 1 Feb 2026 16:02:03 +0000 Subject: [PATCH] BREAKING CHANGE(storage,engine,server): add session & transaction management, index/query planner, WAL and checksum support; integrate index-accelerated queries and update storage API (findByIds) to enable index optimizations --- changelog.md | 13 + ts/00_commitinfo_data.ts | 2 +- ts/tsmdb/engine/IndexEngine.ts | 431 +++++++++++++++++++--- ts/tsmdb/engine/QueryPlanner.ts | 393 ++++++++++++++++++++ ts/tsmdb/engine/SessionEngine.ts | 292 +++++++++++++++ ts/tsmdb/index.ts | 9 + ts/tsmdb/server/CommandRouter.ts | 109 ++++++ ts/tsmdb/server/TsmdbServer.ts | 3 + ts/tsmdb/server/handlers/AdminHandler.ts | 115 +++++- ts/tsmdb/server/handlers/DeleteHandler.ts | 19 +- ts/tsmdb/server/handlers/FindHandler.ts | 55 ++- ts/tsmdb/server/handlers/InsertHandler.ts | 6 + ts/tsmdb/server/handlers/UpdateHandler.ts | 37 +- ts/tsmdb/storage/FileStorageAdapter.ts | 93 ++++- ts/tsmdb/storage/IStorageAdapter.ts | 6 + ts/tsmdb/storage/MemoryStorageAdapter.ts | 12 + ts/tsmdb/storage/WAL.ts | 375 +++++++++++++++++++ ts/tsmdb/utils/checksum.ts | 88 +++++ ts/tsmdb/utils/index.ts | 1 + 19 files changed, 1973 insertions(+), 86 deletions(-) create mode 100644 ts/tsmdb/engine/QueryPlanner.ts create mode 100644 ts/tsmdb/engine/SessionEngine.ts create mode 100644 ts/tsmdb/storage/WAL.ts create mode 100644 ts/tsmdb/utils/checksum.ts create mode 100644 ts/tsmdb/utils/index.ts diff --git a/changelog.md b/changelog.md index 2b8b3d6..032b9d8 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,18 @@ # Changelog +## 2026-02-01 - 4.0.0 - BREAKING CHANGE(storage,engine,server) +add session & transaction management, index/query planner, WAL and checksum support; integrate index-accelerated queries and update storage API (findByIds) to enable index optimizations + +- Add SessionEngine with session lifecycle, auto-abort of transactions on expiry and session tracking in CommandRouter and AdminHandler. +- Introduce TransactionEngine integrations in CommandRouter and AdminHandler; handlers now support start/commit/abort transaction workflows. +- Add IndexEngine enhancements including a simple B-tree and hash map optimizations; integrate index usage into Find/Count/Insert/Update/Delete handlers for index-accelerated queries and index maintenance on mutations. +- Add QueryPlanner to choose IXSCAN vs COLLSCAN and provide explain plans. +- Add WAL (write-ahead log) for durability, with LSNs, checkpoints and recovery APIs. +- Add checksum utilities and FileStorageAdapter support for checksums (enableChecksums/strictChecksums), with verification on read and optional strict failure behavior. +- IStorageAdapter interface changed to include findByIds; MemoryStorageAdapter and FileStorageAdapter implement findByIds to support index lookups. +- Exported API additions: WAL, QueryPlanner, SessionEngine, checksum utilities; CommandRouter now caches IndexEngines and exposes transaction/session engines. +- Breaking change: the IStorageAdapter interface change requires third-party storage adapters to implement the new findByIds method. + ## 2026-02-01 - 3.0.0 - BREAKING CHANGE(tsmdb) rename CongoDB to TsmDB and relocate/rename wire-protocol server implementation and public exports diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index a0a638b..c65f4a9 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartmongo', - version: '3.0.0', + version: '4.0.0', description: 'A module for creating and managing a local MongoDB instance for testing purposes.' } diff --git a/ts/tsmdb/engine/IndexEngine.ts b/ts/tsmdb/engine/IndexEngine.ts index 4c2228a..5003b68 100644 --- a/ts/tsmdb/engine/IndexEngine.ts +++ b/ts/tsmdb/engine/IndexEngine.ts @@ -1,5 +1,89 @@ import * as plugins from '../tsmdb.plugins.js'; import type { IStorageAdapter } from '../storage/IStorageAdapter.js'; + +// Simple B-Tree implementation for range queries +// Since sorted-btree has ESM/CJS interop issues, we use a simple custom implementation +class SimpleBTree { + private entries: Map = new Map(); + private sortedKeys: K[] = []; + private comparator: (a: K, b: K) => number; + + constructor(_unused?: undefined, comparator?: (a: K, b: K) => number) { + this.comparator = comparator || ((a: K, b: K) => { + if (a < b) return -1; + if (a > b) return 1; + return 0; + }); + } + + private keyToString(key: K): string { + return JSON.stringify(key); + } + + set(key: K, value: V): boolean { + const keyStr = this.keyToString(key); + const existed = this.entries.has(keyStr); + this.entries.set(keyStr, { key, value }); + + if (!existed) { + // Insert in sorted order + const idx = this.sortedKeys.findIndex(k => this.comparator(k, key) > 0); + if (idx === -1) { + this.sortedKeys.push(key); + } else { + this.sortedKeys.splice(idx, 0, key); + } + } + return !existed; + } + + get(key: K): V | undefined { + const entry = this.entries.get(this.keyToString(key)); + return entry?.value; + } + + delete(key: K): boolean { + const keyStr = this.keyToString(key); + if (this.entries.has(keyStr)) { + this.entries.delete(keyStr); + const idx = this.sortedKeys.findIndex(k => this.comparator(k, key) === 0); + if (idx !== -1) { + this.sortedKeys.splice(idx, 1); + } + return true; + } + return false; + } + + forRange( + lowKey: K | undefined, + highKey: K | undefined, + lowInclusive: boolean, + highInclusive: boolean, + callback: (value: V, key: K) => void + ): void { + for (const key of this.sortedKeys) { + // Check low bound + if (lowKey !== undefined) { + const cmp = this.comparator(key, lowKey); + if (cmp < 0) continue; + if (cmp === 0 && !lowInclusive) continue; + } + + // Check high bound + if (highKey !== undefined) { + const cmp = this.comparator(key, highKey); + if (cmp > 0) break; + if (cmp === 0 && !highInclusive) break; + } + + const entry = this.entries.get(this.keyToString(key)); + if (entry) { + callback(entry.value, key); + } + } + } +} import type { Document, IStoredDocument, @@ -11,7 +95,61 @@ import { TsmdbDuplicateKeyError, TsmdbIndexError } from '../errors/TsmdbErrors.j import { QueryEngine } from './QueryEngine.js'; /** - * Index data structure for fast lookups + * Comparator for B-Tree that handles mixed types consistently + */ +function indexKeyComparator(a: any, b: any): number { + // Handle null/undefined + if (a === null || a === undefined) { + if (b === null || b === undefined) return 0; + return -1; + } + if (b === null || b === undefined) return 1; + + // Handle arrays (compound keys) + if (Array.isArray(a) && Array.isArray(b)) { + for (let i = 0; i < Math.max(a.length, b.length); i++) { + const cmp = indexKeyComparator(a[i], b[i]); + if (cmp !== 0) return cmp; + } + return 0; + } + + // Handle ObjectId + if (a instanceof plugins.bson.ObjectId && b instanceof plugins.bson.ObjectId) { + return a.toHexString().localeCompare(b.toHexString()); + } + + // Handle Date + if (a instanceof Date && b instanceof Date) { + return a.getTime() - b.getTime(); + } + + // Handle different types - use type ordering (null < number < string < object) + const typeOrder = (v: any): number => { + if (v === null || v === undefined) return 0; + if (typeof v === 'number') return 1; + if (typeof v === 'string') return 2; + if (typeof v === 'boolean') return 3; + if (v instanceof Date) return 4; + if (v instanceof plugins.bson.ObjectId) return 5; + return 6; + }; + + const typeA = typeOrder(a); + const typeB = typeOrder(b); + if (typeA !== typeB) return typeA - typeB; + + // Same type comparison + if (typeof a === 'number') return a - b; + if (typeof a === 'string') return a.localeCompare(b); + if (typeof a === 'boolean') return (a ? 1 : 0) - (b ? 1 : 0); + + // Fallback to string comparison + return String(a).localeCompare(String(b)); +} + +/** + * Index data structure using B-Tree for range queries */ interface IIndexData { name: string; @@ -19,8 +157,10 @@ interface IIndexData { unique: boolean; sparse: boolean; expireAfterSeconds?: number; - // Map from index key value to document _id(s) - entries: Map>; + // B-Tree for ordered index lookups (supports range queries) + btree: SimpleBTree>; + // Hash map for fast equality lookups + hashMap: Map>; } /** @@ -55,7 +195,8 @@ export class IndexEngine { unique: indexSpec.unique || false, sparse: indexSpec.sparse || false, expireAfterSeconds: indexSpec.expireAfterSeconds, - entries: new Map(), + btree: new SimpleBTree>(undefined, indexKeyComparator), + hashMap: new Map(), }; // Build index entries @@ -63,10 +204,20 @@ export class IndexEngine { const keyValue = this.extractKeyValue(doc, indexSpec.key); if (keyValue !== null || !indexData.sparse) { const keyStr = JSON.stringify(keyValue); - if (!indexData.entries.has(keyStr)) { - indexData.entries.set(keyStr, new Set()); + + // Add to hash map + if (!indexData.hashMap.has(keyStr)) { + indexData.hashMap.set(keyStr, new Set()); + } + indexData.hashMap.get(keyStr)!.add(doc._id.toHexString()); + + // Add to B-tree + const existing = indexData.btree.get(keyValue); + if (existing) { + existing.add(doc._id.toHexString()); + } else { + indexData.btree.set(keyValue, new Set([doc._id.toHexString()])); } - indexData.entries.get(keyStr)!.add(doc._id.toHexString()); } } @@ -100,7 +251,8 @@ export class IndexEngine { unique: options?.unique || false, sparse: options?.sparse || false, expireAfterSeconds: options?.expireAfterSeconds, - entries: new Map(), + btree: new SimpleBTree>(undefined, indexKeyComparator), + hashMap: new Map(), }; // Build index from existing documents @@ -115,7 +267,7 @@ export class IndexEngine { const keyStr = JSON.stringify(keyValue); - if (indexData.unique && indexData.entries.has(keyStr)) { + if (indexData.unique && indexData.hashMap.has(keyStr)) { throw new TsmdbDuplicateKeyError( `E11000 duplicate key error index: ${this.dbName}.${this.collName}.$${name}`, key as Record, @@ -123,10 +275,19 @@ export class IndexEngine { ); } - if (!indexData.entries.has(keyStr)) { - indexData.entries.set(keyStr, new Set()); + // Add to hash map + if (!indexData.hashMap.has(keyStr)) { + indexData.hashMap.set(keyStr, new Set()); + } + indexData.hashMap.get(keyStr)!.add(doc._id.toHexString()); + + // Add to B-tree + const existing = indexData.btree.get(keyValue); + if (existing) { + existing.add(doc._id.toHexString()); + } else { + indexData.btree.set(keyValue, new Set([doc._id.toHexString()])); } - indexData.entries.get(keyStr)!.add(doc._id.toHexString()); } // Store index @@ -213,7 +374,7 @@ export class IndexEngine { // Check unique constraint if (indexData.unique) { - const existing = indexData.entries.get(keyStr); + const existing = indexData.hashMap.get(keyStr); if (existing && existing.size > 0) { throw new TsmdbDuplicateKeyError( `E11000 duplicate key error collection: ${this.dbName}.${this.collName} index: ${name}`, @@ -223,10 +384,19 @@ export class IndexEngine { } } - if (!indexData.entries.has(keyStr)) { - indexData.entries.set(keyStr, new Set()); + // Add to hash map + if (!indexData.hashMap.has(keyStr)) { + indexData.hashMap.set(keyStr, new Set()); + } + indexData.hashMap.get(keyStr)!.add(doc._id.toHexString()); + + // Add to B-tree + const btreeSet = indexData.btree.get(keyValue); + if (btreeSet) { + btreeSet.add(doc._id.toHexString()); + } else { + indexData.btree.set(keyValue, new Set([doc._id.toHexString()])); } - indexData.entries.get(keyStr)!.add(doc._id.toHexString()); } } @@ -245,11 +415,21 @@ export class IndexEngine { // Remove old entry if key changed if (oldKeyStr !== newKeyStr) { if (oldKeyValue !== null || !indexData.sparse) { - const oldSet = indexData.entries.get(oldKeyStr); - if (oldSet) { - oldSet.delete(oldDoc._id.toHexString()); - if (oldSet.size === 0) { - indexData.entries.delete(oldKeyStr); + // Remove from hash map + const oldHashSet = indexData.hashMap.get(oldKeyStr); + if (oldHashSet) { + oldHashSet.delete(oldDoc._id.toHexString()); + if (oldHashSet.size === 0) { + indexData.hashMap.delete(oldKeyStr); + } + } + + // Remove from B-tree + const oldBtreeSet = indexData.btree.get(oldKeyValue); + if (oldBtreeSet) { + oldBtreeSet.delete(oldDoc._id.toHexString()); + if (oldBtreeSet.size === 0) { + indexData.btree.delete(oldKeyValue); } } } @@ -258,7 +438,7 @@ export class IndexEngine { if (newKeyValue !== null || !indexData.sparse) { // Check unique constraint if (indexData.unique) { - const existing = indexData.entries.get(newKeyStr); + const existing = indexData.hashMap.get(newKeyStr); if (existing && existing.size > 0) { throw new TsmdbDuplicateKeyError( `E11000 duplicate key error collection: ${this.dbName}.${this.collName} index: ${name}`, @@ -268,10 +448,19 @@ export class IndexEngine { } } - if (!indexData.entries.has(newKeyStr)) { - indexData.entries.set(newKeyStr, new Set()); + // Add to hash map + if (!indexData.hashMap.has(newKeyStr)) { + indexData.hashMap.set(newKeyStr, new Set()); + } + indexData.hashMap.get(newKeyStr)!.add(newDoc._id.toHexString()); + + // Add to B-tree + const newBtreeSet = indexData.btree.get(newKeyValue); + if (newBtreeSet) { + newBtreeSet.add(newDoc._id.toHexString()); + } else { + indexData.btree.set(newKeyValue, new Set([newDoc._id.toHexString()])); } - indexData.entries.get(newKeyStr)!.add(newDoc._id.toHexString()); } } } @@ -291,11 +480,22 @@ export class IndexEngine { } const keyStr = JSON.stringify(keyValue); - const set = indexData.entries.get(keyStr); - if (set) { - set.delete(doc._id.toHexString()); - if (set.size === 0) { - indexData.entries.delete(keyStr); + + // Remove from hash map + const hashSet = indexData.hashMap.get(keyStr); + if (hashSet) { + hashSet.delete(doc._id.toHexString()); + if (hashSet.size === 0) { + indexData.hashMap.delete(keyStr); + } + } + + // Remove from B-tree + const btreeSet = indexData.btree.get(keyValue); + if (btreeSet) { + btreeSet.delete(doc._id.toHexString()); + if (btreeSet.size === 0) { + indexData.btree.delete(keyValue); } } } @@ -309,8 +509,8 @@ export class IndexEngine { return null; } - // Get filter fields - const filterFields = new Set(this.getFilterFields(filter)); + // Get filter fields and operators + const filterInfo = this.analyzeFilter(filter); // Score each index let bestIndex: { name: string; data: IIndexData } | null = null; @@ -320,12 +520,21 @@ export class IndexEngine { const indexFields = Object.keys(indexData.key); let score = 0; - // Count how many index fields are in the filter + // Count how many index fields can be used for (const field of indexFields) { - if (filterFields.has(field)) { - score++; + const info = filterInfo.get(field); + if (!info) break; + + // Equality is best + if (info.equality) { + score += 2; + } else if (info.range) { + // Range queries can use B-tree + score += 1; + } else if (info.in) { + score += 1.5; } else { - break; // Index fields must be contiguous + break; } } @@ -344,7 +553,46 @@ export class IndexEngine { } /** - * Use index to find candidate document IDs + * Analyze filter to extract field operators + */ + private analyzeFilter(filter: Document): Map }> { + const result = new Map }>(); + + for (const [key, value] of Object.entries(filter)) { + if (key.startsWith('$')) continue; + + const info = { equality: false, range: false, in: false, ops: {} as Record }; + + if (typeof value !== 'object' || value === null || value instanceof plugins.bson.ObjectId || value instanceof Date) { + info.equality = true; + info.ops['$eq'] = value; + } else { + const ops = value as Record; + if (ops.$eq !== undefined) { + info.equality = true; + info.ops['$eq'] = ops.$eq; + } + if (ops.$in !== undefined) { + info.in = true; + info.ops['$in'] = ops.$in; + } + if (ops.$gt !== undefined || ops.$gte !== undefined || ops.$lt !== undefined || ops.$lte !== undefined) { + info.range = true; + if (ops.$gt !== undefined) info.ops['$gt'] = ops.$gt; + if (ops.$gte !== undefined) info.ops['$gte'] = ops.$gte; + if (ops.$lt !== undefined) info.ops['$lt'] = ops.$lt; + if (ops.$lte !== undefined) info.ops['$lte'] = ops.$lte; + } + } + + result.set(key, info); + } + + return result; + } + + /** + * Use index to find candidate document IDs (supports range queries with B-tree) */ async findCandidateIds(filter: Document): Promise | null> { await this.initialize(); @@ -352,25 +600,58 @@ export class IndexEngine { const index = this.selectIndex(filter); if (!index) return null; - // Try to use the index for equality matches + const filterInfo = this.analyzeFilter(filter); const indexFields = Object.keys(index.data.key); - const equalityValues: Record = {}; - for (const field of indexFields) { - const filterValue = this.getFilterValue(filter, field); - if (filterValue === undefined) break; + // For single-field indexes with range queries, use B-tree + if (indexFields.length === 1) { + const field = indexFields[0]; + const info = filterInfo.get(field); - // Only use equality matches for index lookup - if (typeof filterValue === 'object' && filterValue !== null) { - if (filterValue.$eq !== undefined) { - equalityValues[field] = filterValue.$eq; - } else if (filterValue.$in !== undefined) { + if (info) { + // Handle equality using hash map (faster) + if (info.equality) { + const keyStr = JSON.stringify(info.ops['$eq']); + return index.data.hashMap.get(keyStr) || new Set(); + } + + // Handle $in using hash map + if (info.in) { + const results = new Set(); + for (const val of info.ops['$in']) { + const keyStr = JSON.stringify(val); + const ids = index.data.hashMap.get(keyStr); + if (ids) { + for (const id of ids) { + results.add(id); + } + } + } + return results; + } + + // Handle range queries using B-tree + if (info.range) { + return this.findRangeCandidates(index.data, info.ops); + } + } + } else { + // For compound indexes, use hash map with partial key matching + const equalityValues: Record = {}; + + for (const field of indexFields) { + const info = filterInfo.get(field); + if (!info) break; + + if (info.equality) { + equalityValues[field] = info.ops['$eq']; + } else if (info.in) { // Handle $in with multiple lookups const results = new Set(); - for (const val of filterValue.$in) { + for (const val of info.ops['$in']) { equalityValues[field] = val; const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key)); - const ids = index.data.entries.get(keyStr); + const ids = index.data.hashMap.get(keyStr); if (ids) { for (const id of ids) { results.add(id); @@ -379,19 +660,57 @@ export class IndexEngine { } return results; } else { - break; // Non-equality operator, stop here + break; // Non-equality/in operator, stop here } - } else { - equalityValues[field] = filterValue; + } + + if (Object.keys(equalityValues).length > 0) { + const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key)); + return index.data.hashMap.get(keyStr) || new Set(); } } - if (Object.keys(equalityValues).length === 0) { - return null; + return null; + } + + /** + * Find candidates using B-tree range scan + */ + private findRangeCandidates(indexData: IIndexData, ops: Record): Set { + const results = new Set(); + + let lowKey: any = undefined; + let highKey: any = undefined; + let lowInclusive = true; + let highInclusive = true; + + if (ops['$gt'] !== undefined) { + lowKey = ops['$gt']; + lowInclusive = false; + } + if (ops['$gte'] !== undefined) { + lowKey = ops['$gte']; + lowInclusive = true; + } + if (ops['$lt'] !== undefined) { + highKey = ops['$lt']; + highInclusive = false; + } + if (ops['$lte'] !== undefined) { + highKey = ops['$lte']; + highInclusive = true; } - const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key)); - return index.data.entries.get(keyStr) || new Set(); + // Use B-tree range iteration + indexData.btree.forRange(lowKey, highKey, lowInclusive, highInclusive, (value, key) => { + if (value) { + for (const id of value) { + results.add(id); + } + } + }); + + return results; } // ============================================================================ diff --git a/ts/tsmdb/engine/QueryPlanner.ts b/ts/tsmdb/engine/QueryPlanner.ts new file mode 100644 index 0000000..300cfbd --- /dev/null +++ b/ts/tsmdb/engine/QueryPlanner.ts @@ -0,0 +1,393 @@ +import * as plugins from '../tsmdb.plugins.js'; +import type { Document, IStoredDocument } from '../types/interfaces.js'; +import { IndexEngine } from './IndexEngine.js'; + +/** + * Query execution plan types + */ +export type TQueryPlanType = 'IXSCAN' | 'COLLSCAN' | 'FETCH' | 'IXSCAN_RANGE'; + +/** + * Represents a query execution plan + */ +export interface IQueryPlan { + /** The type of scan used */ + type: TQueryPlanType; + /** Index name if using an index */ + indexName?: string; + /** Index key specification */ + indexKey?: Record; + /** Whether the query can be fully satisfied by the index */ + indexCovering: boolean; + /** Estimated selectivity (0-1, lower is more selective) */ + selectivity: number; + /** Whether range operators are used */ + usesRange: boolean; + /** Fields used from the index */ + indexFieldsUsed: string[]; + /** Filter conditions that must be applied post-index lookup */ + residualFilter?: Document; + /** Explanation for debugging */ + explanation: string; +} + +/** + * Filter operator analysis + */ +interface IFilterOperatorInfo { + field: string; + operators: string[]; + equality: boolean; + range: boolean; + in: boolean; + exists: boolean; + regex: boolean; + values: Record; +} + +/** + * QueryPlanner - Analyzes queries and selects optimal execution plans + */ +export class QueryPlanner { + private indexEngine: IndexEngine; + + constructor(indexEngine: IndexEngine) { + this.indexEngine = indexEngine; + } + + /** + * Generate an execution plan for a query filter + */ + async plan(filter: Document): Promise { + await this.indexEngine['initialize'](); + + // Empty filter = full collection scan + if (!filter || Object.keys(filter).length === 0) { + return { + type: 'COLLSCAN', + indexCovering: false, + selectivity: 1.0, + usesRange: false, + indexFieldsUsed: [], + explanation: 'No filter specified, full collection scan required', + }; + } + + // Analyze the filter + const operatorInfo = this.analyzeFilter(filter); + + // Get available indexes + const indexes = await this.indexEngine.listIndexes(); + + // Score each index + let bestPlan: IQueryPlan | null = null; + let bestScore = -1; + + for (const index of indexes) { + const plan = this.scoreIndex(index, operatorInfo, filter); + if (plan.selectivity < 1.0) { + const score = this.calculateScore(plan); + if (score > bestScore) { + bestScore = score; + bestPlan = plan; + } + } + } + + // If no suitable index found, fall back to collection scan + if (!bestPlan || bestScore <= 0) { + return { + type: 'COLLSCAN', + indexCovering: false, + selectivity: 1.0, + usesRange: false, + indexFieldsUsed: [], + explanation: 'No suitable index found for this query', + }; + } + + return bestPlan; + } + + /** + * Analyze filter to extract operator information per field + */ + private analyzeFilter(filter: Document, prefix = ''): Map { + const result = new Map(); + + for (const [key, value] of Object.entries(filter)) { + // Skip logical operators at the top level + if (key.startsWith('$')) { + if (key === '$and' && Array.isArray(value)) { + // Merge $and conditions + for (const subFilter of value) { + const subInfo = this.analyzeFilter(subFilter, prefix); + for (const [field, info] of subInfo) { + if (result.has(field)) { + // Merge operators + const existing = result.get(field)!; + existing.operators.push(...info.operators); + existing.equality = existing.equality || info.equality; + existing.range = existing.range || info.range; + existing.in = existing.in || info.in; + Object.assign(existing.values, info.values); + } else { + result.set(field, info); + } + } + } + } + continue; + } + + const fullKey = prefix ? `${prefix}.${key}` : key; + const info: IFilterOperatorInfo = { + field: fullKey, + operators: [], + equality: false, + range: false, + in: false, + exists: false, + regex: false, + values: {}, + }; + + if (typeof value !== 'object' || value === null || value instanceof plugins.bson.ObjectId || value instanceof Date) { + // Direct equality + info.equality = true; + info.operators.push('$eq'); + info.values['$eq'] = value; + } else if (Array.isArray(value)) { + // Array equality (rare, but possible) + info.equality = true; + info.operators.push('$eq'); + info.values['$eq'] = value; + } else { + // Operator object + for (const [op, opValue] of Object.entries(value)) { + if (op.startsWith('$')) { + info.operators.push(op); + info.values[op] = opValue; + + switch (op) { + case '$eq': + info.equality = true; + break; + case '$ne': + case '$not': + // These can use indexes but with low selectivity + break; + case '$in': + info.in = true; + break; + case '$nin': + // Can't efficiently use indexes + break; + case '$gt': + case '$gte': + case '$lt': + case '$lte': + info.range = true; + break; + case '$exists': + info.exists = true; + break; + case '$regex': + info.regex = true; + break; + } + } else { + // Nested object - recurse + const nestedInfo = this.analyzeFilter({ [op]: opValue }, fullKey); + for (const [nestedField, nestedFieldInfo] of nestedInfo) { + result.set(nestedField, nestedFieldInfo); + } + } + } + } + + if (info.operators.length > 0) { + result.set(fullKey, info); + } + } + + return result; + } + + /** + * Score an index for the given filter + */ + private scoreIndex( + index: { name: string; key: Record; unique?: boolean; sparse?: boolean }, + operatorInfo: Map, + filter: Document + ): IQueryPlan { + const indexFields = Object.keys(index.key); + const usedFields: string[] = []; + let usesRange = false; + let canUseIndex = true; + let selectivity = 1.0; + let residualFilter: Document | undefined; + + // Check each index field in order + for (const field of indexFields) { + const info = operatorInfo.get(field); + if (!info) { + // Index field not in filter - stop here + break; + } + + usedFields.push(field); + + // Calculate selectivity based on operator + if (info.equality) { + // Equality has high selectivity + selectivity *= 0.01; // Assume 1% match + } else if (info.in) { + // $in selectivity depends on array size + const inValues = info.values['$in']; + if (Array.isArray(inValues)) { + selectivity *= Math.min(0.5, inValues.length * 0.01); + } else { + selectivity *= 0.1; + } + } else if (info.range) { + // Range queries have moderate selectivity + selectivity *= 0.25; + usesRange = true; + // After range, can't use more index fields efficiently + break; + } else if (info.exists) { + // $exists can use sparse indexes + selectivity *= 0.5; + } else { + // Other operators may not be indexable + canUseIndex = false; + break; + } + } + + if (!canUseIndex || usedFields.length === 0) { + return { + type: 'COLLSCAN', + indexCovering: false, + selectivity: 1.0, + usesRange: false, + indexFieldsUsed: [], + explanation: `Index ${index.name} cannot be used for this query`, + }; + } + + // Build residual filter for conditions not covered by index + const coveredFields = new Set(usedFields); + const residualConditions: Record = {}; + for (const [field, info] of operatorInfo) { + if (!coveredFields.has(field)) { + // This field isn't covered by the index + if (info.equality) { + residualConditions[field] = info.values['$eq']; + } else { + residualConditions[field] = info.values; + } + } + } + + if (Object.keys(residualConditions).length > 0) { + residualFilter = residualConditions; + } + + // Unique indexes have better selectivity for equality + if (index.unique && usedFields.length === indexFields.length) { + selectivity = Math.min(selectivity, 0.001); // At most 1 document + } + + return { + type: usesRange ? 'IXSCAN_RANGE' : 'IXSCAN', + indexName: index.name, + indexKey: index.key, + indexCovering: Object.keys(residualConditions).length === 0, + selectivity, + usesRange, + indexFieldsUsed: usedFields, + residualFilter, + explanation: `Using index ${index.name} on fields [${usedFields.join(', ')}]`, + }; + } + + /** + * Calculate overall score for a plan (higher is better) + */ + private calculateScore(plan: IQueryPlan): number { + let score = 0; + + // Lower selectivity is better (fewer documents to fetch) + score += (1 - plan.selectivity) * 100; + + // Index covering queries are best + if (plan.indexCovering) { + score += 50; + } + + // More index fields used is better + score += plan.indexFieldsUsed.length * 10; + + // Equality scans are better than range scans + if (!plan.usesRange) { + score += 20; + } + + return score; + } + + /** + * Explain a query - returns detailed plan information + */ + async explain(filter: Document): Promise<{ + queryPlanner: { + plannerVersion: number; + namespace: string; + indexFilterSet: boolean; + winningPlan: IQueryPlan; + rejectedPlans: IQueryPlan[]; + }; + }> { + await this.indexEngine['initialize'](); + + // Analyze the filter + const operatorInfo = this.analyzeFilter(filter); + + // Get available indexes + const indexes = await this.indexEngine.listIndexes(); + + // Score all indexes + const plans: IQueryPlan[] = []; + + for (const index of indexes) { + const plan = this.scoreIndex(index, operatorInfo, filter); + plans.push(plan); + } + + // Add collection scan as fallback + plans.push({ + type: 'COLLSCAN', + indexCovering: false, + selectivity: 1.0, + usesRange: false, + indexFieldsUsed: [], + explanation: 'Full collection scan', + }); + + // Sort by score (best first) + plans.sort((a, b) => this.calculateScore(b) - this.calculateScore(a)); + + return { + queryPlanner: { + plannerVersion: 1, + namespace: `${this.indexEngine['dbName']}.${this.indexEngine['collName']}`, + indexFilterSet: false, + winningPlan: plans[0], + rejectedPlans: plans.slice(1), + }, + }; + } +} diff --git a/ts/tsmdb/engine/SessionEngine.ts b/ts/tsmdb/engine/SessionEngine.ts new file mode 100644 index 0000000..3216561 --- /dev/null +++ b/ts/tsmdb/engine/SessionEngine.ts @@ -0,0 +1,292 @@ +import * as plugins from '../tsmdb.plugins.js'; +import type { TransactionEngine } from './TransactionEngine.js'; + +/** + * Session state + */ +export interface ISession { + /** Session ID (UUID) */ + id: string; + /** Timestamp when the session was created */ + createdAt: number; + /** Timestamp of the last activity */ + lastActivityAt: number; + /** Current transaction ID if any */ + txnId?: string; + /** Transaction number for ordering */ + txnNumber?: number; + /** Whether the session is in a transaction */ + inTransaction: boolean; + /** Session metadata */ + metadata?: Record; +} + +/** + * Session engine options + */ +export interface ISessionEngineOptions { + /** Session timeout in milliseconds (default: 30 minutes) */ + sessionTimeoutMs?: number; + /** Interval to check for expired sessions in ms (default: 60 seconds) */ + cleanupIntervalMs?: number; +} + +/** + * Session engine for managing client sessions + * - Tracks session lifecycle (create, touch, end) + * - Links sessions to transactions + * - Auto-aborts transactions on session expiry + */ +export class SessionEngine { + private sessions: Map = new Map(); + private sessionTimeoutMs: number; + private cleanupInterval?: ReturnType; + private transactionEngine?: TransactionEngine; + + constructor(options?: ISessionEngineOptions) { + this.sessionTimeoutMs = options?.sessionTimeoutMs ?? 30 * 60 * 1000; // 30 minutes default + const cleanupIntervalMs = options?.cleanupIntervalMs ?? 60 * 1000; // 1 minute default + + // Start cleanup interval + this.cleanupInterval = setInterval(() => { + this.cleanupExpiredSessions(); + }, cleanupIntervalMs); + } + + /** + * Set the transaction engine to use for auto-abort + */ + setTransactionEngine(engine: TransactionEngine): void { + this.transactionEngine = engine; + } + + /** + * Start a new session + */ + startSession(sessionId?: string, metadata?: Record): ISession { + const id = sessionId ?? new plugins.bson.UUID().toHexString(); + const now = Date.now(); + + const session: ISession = { + id, + createdAt: now, + lastActivityAt: now, + inTransaction: false, + metadata, + }; + + this.sessions.set(id, session); + return session; + } + + /** + * Get a session by ID + */ + getSession(sessionId: string): ISession | undefined { + const session = this.sessions.get(sessionId); + if (session && this.isSessionExpired(session)) { + // Session expired, clean it up + this.endSession(sessionId); + return undefined; + } + return session; + } + + /** + * Touch a session to update last activity time + */ + touchSession(sessionId: string): boolean { + const session = this.sessions.get(sessionId); + if (!session) return false; + + if (this.isSessionExpired(session)) { + this.endSession(sessionId); + return false; + } + + session.lastActivityAt = Date.now(); + return true; + } + + /** + * End a session explicitly + * This will also abort any active transaction + */ + async endSession(sessionId: string): Promise { + const session = this.sessions.get(sessionId); + if (!session) return false; + + // If session has an active transaction, abort it + if (session.inTransaction && session.txnId && this.transactionEngine) { + try { + await this.transactionEngine.abortTransaction(session.txnId); + } catch (e) { + // Ignore abort errors during cleanup + } + } + + this.sessions.delete(sessionId); + return true; + } + + /** + * Start a transaction in a session + */ + startTransaction(sessionId: string, txnId: string, txnNumber?: number): boolean { + const session = this.sessions.get(sessionId); + if (!session) return false; + + if (this.isSessionExpired(session)) { + this.endSession(sessionId); + return false; + } + + session.txnId = txnId; + session.txnNumber = txnNumber; + session.inTransaction = true; + session.lastActivityAt = Date.now(); + + return true; + } + + /** + * End a transaction in a session (commit or abort) + */ + endTransaction(sessionId: string): boolean { + const session = this.sessions.get(sessionId); + if (!session) return false; + + session.txnId = undefined; + session.txnNumber = undefined; + session.inTransaction = false; + session.lastActivityAt = Date.now(); + + return true; + } + + /** + * Get transaction ID for a session + */ + getTransactionId(sessionId: string): string | undefined { + const session = this.sessions.get(sessionId); + return session?.txnId; + } + + /** + * Check if session is in a transaction + */ + isInTransaction(sessionId: string): boolean { + const session = this.sessions.get(sessionId); + return session?.inTransaction ?? false; + } + + /** + * Check if a session is expired + */ + isSessionExpired(session: ISession): boolean { + return Date.now() - session.lastActivityAt > this.sessionTimeoutMs; + } + + /** + * Cleanup expired sessions + * This is called periodically by the cleanup interval + */ + private async cleanupExpiredSessions(): Promise { + const expiredSessions: string[] = []; + + for (const [id, session] of this.sessions) { + if (this.isSessionExpired(session)) { + expiredSessions.push(id); + } + } + + // End all expired sessions (this will also abort their transactions) + for (const sessionId of expiredSessions) { + await this.endSession(sessionId); + } + } + + /** + * Get all active sessions + */ + listSessions(): ISession[] { + const activeSessions: ISession[] = []; + for (const session of this.sessions.values()) { + if (!this.isSessionExpired(session)) { + activeSessions.push(session); + } + } + return activeSessions; + } + + /** + * Get session count + */ + getSessionCount(): number { + return this.sessions.size; + } + + /** + * Get sessions with active transactions + */ + getSessionsWithTransactions(): ISession[] { + return this.listSessions().filter(s => s.inTransaction); + } + + /** + * Refresh session timeout + */ + refreshSession(sessionId: string): boolean { + return this.touchSession(sessionId); + } + + /** + * Close the session engine and cleanup + */ + close(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = undefined; + } + + // Clear all sessions + this.sessions.clear(); + } + + /** + * Get or create a session for a given session ID + * Useful for handling MongoDB driver session requests + */ + getOrCreateSession(sessionId: string): ISession { + let session = this.getSession(sessionId); + if (!session) { + session = this.startSession(sessionId); + } else { + this.touchSession(sessionId); + } + return session; + } + + /** + * Extract session ID from MongoDB lsid (logical session ID) + */ + static extractSessionId(lsid: any): string | undefined { + if (!lsid) return undefined; + + // MongoDB session ID format: { id: UUID } + if (lsid.id) { + if (lsid.id instanceof plugins.bson.UUID) { + return lsid.id.toHexString(); + } + if (typeof lsid.id === 'string') { + return lsid.id; + } + if (lsid.id.$binary?.base64) { + // Binary UUID format + return Buffer.from(lsid.id.$binary.base64, 'base64').toString('hex'); + } + } + + return undefined; + } +} diff --git a/ts/tsmdb/index.ts b/ts/tsmdb/index.ts index 78bf1db..4b37944 100644 --- a/ts/tsmdb/index.ts +++ b/ts/tsmdb/index.ts @@ -19,6 +19,8 @@ export type { IStorageAdapter } from './storage/IStorageAdapter.js'; export { MemoryStorageAdapter } from './storage/MemoryStorageAdapter.js'; export { FileStorageAdapter } from './storage/FileStorageAdapter.js'; export { OpLog } from './storage/OpLog.js'; +export { WAL } from './storage/WAL.js'; +export type { IWalEntry, TWalOperation } from './storage/WAL.js'; // Export engines export { QueryEngine } from './engine/QueryEngine.js'; @@ -26,6 +28,10 @@ export { UpdateEngine } from './engine/UpdateEngine.js'; export { AggregationEngine } from './engine/AggregationEngine.js'; export { IndexEngine } from './engine/IndexEngine.js'; export { TransactionEngine } from './engine/TransactionEngine.js'; +export { QueryPlanner } from './engine/QueryPlanner.js'; +export type { IQueryPlan, TQueryPlanType } from './engine/QueryPlanner.js'; +export { SessionEngine } from './engine/SessionEngine.js'; +export type { ISession, ISessionEngineOptions } from './engine/SessionEngine.js'; // Export server (the main entry point for using TsmDB) export { TsmdbServer } from './server/TsmdbServer.js'; @@ -35,3 +41,6 @@ export type { ITsmdbServerOptions } from './server/TsmdbServer.js'; export { WireProtocol } from './server/WireProtocol.js'; export { CommandRouter } from './server/CommandRouter.js'; export type { ICommandHandler, IHandlerContext, ICursorState } from './server/CommandRouter.js'; + +// Export utilities +export * from './utils/checksum.js'; diff --git a/ts/tsmdb/server/CommandRouter.ts b/ts/tsmdb/server/CommandRouter.ts index 222d3c6..852386c 100644 --- a/ts/tsmdb/server/CommandRouter.ts +++ b/ts/tsmdb/server/CommandRouter.ts @@ -2,6 +2,9 @@ import * as plugins from '../tsmdb.plugins.js'; import type { IStorageAdapter } from '../storage/IStorageAdapter.js'; import type { IParsedCommand } from './WireProtocol.js'; import type { TsmdbServer } from './TsmdbServer.js'; +import { IndexEngine } from '../engine/IndexEngine.js'; +import { TransactionEngine } from '../engine/TransactionEngine.js'; +import { SessionEngine } from '../engine/SessionEngine.js'; // Import handlers import { HelloHandler } from './handlers/HelloHandler.js'; @@ -22,6 +25,16 @@ export interface IHandlerContext { database: string; command: plugins.bson.Document; documentSequences?: Map; + /** Get or create an IndexEngine for a collection */ + getIndexEngine: (collName: string) => IndexEngine; + /** Transaction engine instance */ + transactionEngine: TransactionEngine; + /** Current transaction ID (if in a transaction) */ + txnId?: string; + /** Session ID (from lsid) */ + sessionId?: string; + /** Session engine instance */ + sessionEngine: SessionEngine; } /** @@ -43,12 +56,54 @@ export class CommandRouter { private cursors: Map = new Map(); private cursorIdCounter: bigint = BigInt(1); + // Index engine cache: db.collection -> IndexEngine + private indexEngines: Map = new Map(); + + // Transaction engine (shared across all handlers) + private transactionEngine: TransactionEngine; + + // Session engine (shared across all handlers) + private sessionEngine: SessionEngine; + constructor(storage: IStorageAdapter, server: TsmdbServer) { this.storage = storage; this.server = server; + this.transactionEngine = new TransactionEngine(storage); + this.sessionEngine = new SessionEngine(); + // Link session engine to transaction engine for auto-abort on session expiry + this.sessionEngine.setTransactionEngine(this.transactionEngine); this.registerHandlers(); } + /** + * Get or create an IndexEngine for a database.collection + */ + getIndexEngine(dbName: string, collName: string): IndexEngine { + const key = `${dbName}.${collName}`; + let engine = this.indexEngines.get(key); + if (!engine) { + engine = new IndexEngine(dbName, collName, this.storage); + this.indexEngines.set(key, engine); + } + return engine; + } + + /** + * Clear index engine cache for a collection (used when collection is dropped) + */ + clearIndexEngineCache(dbName: string, collName?: string): void { + if (collName) { + this.indexEngines.delete(`${dbName}.${collName}`); + } else { + // Clear all engines for the database + for (const key of this.indexEngines.keys()) { + if (key.startsWith(`${dbName}.`)) { + this.indexEngines.delete(key); + } + } + } + } + /** * Register all command handlers */ @@ -120,6 +175,29 @@ export class CommandRouter { async route(parsedCommand: IParsedCommand): Promise { const { commandName, command, database, documentSequences } = parsedCommand; + // Extract session ID from lsid using SessionEngine helper + let sessionId = SessionEngine.extractSessionId(command.lsid); + let txnId: string | undefined; + + // If we have a session ID, register/touch the session + if (sessionId) { + this.sessionEngine.getOrCreateSession(sessionId); + } + + // Check if this starts a new transaction + if (command.startTransaction && sessionId) { + txnId = this.transactionEngine.startTransaction(sessionId); + this.sessionEngine.startTransaction(sessionId, txnId, command.txnNumber); + } else if (sessionId && this.sessionEngine.isInTransaction(sessionId)) { + // Continue existing transaction + txnId = this.sessionEngine.getTransactionId(sessionId); + // Verify transaction is still active + if (txnId && !this.transactionEngine.isActive(txnId)) { + this.sessionEngine.endTransaction(sessionId); + txnId = undefined; + } + } + // Create handler context const context: IHandlerContext = { storage: this.storage, @@ -127,6 +205,11 @@ export class CommandRouter { database, command, documentSequences, + getIndexEngine: (collName: string) => this.getIndexEngine(database, collName), + transactionEngine: this.transactionEngine, + sessionEngine: this.sessionEngine, + txnId, + sessionId, }; // Find handler @@ -164,6 +247,32 @@ export class CommandRouter { }; } } + + /** + * Close the command router and cleanup resources + */ + close(): void { + // Close session engine (stops cleanup interval, clears sessions) + this.sessionEngine.close(); + // Clear cursors + this.cursors.clear(); + // Clear index engine cache + this.indexEngines.clear(); + } + + /** + * Get session engine (for administrative purposes) + */ + getSessionEngine(): SessionEngine { + return this.sessionEngine; + } + + /** + * Get transaction engine (for administrative purposes) + */ + getTransactionEngine(): TransactionEngine { + return this.transactionEngine; + } } /** diff --git a/ts/tsmdb/server/TsmdbServer.ts b/ts/tsmdb/server/TsmdbServer.ts index f8fda01..d1ca9e0 100644 --- a/ts/tsmdb/server/TsmdbServer.ts +++ b/ts/tsmdb/server/TsmdbServer.ts @@ -154,6 +154,9 @@ export class TsmdbServer { } this.connections.clear(); + // Close command router (cleans up session engine, cursors, etc.) + this.commandRouter.close(); + // Close storage await this.storage.close(); diff --git a/ts/tsmdb/server/handlers/AdminHandler.ts b/ts/tsmdb/server/handlers/AdminHandler.ts index b7f1bc0..629ce6f 100644 --- a/ts/tsmdb/server/handlers/AdminHandler.ts +++ b/ts/tsmdb/server/handlers/AdminHandler.ts @@ -1,5 +1,6 @@ import * as plugins from '../../tsmdb.plugins.js'; import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js'; +import { SessionEngine } from '../../engine/SessionEngine.js'; /** * AdminHandler - Handles administrative commands @@ -237,10 +238,12 @@ export class AdminHandler implements ICommandHandler { * Handle serverStatus command */ private async handleServerStatus(context: IHandlerContext): Promise { - const { server } = context; + const { server, sessionEngine } = context; const uptime = server.getUptime(); const connections = server.getConnectionCount(); + const sessions = sessionEngine.listSessions(); + const sessionsWithTxn = sessionEngine.getSessionsWithTransactions(); return { ok: 1, @@ -263,6 +266,26 @@ export class AdminHandler implements ICommandHandler { totalCreated: connections, active: connections, }, + logicalSessionRecordCache: { + activeSessionsCount: sessions.length, + sessionsCollectionJobCount: 0, + lastSessionsCollectionJobDurationMillis: 0, + lastSessionsCollectionJobTimestamp: new Date(), + transactionReaperJobCount: 0, + lastTransactionReaperJobDurationMillis: 0, + lastTransactionReaperJobTimestamp: new Date(), + }, + transactions: { + retriedCommandsCount: 0, + retriedStatementsCount: 0, + transactionsCollectionWriteCount: 0, + currentActive: sessionsWithTxn.length, + currentInactive: 0, + currentOpen: sessionsWithTxn.length, + totalStarted: sessionsWithTxn.length, + totalCommitted: 0, + totalAborted: 0, + }, network: { bytesIn: 0, bytesOut: 0, @@ -409,6 +432,17 @@ export class AdminHandler implements ICommandHandler { * Handle endSessions command */ private async handleEndSessions(context: IHandlerContext): Promise { + const { command, sessionEngine } = context; + + // End each session in the array + const sessions = command.endSessions || []; + for (const sessionSpec of sessions) { + const sessionId = SessionEngine.extractSessionId(sessionSpec); + if (sessionId) { + await sessionEngine.endSession(sessionId); + } + } + return { ok: 1 }; } @@ -416,16 +450,87 @@ export class AdminHandler implements ICommandHandler { * Handle abortTransaction command */ private async handleAbortTransaction(context: IHandlerContext): Promise { - // Transactions are not fully supported, but acknowledge the command - return { ok: 1 }; + const { transactionEngine, sessionEngine, txnId, sessionId } = context; + + if (!txnId) { + return { + ok: 0, + errmsg: 'No transaction started', + code: 251, + codeName: 'NoSuchTransaction', + }; + } + + try { + await transactionEngine.abortTransaction(txnId); + transactionEngine.endTransaction(txnId); + // Update session state + if (sessionId) { + sessionEngine.endTransaction(sessionId); + } + return { ok: 1 }; + } catch (error: any) { + return { + ok: 0, + errmsg: error.message || 'Abort transaction failed', + code: error.code || 1, + codeName: error.codeName || 'UnknownError', + }; + } } /** * Handle commitTransaction command */ private async handleCommitTransaction(context: IHandlerContext): Promise { - // Transactions are not fully supported, but acknowledge the command - return { ok: 1 }; + const { transactionEngine, sessionEngine, txnId, sessionId } = context; + + if (!txnId) { + return { + ok: 0, + errmsg: 'No transaction started', + code: 251, + codeName: 'NoSuchTransaction', + }; + } + + try { + await transactionEngine.commitTransaction(txnId); + transactionEngine.endTransaction(txnId); + // Update session state + if (sessionId) { + sessionEngine.endTransaction(sessionId); + } + return { ok: 1 }; + } catch (error: any) { + // If commit fails, transaction should be aborted + try { + await transactionEngine.abortTransaction(txnId); + transactionEngine.endTransaction(txnId); + if (sessionId) { + sessionEngine.endTransaction(sessionId); + } + } catch { + // Ignore abort errors + } + + if (error.code === 112) { + // Write conflict + return { + ok: 0, + errmsg: error.message || 'Write conflict during commit', + code: 112, + codeName: 'WriteConflict', + }; + } + + return { + ok: 0, + errmsg: error.message || 'Commit transaction failed', + code: error.code || 1, + codeName: error.codeName || 'UnknownError', + }; + } } /** diff --git a/ts/tsmdb/server/handlers/DeleteHandler.ts b/ts/tsmdb/server/handlers/DeleteHandler.ts index 4f21ea6..f2a7f86 100644 --- a/ts/tsmdb/server/handlers/DeleteHandler.ts +++ b/ts/tsmdb/server/handlers/DeleteHandler.ts @@ -1,5 +1,6 @@ import * as plugins from '../../tsmdb.plugins.js'; import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js'; +import type { IStoredDocument } from '../../types/interfaces.js'; import { QueryEngine } from '../../engine/QueryEngine.js'; /** @@ -47,6 +48,8 @@ export class DeleteHandler implements ICommandHandler { return { ok: 1, n: 0 }; } + const indexEngine = context.getIndexEngine(collection); + for (let i = 0; i < deletes.length; i++) { const deleteSpec = deletes[i]; const filter = deleteSpec.q || deleteSpec.filter || {}; @@ -56,8 +59,15 @@ export class DeleteHandler implements ICommandHandler { const deleteAll = limit === 0; try { - // Get all documents - const documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const candidateIds = await indexEngine.findCandidateIds(filter); + + let documents: IStoredDocument[]; + if (candidateIds !== null) { + documents = await storage.findByIds(database, collection, candidateIds); + } else { + documents = await storage.findAll(database, collection); + } // Apply filter const matchingDocs = QueryEngine.filter(documents, filter); @@ -69,6 +79,11 @@ export class DeleteHandler implements ICommandHandler { // Determine which documents to delete const docsToDelete = deleteAll ? matchingDocs : matchingDocs.slice(0, 1); + // Update indexes for deleted documents + for (const doc of docsToDelete) { + await indexEngine.onDelete(doc as any); + } + // Delete the documents const idsToDelete = docsToDelete.map(doc => doc._id); const deleted = await storage.deleteByIds(database, collection, idsToDelete); diff --git a/ts/tsmdb/server/handlers/FindHandler.ts b/ts/tsmdb/server/handlers/FindHandler.ts index a6655e2..6a3404f 100644 --- a/ts/tsmdb/server/handlers/FindHandler.ts +++ b/ts/tsmdb/server/handlers/FindHandler.ts @@ -1,5 +1,6 @@ import * as plugins from '../../tsmdb.plugins.js'; import type { ICommandHandler, IHandlerContext, ICursorState } from '../CommandRouter.js'; +import type { IStoredDocument } from '../../types/interfaces.js'; import { QueryEngine } from '../../engine/QueryEngine.js'; /** @@ -45,7 +46,7 @@ export class FindHandler implements ICommandHandler { * Handle find command */ private async handleFind(context: IHandlerContext): Promise { - const { storage, database, command } = context; + const { storage, database, command, getIndexEngine } = context; const collection = command.find; const filter = command.filter || {}; @@ -70,11 +71,22 @@ export class FindHandler implements ICommandHandler { }; } - // Get all documents - let documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const indexEngine = getIndexEngine(collection); + const candidateIds = await indexEngine.findCandidateIds(filter); - // Apply filter - documents = QueryEngine.filter(documents, filter); + let documents: IStoredDocument[]; + if (candidateIds !== null) { + // Index hit - fetch only candidate documents + documents = await storage.findByIds(database, collection, candidateIds); + // Still apply filter for any conditions the index couldn't fully satisfy + documents = QueryEngine.filter(documents, filter); + } else { + // No suitable index - full collection scan + documents = await storage.findAll(database, collection); + // Apply filter + documents = QueryEngine.filter(documents, filter); + } // Apply sort if (sort) { @@ -233,7 +245,7 @@ export class FindHandler implements ICommandHandler { * Handle count command */ private async handleCount(context: IHandlerContext): Promise { - const { storage, database, command } = context; + const { storage, database, command, getIndexEngine } = context; const collection = command.count; const query = command.query || {}; @@ -246,11 +258,20 @@ export class FindHandler implements ICommandHandler { return { ok: 1, n: 0 }; } - // Get all documents - let documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const indexEngine = getIndexEngine(collection); + const candidateIds = await indexEngine.findCandidateIds(query); - // Apply filter - documents = QueryEngine.filter(documents, query); + let documents: IStoredDocument[]; + if (candidateIds !== null) { + // Index hit - fetch only candidate documents + documents = await storage.findByIds(database, collection, candidateIds); + documents = QueryEngine.filter(documents, query); + } else { + // No suitable index - full collection scan + documents = await storage.findAll(database, collection); + documents = QueryEngine.filter(documents, query); + } // Apply skip if (skip > 0) { @@ -269,7 +290,7 @@ export class FindHandler implements ICommandHandler { * Handle distinct command */ private async handleDistinct(context: IHandlerContext): Promise { - const { storage, database, command } = context; + const { storage, database, command, getIndexEngine } = context; const collection = command.distinct; const key = command.key; @@ -290,8 +311,16 @@ export class FindHandler implements ICommandHandler { return { ok: 1, values: [] }; } - // Get all documents - const documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const indexEngine = getIndexEngine(collection); + const candidateIds = await indexEngine.findCandidateIds(query); + + let documents: IStoredDocument[]; + if (candidateIds !== null) { + documents = await storage.findByIds(database, collection, candidateIds); + } else { + documents = await storage.findAll(database, collection); + } // Get distinct values const values = QueryEngine.distinct(documents, key, query); diff --git a/ts/tsmdb/server/handlers/InsertHandler.ts b/ts/tsmdb/server/handlers/InsertHandler.ts index f2275c1..b736439 100644 --- a/ts/tsmdb/server/handlers/InsertHandler.ts +++ b/ts/tsmdb/server/handlers/InsertHandler.ts @@ -1,5 +1,6 @@ import * as plugins from '../../tsmdb.plugins.js'; import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js'; +import type { IStoredDocument } from '../../types/interfaces.js'; /** * InsertHandler - Handles insert commands @@ -42,6 +43,8 @@ export class InsertHandler implements ICommandHandler { // Ensure collection exists await storage.createCollection(database, collection); + const indexEngine = context.getIndexEngine(collection); + // Insert documents for (let i = 0; i < documents.length; i++) { const doc = documents[i]; @@ -52,6 +55,9 @@ export class InsertHandler implements ICommandHandler { doc._id = new plugins.bson.ObjectId(); } + // Check index constraints before insert (doc now has _id) + await indexEngine.onInsert(doc as IStoredDocument); + await storage.insertOne(database, collection, doc); insertedCount++; } catch (error: any) { diff --git a/ts/tsmdb/server/handlers/UpdateHandler.ts b/ts/tsmdb/server/handlers/UpdateHandler.ts index 846b447..4771157 100644 --- a/ts/tsmdb/server/handlers/UpdateHandler.ts +++ b/ts/tsmdb/server/handlers/UpdateHandler.ts @@ -1,5 +1,6 @@ import * as plugins from '../../tsmdb.plugins.js'; import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js'; +import type { IStoredDocument } from '../../types/interfaces.js'; import { QueryEngine } from '../../engine/QueryEngine.js'; import { UpdateEngine } from '../../engine/UpdateEngine.js'; @@ -69,6 +70,8 @@ export class UpdateHandler implements ICommandHandler { // Ensure collection exists await storage.createCollection(database, collection); + const indexEngine = context.getIndexEngine(collection); + for (let i = 0; i < updates.length; i++) { const updateSpec = updates[i]; const filter = updateSpec.q || updateSpec.filter || {}; @@ -78,8 +81,15 @@ export class UpdateHandler implements ICommandHandler { const arrayFilters = updateSpec.arrayFilters; try { - // Get all documents - let documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const candidateIds = await indexEngine.findCandidateIds(filter); + + let documents: IStoredDocument[]; + if (candidateIds !== null) { + documents = await storage.findByIds(database, collection, candidateIds); + } else { + documents = await storage.findAll(database, collection); + } // Apply filter let matchingDocs = QueryEngine.filter(documents, filter); @@ -99,6 +109,8 @@ export class UpdateHandler implements ICommandHandler { Object.assign(updatedDoc, update.$setOnInsert); } + // Update index for the new document + await indexEngine.onInsert(updatedDoc); await storage.insertOne(database, collection, updatedDoc); totalUpserted++; upserted.push({ index: i, _id: updatedDoc._id }); @@ -113,6 +125,8 @@ export class UpdateHandler implements ICommandHandler { // Check if document actually changed const changed = JSON.stringify(doc) !== JSON.stringify(updatedDoc); if (changed) { + // Update index + await indexEngine.onUpdate(doc as any, updatedDoc); await storage.updateById(database, collection, doc._id, updatedDoc); totalModified++; } @@ -186,8 +200,17 @@ export class UpdateHandler implements ICommandHandler { // Ensure collection exists await storage.createCollection(database, collection); - // Get matching documents - let documents = await storage.findAll(database, collection); + // Try to use index-accelerated query + const indexEngine = context.getIndexEngine(collection); + const candidateIds = await indexEngine.findCandidateIds(query); + + let documents: IStoredDocument[]; + if (candidateIds !== null) { + documents = await storage.findByIds(database, collection, candidateIds); + } else { + documents = await storage.findAll(database, collection); + } + let matchingDocs = QueryEngine.filter(documents, query); // Apply sort if specified @@ -203,6 +226,8 @@ export class UpdateHandler implements ICommandHandler { return { ok: 1, value: null }; } + // Update index for delete + await indexEngine.onDelete(doc as any); await storage.deleteById(database, collection, doc._id); let result = doc; @@ -231,6 +256,8 @@ export class UpdateHandler implements ICommandHandler { // Update existing originalDoc = { ...doc }; resultDoc = UpdateEngine.applyUpdate(doc, update, arrayFilters); + // Update index + await indexEngine.onUpdate(doc as any, resultDoc as any); await storage.updateById(database, collection, doc._id, resultDoc as any); } else { // Upsert @@ -243,6 +270,8 @@ export class UpdateHandler implements ICommandHandler { Object.assign(resultDoc, update.$setOnInsert); } + // Update index for insert + await indexEngine.onInsert(resultDoc as any); await storage.insertOne(database, collection, resultDoc); } diff --git a/ts/tsmdb/storage/FileStorageAdapter.ts b/ts/tsmdb/storage/FileStorageAdapter.ts index 61aa258..bc94d64 100644 --- a/ts/tsmdb/storage/FileStorageAdapter.ts +++ b/ts/tsmdb/storage/FileStorageAdapter.ts @@ -1,6 +1,17 @@ import * as plugins from '../tsmdb.plugins.js'; import type { IStorageAdapter } from './IStorageAdapter.js'; import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js'; +import { calculateDocumentChecksum, verifyChecksum } from '../utils/checksum.js'; + +/** + * File storage adapter options + */ +export interface IFileStorageAdapterOptions { + /** Enable checksum verification for data integrity */ + enableChecksums?: boolean; + /** Throw error on checksum mismatch (default: false, just log warning) */ + strictChecksums?: boolean; +} /** * File-based storage adapter for TsmDB @@ -11,9 +22,13 @@ export class FileStorageAdapter implements IStorageAdapter { private opLogCounter = 0; private initialized = false; private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode()); + private enableChecksums: boolean; + private strictChecksums: boolean; - constructor(basePath: string) { + constructor(basePath: string, options?: IFileStorageAdapterOptions) { this.basePath = basePath; + this.enableChecksums = options?.enableChecksums ?? false; + this.strictChecksums = options?.strictChecksums ?? false; } // ============================================================================ @@ -68,6 +83,45 @@ export class FileStorageAdapter implements IStorageAdapter { return doc; } + /** + * Verify document checksum and handle errors + */ + private verifyDocumentChecksum(doc: any): boolean { + if (!this.enableChecksums || !doc._checksum) { + return true; + } + + const isValid = verifyChecksum(doc); + if (!isValid) { + const errorMsg = `Checksum mismatch for document ${doc._id}`; + if (this.strictChecksums) { + throw new Error(errorMsg); + } else { + console.warn(`WARNING: ${errorMsg}`); + } + } + return isValid; + } + + /** + * Add checksum to document before storing + */ + private prepareDocumentForStorage(doc: any): any { + if (!this.enableChecksums) { + return doc; + } + const checksum = calculateDocumentChecksum(doc); + return { ...doc, _checksum: checksum }; + } + + /** + * Remove internal checksum field before returning to user + */ + private cleanDocumentForReturn(doc: any): IStoredDocument { + const { _checksum, ...cleanDoc } = doc; + return this.restoreObjectIds(cleanDoc); + } + // ============================================================================ // Initialization // ============================================================================ @@ -233,7 +287,9 @@ export class FileStorageAdapter implements IStorageAdapter { throw new Error(`Duplicate key error: _id ${idStr}`); } - docs.push(storedDoc); + // Add checksum if enabled + const docToStore = this.prepareDocumentForStorage(storedDoc); + docs.push(docToStore); await this.writeJsonFile(collPath, docs); return storedDoc; } @@ -258,7 +314,9 @@ export class FileStorageAdapter implements IStorageAdapter { } existingIds.add(idStr); - docs.push(storedDoc); + // Add checksum if enabled + const docToStore = this.prepareDocumentForStorage(storedDoc); + docs.push(docToStore); results.push(storedDoc); } @@ -270,10 +328,33 @@ export class FileStorageAdapter implements IStorageAdapter { await this.createCollection(dbName, collName); const collPath = this.getCollectionPath(dbName, collName); const docs = await this.readJsonFile(collPath, []); - return docs.map(doc => this.restoreObjectIds(doc)); + return docs.map(doc => { + // Verify checksum if enabled + this.verifyDocumentChecksum(doc); + // Clean and return document without internal checksum field + return this.cleanDocumentForReturn(doc); + }); + } + + async findByIds(dbName: string, collName: string, ids: Set): Promise { + await this.createCollection(dbName, collName); + const collPath = this.getCollectionPath(dbName, collName); + const docs = await this.readJsonFile(collPath, []); + const results: IStoredDocument[] = []; + for (const doc of docs) { + // Verify checksum if enabled + this.verifyDocumentChecksum(doc); + // Clean and restore document + const cleaned = this.cleanDocumentForReturn(doc); + if (ids.has(cleaned._id.toHexString())) { + results.push(cleaned); + } + } + return results; } async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise { + // Use findAll which already handles checksum verification const docs = await this.findAll(dbName, collName); const idStr = id.toHexString(); return docs.find(d => d._id.toHexString() === idStr) || null; @@ -291,7 +372,9 @@ export class FileStorageAdapter implements IStorageAdapter { if (idx === -1) return false; - docs[idx] = doc; + // Add checksum if enabled + const docToStore = this.prepareDocumentForStorage(doc); + docs[idx] = docToStore; await this.writeJsonFile(collPath, docs); return true; } diff --git a/ts/tsmdb/storage/IStorageAdapter.ts b/ts/tsmdb/storage/IStorageAdapter.ts index 280cdcc..71ada24 100644 --- a/ts/tsmdb/storage/IStorageAdapter.ts +++ b/ts/tsmdb/storage/IStorageAdapter.ts @@ -90,6 +90,12 @@ export interface IStorageAdapter { */ findAll(dbName: string, collName: string): Promise; + /** + * Find documents by a set of _id strings (hex format) + * Used for index-accelerated queries + */ + findByIds(dbName: string, collName: string, ids: Set): Promise; + /** * Find a document by _id */ diff --git a/ts/tsmdb/storage/MemoryStorageAdapter.ts b/ts/tsmdb/storage/MemoryStorageAdapter.ts index fe67e4e..9199940 100644 --- a/ts/tsmdb/storage/MemoryStorageAdapter.ts +++ b/ts/tsmdb/storage/MemoryStorageAdapter.ts @@ -196,6 +196,18 @@ export class MemoryStorageAdapter implements IStorageAdapter { return Array.from(collection.values()); } + async findByIds(dbName: string, collName: string, ids: Set): Promise { + const collection = this.ensureCollection(dbName, collName); + const results: IStoredDocument[] = []; + for (const id of ids) { + const doc = collection.get(id); + if (doc) { + results.push(doc); + } + } + return results; + } + async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise { const collection = this.ensureCollection(dbName, collName); return collection.get(id.toHexString()) || null; diff --git a/ts/tsmdb/storage/WAL.ts b/ts/tsmdb/storage/WAL.ts new file mode 100644 index 0000000..36f0542 --- /dev/null +++ b/ts/tsmdb/storage/WAL.ts @@ -0,0 +1,375 @@ +import * as plugins from '../tsmdb.plugins.js'; +import type { Document, IStoredDocument } from '../types/interfaces.js'; + +/** + * WAL entry operation types + */ +export type TWalOperation = 'insert' | 'update' | 'delete' | 'checkpoint' | 'begin' | 'commit' | 'abort'; + +/** + * WAL entry structure + */ +export interface IWalEntry { + /** Log Sequence Number - monotonically increasing */ + lsn: number; + /** Timestamp of the operation */ + timestamp: number; + /** Operation type */ + operation: TWalOperation; + /** Database name */ + dbName: string; + /** Collection name */ + collName: string; + /** Document ID (hex string) */ + documentId: string; + /** Document data (BSON serialized, base64 encoded) */ + data?: string; + /** Previous document data for updates (for rollback) */ + previousData?: string; + /** Transaction ID if part of a transaction */ + txnId?: string; + /** CRC32 checksum of the entry (excluding this field) */ + checksum: number; +} + +/** + * Checkpoint record + */ +interface ICheckpointRecord { + lsn: number; + timestamp: number; + lastCommittedLsn: number; +} + +/** + * Write-Ahead Log (WAL) for durability and crash recovery + * + * The WAL ensures durability by writing operations to a log file before + * they are applied to the main storage. On crash recovery, uncommitted + * operations can be replayed to restore the database to a consistent state. + */ +export class WAL { + private walPath: string; + private currentLsn: number = 0; + private lastCheckpointLsn: number = 0; + private entries: IWalEntry[] = []; + private isInitialized: boolean = false; + private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode()); + + // In-memory uncommitted entries per transaction + private uncommittedTxns: Map = new Map(); + + // Checkpoint interval (number of entries between checkpoints) + private checkpointInterval: number = 1000; + + constructor(walPath: string, options?: { checkpointInterval?: number }) { + this.walPath = walPath; + if (options?.checkpointInterval) { + this.checkpointInterval = options.checkpointInterval; + } + } + + /** + * Initialize the WAL, loading existing entries and recovering if needed + */ + async initialize(): Promise<{ recoveredEntries: IWalEntry[] }> { + if (this.isInitialized) { + return { recoveredEntries: [] }; + } + + // Ensure WAL directory exists + const walDir = this.walPath.substring(0, this.walPath.lastIndexOf('/')); + if (walDir) { + await this.fs.directory(walDir).recursive().create(); + } + + // Try to load existing WAL + const exists = await this.fs.file(this.walPath).exists(); + if (exists) { + const content = await this.fs.file(this.walPath).encoding('utf8').read(); + const lines = (content as string).split('\n').filter(line => line.trim()); + + for (const line of lines) { + try { + const entry = JSON.parse(line) as IWalEntry; + // Verify checksum + if (this.verifyChecksum(entry)) { + this.entries.push(entry); + if (entry.lsn > this.currentLsn) { + this.currentLsn = entry.lsn; + } + if (entry.operation === 'checkpoint') { + this.lastCheckpointLsn = entry.lsn; + } + } + } catch { + // Skip corrupted entries + console.warn('Skipping corrupted WAL entry'); + } + } + } + + this.isInitialized = true; + + // Return entries after last checkpoint that need recovery + const recoveredEntries = this.entries.filter( + e => e.lsn > this.lastCheckpointLsn && + (e.operation === 'insert' || e.operation === 'update' || e.operation === 'delete') + ); + + return { recoveredEntries }; + } + + /** + * Log an insert operation + */ + async logInsert(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise { + return this.appendEntry({ + operation: 'insert', + dbName, + collName, + documentId: doc._id.toHexString(), + data: this.serializeDocument(doc), + txnId, + }); + } + + /** + * Log an update operation + */ + async logUpdate( + dbName: string, + collName: string, + oldDoc: IStoredDocument, + newDoc: IStoredDocument, + txnId?: string + ): Promise { + return this.appendEntry({ + operation: 'update', + dbName, + collName, + documentId: oldDoc._id.toHexString(), + data: this.serializeDocument(newDoc), + previousData: this.serializeDocument(oldDoc), + txnId, + }); + } + + /** + * Log a delete operation + */ + async logDelete(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise { + return this.appendEntry({ + operation: 'delete', + dbName, + collName, + documentId: doc._id.toHexString(), + previousData: this.serializeDocument(doc), + txnId, + }); + } + + /** + * Log transaction begin + */ + async logBeginTransaction(txnId: string): Promise { + this.uncommittedTxns.set(txnId, []); + return this.appendEntry({ + operation: 'begin', + dbName: '', + collName: '', + documentId: '', + txnId, + }); + } + + /** + * Log transaction commit + */ + async logCommitTransaction(txnId: string): Promise { + this.uncommittedTxns.delete(txnId); + return this.appendEntry({ + operation: 'commit', + dbName: '', + collName: '', + documentId: '', + txnId, + }); + } + + /** + * Log transaction abort + */ + async logAbortTransaction(txnId: string): Promise { + this.uncommittedTxns.delete(txnId); + return this.appendEntry({ + operation: 'abort', + dbName: '', + collName: '', + documentId: '', + txnId, + }); + } + + /** + * Get entries to roll back for an aborted transaction + */ + getTransactionEntries(txnId: string): IWalEntry[] { + return this.entries.filter(e => e.txnId === txnId); + } + + /** + * Create a checkpoint - marks a consistent point in the log + */ + async checkpoint(): Promise { + const lsn = await this.appendEntry({ + operation: 'checkpoint', + dbName: '', + collName: '', + documentId: '', + }); + this.lastCheckpointLsn = lsn; + + // Truncate old entries (keep only entries after checkpoint) + await this.truncate(); + + return lsn; + } + + /** + * Truncate the WAL file, removing entries before the last checkpoint + */ + private async truncate(): Promise { + // Keep entries after last checkpoint + const newEntries = this.entries.filter(e => e.lsn >= this.lastCheckpointLsn); + this.entries = newEntries; + + // Rewrite the WAL file + const lines = this.entries.map(e => JSON.stringify(e)).join('\n'); + await this.fs.file(this.walPath).encoding('utf8').write(lines); + } + + /** + * Get current LSN + */ + getCurrentLsn(): number { + return this.currentLsn; + } + + /** + * Get entries after a specific LSN (for recovery) + */ + getEntriesAfter(lsn: number): IWalEntry[] { + return this.entries.filter(e => e.lsn > lsn); + } + + /** + * Close the WAL + */ + async close(): Promise { + if (this.isInitialized) { + // Final checkpoint before close + await this.checkpoint(); + } + this.isInitialized = false; + } + + // ============================================================================ + // Private Methods + // ============================================================================ + + private async appendEntry( + partial: Omit + ): Promise { + await this.initialize(); + + this.currentLsn++; + const entry: IWalEntry = { + ...partial, + lsn: this.currentLsn, + timestamp: Date.now(), + checksum: 0, // Will be calculated + }; + + // Calculate checksum + entry.checksum = this.calculateChecksum(entry); + + // Track in transaction if applicable + if (partial.txnId && this.uncommittedTxns.has(partial.txnId)) { + this.uncommittedTxns.get(partial.txnId)!.push(entry); + } + + // Add to in-memory log + this.entries.push(entry); + + // Append to file (append mode for durability) + await this.fs.file(this.walPath).encoding('utf8').append(JSON.stringify(entry) + '\n'); + + // Check if we need a checkpoint + if (this.entries.length - this.lastCheckpointLsn >= this.checkpointInterval) { + await this.checkpoint(); + } + + return entry.lsn; + } + + private serializeDocument(doc: Document): string { + // Serialize document to BSON and encode as base64 + const bsonData = plugins.bson.serialize(doc); + return Buffer.from(bsonData).toString('base64'); + } + + private deserializeDocument(data: string): Document { + // Decode base64 and deserialize from BSON + const buffer = Buffer.from(data, 'base64'); + return plugins.bson.deserialize(buffer); + } + + private calculateChecksum(entry: IWalEntry): number { + // Simple CRC32-like checksum + const str = JSON.stringify({ + lsn: entry.lsn, + timestamp: entry.timestamp, + operation: entry.operation, + dbName: entry.dbName, + collName: entry.collName, + documentId: entry.documentId, + data: entry.data, + previousData: entry.previousData, + txnId: entry.txnId, + }); + + let crc = 0xFFFFFFFF; + for (let i = 0; i < str.length; i++) { + crc ^= str.charCodeAt(i); + for (let j = 0; j < 8; j++) { + crc = (crc >>> 1) ^ (crc & 1 ? 0xEDB88320 : 0); + } + } + return (~crc) >>> 0; + } + + private verifyChecksum(entry: IWalEntry): boolean { + const savedChecksum = entry.checksum; + entry.checksum = 0; + const calculatedChecksum = this.calculateChecksum(entry); + entry.checksum = savedChecksum; + return calculatedChecksum === savedChecksum; + } + + /** + * Recover document from WAL entry + */ + recoverDocument(entry: IWalEntry): IStoredDocument | null { + if (!entry.data) return null; + return this.deserializeDocument(entry.data) as IStoredDocument; + } + + /** + * Recover previous document state from WAL entry (for rollback) + */ + recoverPreviousDocument(entry: IWalEntry): IStoredDocument | null { + if (!entry.previousData) return null; + return this.deserializeDocument(entry.previousData) as IStoredDocument; + } +} diff --git a/ts/tsmdb/utils/checksum.ts b/ts/tsmdb/utils/checksum.ts new file mode 100644 index 0000000..c315b53 --- /dev/null +++ b/ts/tsmdb/utils/checksum.ts @@ -0,0 +1,88 @@ +/** + * CRC32 checksum utilities for data integrity + */ + +// CRC32 lookup table +const CRC32_TABLE: number[] = []; + +// Initialize the CRC32 table +function initCRC32Table(): void { + if (CRC32_TABLE.length > 0) return; + + for (let i = 0; i < 256; i++) { + let crc = i; + for (let j = 0; j < 8; j++) { + crc = (crc & 1) ? (0xEDB88320 ^ (crc >>> 1)) : (crc >>> 1); + } + CRC32_TABLE[i] = crc >>> 0; + } +} + +/** + * Calculate CRC32 checksum for a string + */ +export function calculateCRC32(data: string): number { + initCRC32Table(); + + let crc = 0xFFFFFFFF; + for (let i = 0; i < data.length; i++) { + const byte = data.charCodeAt(i) & 0xFF; + crc = CRC32_TABLE[(crc ^ byte) & 0xFF] ^ (crc >>> 8); + } + return (~crc) >>> 0; +} + +/** + * Calculate CRC32 checksum for a Buffer + */ +export function calculateCRC32Buffer(data: Buffer): number { + initCRC32Table(); + + let crc = 0xFFFFFFFF; + for (let i = 0; i < data.length; i++) { + crc = CRC32_TABLE[(crc ^ data[i]) & 0xFF] ^ (crc >>> 8); + } + return (~crc) >>> 0; +} + +/** + * Calculate checksum for a document (serialized as JSON) + */ +export function calculateDocumentChecksum(doc: Record): number { + // Exclude _checksum field from calculation + const { _checksum, ...docWithoutChecksum } = doc; + const json = JSON.stringify(docWithoutChecksum); + return calculateCRC32(json); +} + +/** + * Add checksum to a document + */ +export function addChecksum>(doc: T): T & { _checksum: number } { + const checksum = calculateDocumentChecksum(doc); + return { ...doc, _checksum: checksum }; +} + +/** + * Verify checksum of a document + * Returns true if checksum is valid or if document has no checksum + */ +export function verifyChecksum(doc: Record): boolean { + if (!('_checksum' in doc)) { + // No checksum to verify + return true; + } + + const storedChecksum = doc._checksum; + const calculatedChecksum = calculateDocumentChecksum(doc); + + return storedChecksum === calculatedChecksum; +} + +/** + * Remove checksum from a document + */ +export function removeChecksum>(doc: T): Omit { + const { _checksum, ...docWithoutChecksum } = doc; + return docWithoutChecksum as Omit; +} diff --git a/ts/tsmdb/utils/index.ts b/ts/tsmdb/utils/index.ts new file mode 100644 index 0000000..97007c6 --- /dev/null +++ b/ts/tsmdb/utils/index.ts @@ -0,0 +1 @@ +export * from './checksum.js';