Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e3dc19aa7c | |||
| 316af45b5e | |||
| 6932059965 | |||
| bd1764159e |
22
changelog.md
22
changelog.md
@@ -1,5 +1,27 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-02-01 - 4.1.0 - feat(readme)
|
||||
expand README with storage integrity, WAL, query planner, session & transaction docs; update test script to enable verbose logging and increase timeout
|
||||
|
||||
- Updated npm test script to run tstest with --verbose, --logfile and --timeout 60 to improve test output and avoid timeouts.
|
||||
- Extensive README additions: file storage adapter examples with checksum options, write-ahead logging (WAL) usage and recovery, query planner examples, index and query execution details, session and transaction examples and features.
|
||||
- Wire protocol / features table updated to include Transactions and Sessions and added admin commands (dbStats, collStats).
|
||||
- Architecture diagram and component list updated to include QueryPlanner, SessionEngine, TransactionEngine and WAL; storage layer annotated with checksums and WAL.
|
||||
- Minor example import tweak: MongoClient import now includes Db type in test examples.
|
||||
|
||||
## 2026-02-01 - 4.0.0 - BREAKING CHANGE(storage,engine,server)
|
||||
add session & transaction management, index/query planner, WAL and checksum support; integrate index-accelerated queries and update storage API (findByIds) to enable index optimizations
|
||||
|
||||
- Add SessionEngine with session lifecycle, auto-abort of transactions on expiry and session tracking in CommandRouter and AdminHandler.
|
||||
- Introduce TransactionEngine integrations in CommandRouter and AdminHandler; handlers now support start/commit/abort transaction workflows.
|
||||
- Add IndexEngine enhancements including a simple B-tree and hash map optimizations; integrate index usage into Find/Count/Insert/Update/Delete handlers for index-accelerated queries and index maintenance on mutations.
|
||||
- Add QueryPlanner to choose IXSCAN vs COLLSCAN and provide explain plans.
|
||||
- Add WAL (write-ahead log) for durability, with LSNs, checkpoints and recovery APIs.
|
||||
- Add checksum utilities and FileStorageAdapter support for checksums (enableChecksums/strictChecksums), with verification on read and optional strict failure behavior.
|
||||
- IStorageAdapter interface changed to include findByIds; MemoryStorageAdapter and FileStorageAdapter implement findByIds to support index lookups.
|
||||
- Exported API additions: WAL, QueryPlanner, SessionEngine, checksum utilities; CommandRouter now caches IndexEngines and exposes transaction/session engines.
|
||||
- Breaking change: the IStorageAdapter interface change requires third-party storage adapters to implement the new findByIds method.
|
||||
|
||||
## 2026-02-01 - 3.0.0 - BREAKING CHANGE(tsmdb)
|
||||
rename CongoDB to TsmDB and relocate/rename wire-protocol server implementation and public exports
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartmongo",
|
||||
"version": "3.0.0",
|
||||
"version": "4.1.0",
|
||||
"private": false,
|
||||
"description": "A module for creating and managing a local MongoDB instance for testing purposes.",
|
||||
"main": "dist_ts/index.js",
|
||||
@@ -9,7 +9,7 @@
|
||||
"author": "Lossless GmbH",
|
||||
"license": "MIT",
|
||||
"scripts": {
|
||||
"test": "(tstest test/)",
|
||||
"test": "(tstest test/. --verbose --logfile --timeout 60)",
|
||||
"build": "(tsbuild --web)",
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
|
||||
157
readme.md
157
readme.md
@@ -134,8 +134,8 @@ await server.start();
|
||||
console.log(server.getConnectionUri()); // mongodb://127.0.0.1:27017
|
||||
|
||||
// Server properties
|
||||
console.log(server.running); // true
|
||||
console.log(server.getUptime()); // seconds
|
||||
console.log(server.running); // true
|
||||
console.log(server.getUptime()); // seconds
|
||||
console.log(server.getConnectionCount()); // active connections
|
||||
|
||||
await server.stop();
|
||||
@@ -279,7 +279,7 @@ console.log(result.deletedCount); // 1
|
||||
|
||||
### Storage Adapters
|
||||
|
||||
TsmDB supports pluggable storage:
|
||||
TsmDB supports pluggable storage with data integrity features:
|
||||
|
||||
```typescript
|
||||
// In-memory (default) - fast, data lost on stop
|
||||
@@ -292,13 +292,118 @@ const server = new tsmdb.TsmdbServer({
|
||||
persistIntervalMs: 30000 // Save every 30 seconds
|
||||
});
|
||||
|
||||
// File-based - persistent storage
|
||||
const server = new tsmdb.TsmdbServer({
|
||||
storage: 'file',
|
||||
storagePath: './data/tsmdb'
|
||||
// File-based - persistent storage with optional checksums
|
||||
import { FileStorageAdapter } from '@push.rocks/smartmongo/tsmdb';
|
||||
|
||||
const adapter = new FileStorageAdapter('./data/tsmdb', {
|
||||
enableChecksums: true, // CRC32 checksums for data integrity
|
||||
strictChecksums: false // Log warnings vs throw on mismatch
|
||||
});
|
||||
```
|
||||
|
||||
## ⚡ Performance & Reliability Features
|
||||
|
||||
TsmDB includes enterprise-grade features for robustness:
|
||||
|
||||
### 🔍 Index-Accelerated Queries
|
||||
|
||||
Indexes are automatically used to accelerate queries. Instead of scanning all documents, TsmDB uses:
|
||||
|
||||
- **Hash indexes** for equality queries (`$eq`, `$in`)
|
||||
- **B-tree indexes** for range queries (`$gt`, `$gte`, `$lt`, `$lte`)
|
||||
|
||||
```typescript
|
||||
// Create an index
|
||||
await collection.createIndex({ email: 1 });
|
||||
await collection.createIndex({ age: 1 });
|
||||
|
||||
// These queries will use the index (fast!)
|
||||
await collection.findOne({ email: 'alice@example.com' }); // Uses hash lookup
|
||||
await collection.find({ age: { $gte: 18, $lt: 65 } }); // Uses B-tree range scan
|
||||
```
|
||||
|
||||
### 📊 Query Planner
|
||||
|
||||
TsmDB includes a query planner that analyzes queries and selects optimal execution strategies:
|
||||
|
||||
```typescript
|
||||
import { tsmdb } from '@push.rocks/smartmongo';
|
||||
|
||||
// For debugging, you can access the query planner
|
||||
const planner = new tsmdb.QueryPlanner(indexEngine);
|
||||
const plan = planner.createPlan(filter);
|
||||
|
||||
console.log(plan);
|
||||
// {
|
||||
// type: 'IXSCAN', // or 'IXSCAN_RANGE', 'COLLSCAN'
|
||||
// indexName: 'email_1',
|
||||
// estimatedCost: 1,
|
||||
// selectivity: 0.001
|
||||
// }
|
||||
```
|
||||
|
||||
### 📝 Write-Ahead Logging (WAL)
|
||||
|
||||
For durability, TsmDB supports write-ahead logging:
|
||||
|
||||
```typescript
|
||||
import { tsmdb } from '@push.rocks/smartmongo';
|
||||
|
||||
const wal = new tsmdb.WAL('./data/wal.log');
|
||||
await wal.initialize();
|
||||
|
||||
// WAL entries include:
|
||||
// - LSN (Log Sequence Number)
|
||||
// - Timestamp
|
||||
// - Operation type (insert, update, delete, checkpoint)
|
||||
// - Document data (BSON serialized)
|
||||
// - CRC32 checksum
|
||||
|
||||
// Recovery support
|
||||
const entries = await wal.getEntriesAfter(lastCheckpointLsn);
|
||||
```
|
||||
|
||||
### 🔐 Session Management
|
||||
|
||||
TsmDB tracks client sessions with automatic timeout and transaction linking:
|
||||
|
||||
```typescript
|
||||
// Sessions are automatically managed when using the MongoDB driver
|
||||
const session = client.startSession();
|
||||
|
||||
try {
|
||||
session.startTransaction();
|
||||
await collection.insertOne({ name: 'Alice' }, { session });
|
||||
await collection.updateOne({ name: 'Bob' }, { $inc: { balance: 100 } }, { session });
|
||||
await session.commitTransaction();
|
||||
} catch (error) {
|
||||
await session.abortTransaction();
|
||||
} finally {
|
||||
session.endSession();
|
||||
}
|
||||
|
||||
// Session features:
|
||||
// - Automatic session timeout (30 minutes default)
|
||||
// - Transaction auto-abort on session expiry
|
||||
// - Session activity tracking
|
||||
```
|
||||
|
||||
### ✅ Data Integrity Checksums
|
||||
|
||||
File-based storage supports CRC32 checksums to detect corruption:
|
||||
|
||||
```typescript
|
||||
import { FileStorageAdapter } from '@push.rocks/smartmongo/tsmdb';
|
||||
|
||||
const adapter = new FileStorageAdapter('./data', {
|
||||
enableChecksums: true,
|
||||
strictChecksums: true // Throw error on corruption (vs warning)
|
||||
});
|
||||
|
||||
// Documents are checksummed on write, verified on read
|
||||
// Checksums are automatically stripped before returning to client
|
||||
```
|
||||
|
||||
### 📋 Supported Wire Protocol Commands
|
||||
|
||||
| Category | Commands |
|
||||
@@ -307,7 +412,9 @@ const server = new tsmdb.TsmdbServer({
|
||||
| **CRUD** | `find`, `insert`, `update`, `delete`, `findAndModify`, `getMore`, `killCursors` |
|
||||
| **Aggregation** | `aggregate`, `count`, `distinct` |
|
||||
| **Indexes** | `createIndexes`, `dropIndexes`, `listIndexes` |
|
||||
| **Admin** | `ping`, `listDatabases`, `listCollections`, `drop`, `dropDatabase`, `create`, `serverStatus`, `buildInfo` |
|
||||
| **Transactions** | `startTransaction`, `commitTransaction`, `abortTransaction` |
|
||||
| **Sessions** | `startSession`, `endSessions` |
|
||||
| **Admin** | `ping`, `listDatabases`, `listCollections`, `drop`, `dropDatabase`, `create`, `serverStatus`, `buildInfo`, `dbStats`, `collStats` |
|
||||
|
||||
TsmDB supports MongoDB wire protocol versions 0-21, compatible with MongoDB 3.6 through 7.0 drivers.
|
||||
|
||||
@@ -317,7 +424,7 @@ TsmDB supports MongoDB wire protocol versions 0-21, compatible with MongoDB 3.6
|
||||
|
||||
```typescript
|
||||
import { tsmdb } from '@push.rocks/smartmongo';
|
||||
import { MongoClient } from 'mongodb';
|
||||
import { MongoClient, Db } from 'mongodb';
|
||||
|
||||
let server: tsmdb.TsmdbServer;
|
||||
let client: MongoClient;
|
||||
@@ -421,21 +528,37 @@ export default tap.start();
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Engines │
|
||||
│ ┌───────────┐ ┌────────────┐ ┌────────────┐ ┌───────────┐ │
|
||||
│ │ Query │ │ Update │ │Aggregation │ │ Index │ │
|
||||
│ │ Engine │ │ Engine │ │ Engine │ │ Engine │ │
|
||||
│ └───────────┘ └────────────┘ └────────────┘ └───────────┘ │
|
||||
│ ┌─────────┐ ┌────────┐ ┌───────────┐ ┌───────┐ ┌───────┐ │
|
||||
│ │ Query │ │ Update │ │Aggregation│ │ Index │ │Session│ │
|
||||
│ │ Planner │ │ Engine │ │ Engine │ │Engine │ │Engine │ │
|
||||
│ └─────────┘ └────────┘ └───────────┘ └───────┘ └───────┘ │
|
||||
│ ┌──────────────────────┐ │
|
||||
│ │ Transaction Engine │ │
|
||||
│ └──────────────────────┘ │
|
||||
└─────────────────────────┬───────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Storage Adapters │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ │
|
||||
│ │ MemoryStorage │ │ FileStorage │ │
|
||||
│ └──────────────────┘ └──────────────────┘ │
|
||||
│ Storage Layer │
|
||||
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────┐ │
|
||||
│ │ MemoryStorage │ │ FileStorage │ │ WAL │ │
|
||||
│ │ │ │ (+ Checksums) │ │ │ │
|
||||
│ └──────────────────┘ └──────────────────┘ └──────────┘ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Key Components
|
||||
|
||||
| Component | Description |
|
||||
|-----------|-------------|
|
||||
| **WireProtocol** | Parses MongoDB OP_MSG binary protocol |
|
||||
| **CommandRouter** | Routes commands to appropriate handlers |
|
||||
| **QueryPlanner** | Analyzes queries and selects execution strategy |
|
||||
| **IndexEngine** | Manages B-tree and hash indexes |
|
||||
| **SessionEngine** | Tracks client sessions and timeouts |
|
||||
| **TransactionEngine** | Handles ACID transaction semantics |
|
||||
| **WAL** | Write-ahead logging for durability |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartmongo',
|
||||
version: '3.0.0',
|
||||
version: '4.1.0',
|
||||
description: 'A module for creating and managing a local MongoDB instance for testing purposes.'
|
||||
}
|
||||
|
||||
@@ -1,5 +1,89 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { IStorageAdapter } from '../storage/IStorageAdapter.js';
|
||||
|
||||
// Simple B-Tree implementation for range queries
|
||||
// Since sorted-btree has ESM/CJS interop issues, we use a simple custom implementation
|
||||
class SimpleBTree<K, V> {
|
||||
private entries: Map<string, { key: K; value: V }> = new Map();
|
||||
private sortedKeys: K[] = [];
|
||||
private comparator: (a: K, b: K) => number;
|
||||
|
||||
constructor(_unused?: undefined, comparator?: (a: K, b: K) => number) {
|
||||
this.comparator = comparator || ((a: K, b: K) => {
|
||||
if (a < b) return -1;
|
||||
if (a > b) return 1;
|
||||
return 0;
|
||||
});
|
||||
}
|
||||
|
||||
private keyToString(key: K): string {
|
||||
return JSON.stringify(key);
|
||||
}
|
||||
|
||||
set(key: K, value: V): boolean {
|
||||
const keyStr = this.keyToString(key);
|
||||
const existed = this.entries.has(keyStr);
|
||||
this.entries.set(keyStr, { key, value });
|
||||
|
||||
if (!existed) {
|
||||
// Insert in sorted order
|
||||
const idx = this.sortedKeys.findIndex(k => this.comparator(k, key) > 0);
|
||||
if (idx === -1) {
|
||||
this.sortedKeys.push(key);
|
||||
} else {
|
||||
this.sortedKeys.splice(idx, 0, key);
|
||||
}
|
||||
}
|
||||
return !existed;
|
||||
}
|
||||
|
||||
get(key: K): V | undefined {
|
||||
const entry = this.entries.get(this.keyToString(key));
|
||||
return entry?.value;
|
||||
}
|
||||
|
||||
delete(key: K): boolean {
|
||||
const keyStr = this.keyToString(key);
|
||||
if (this.entries.has(keyStr)) {
|
||||
this.entries.delete(keyStr);
|
||||
const idx = this.sortedKeys.findIndex(k => this.comparator(k, key) === 0);
|
||||
if (idx !== -1) {
|
||||
this.sortedKeys.splice(idx, 1);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
forRange(
|
||||
lowKey: K | undefined,
|
||||
highKey: K | undefined,
|
||||
lowInclusive: boolean,
|
||||
highInclusive: boolean,
|
||||
callback: (value: V, key: K) => void
|
||||
): void {
|
||||
for (const key of this.sortedKeys) {
|
||||
// Check low bound
|
||||
if (lowKey !== undefined) {
|
||||
const cmp = this.comparator(key, lowKey);
|
||||
if (cmp < 0) continue;
|
||||
if (cmp === 0 && !lowInclusive) continue;
|
||||
}
|
||||
|
||||
// Check high bound
|
||||
if (highKey !== undefined) {
|
||||
const cmp = this.comparator(key, highKey);
|
||||
if (cmp > 0) break;
|
||||
if (cmp === 0 && !highInclusive) break;
|
||||
}
|
||||
|
||||
const entry = this.entries.get(this.keyToString(key));
|
||||
if (entry) {
|
||||
callback(entry.value, key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
import type {
|
||||
Document,
|
||||
IStoredDocument,
|
||||
@@ -11,7 +95,61 @@ import { TsmdbDuplicateKeyError, TsmdbIndexError } from '../errors/TsmdbErrors.j
|
||||
import { QueryEngine } from './QueryEngine.js';
|
||||
|
||||
/**
|
||||
* Index data structure for fast lookups
|
||||
* Comparator for B-Tree that handles mixed types consistently
|
||||
*/
|
||||
function indexKeyComparator(a: any, b: any): number {
|
||||
// Handle null/undefined
|
||||
if (a === null || a === undefined) {
|
||||
if (b === null || b === undefined) return 0;
|
||||
return -1;
|
||||
}
|
||||
if (b === null || b === undefined) return 1;
|
||||
|
||||
// Handle arrays (compound keys)
|
||||
if (Array.isArray(a) && Array.isArray(b)) {
|
||||
for (let i = 0; i < Math.max(a.length, b.length); i++) {
|
||||
const cmp = indexKeyComparator(a[i], b[i]);
|
||||
if (cmp !== 0) return cmp;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Handle ObjectId
|
||||
if (a instanceof plugins.bson.ObjectId && b instanceof plugins.bson.ObjectId) {
|
||||
return a.toHexString().localeCompare(b.toHexString());
|
||||
}
|
||||
|
||||
// Handle Date
|
||||
if (a instanceof Date && b instanceof Date) {
|
||||
return a.getTime() - b.getTime();
|
||||
}
|
||||
|
||||
// Handle different types - use type ordering (null < number < string < object)
|
||||
const typeOrder = (v: any): number => {
|
||||
if (v === null || v === undefined) return 0;
|
||||
if (typeof v === 'number') return 1;
|
||||
if (typeof v === 'string') return 2;
|
||||
if (typeof v === 'boolean') return 3;
|
||||
if (v instanceof Date) return 4;
|
||||
if (v instanceof plugins.bson.ObjectId) return 5;
|
||||
return 6;
|
||||
};
|
||||
|
||||
const typeA = typeOrder(a);
|
||||
const typeB = typeOrder(b);
|
||||
if (typeA !== typeB) return typeA - typeB;
|
||||
|
||||
// Same type comparison
|
||||
if (typeof a === 'number') return a - b;
|
||||
if (typeof a === 'string') return a.localeCompare(b);
|
||||
if (typeof a === 'boolean') return (a ? 1 : 0) - (b ? 1 : 0);
|
||||
|
||||
// Fallback to string comparison
|
||||
return String(a).localeCompare(String(b));
|
||||
}
|
||||
|
||||
/**
|
||||
* Index data structure using B-Tree for range queries
|
||||
*/
|
||||
interface IIndexData {
|
||||
name: string;
|
||||
@@ -19,8 +157,10 @@ interface IIndexData {
|
||||
unique: boolean;
|
||||
sparse: boolean;
|
||||
expireAfterSeconds?: number;
|
||||
// Map from index key value to document _id(s)
|
||||
entries: Map<string, Set<string>>;
|
||||
// B-Tree for ordered index lookups (supports range queries)
|
||||
btree: SimpleBTree<any, Set<string>>;
|
||||
// Hash map for fast equality lookups
|
||||
hashMap: Map<string, Set<string>>;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -55,7 +195,8 @@ export class IndexEngine {
|
||||
unique: indexSpec.unique || false,
|
||||
sparse: indexSpec.sparse || false,
|
||||
expireAfterSeconds: indexSpec.expireAfterSeconds,
|
||||
entries: new Map(),
|
||||
btree: new SimpleBTree<any, Set<string>>(undefined, indexKeyComparator),
|
||||
hashMap: new Map(),
|
||||
};
|
||||
|
||||
// Build index entries
|
||||
@@ -63,10 +204,20 @@ export class IndexEngine {
|
||||
const keyValue = this.extractKeyValue(doc, indexSpec.key);
|
||||
if (keyValue !== null || !indexData.sparse) {
|
||||
const keyStr = JSON.stringify(keyValue);
|
||||
if (!indexData.entries.has(keyStr)) {
|
||||
indexData.entries.set(keyStr, new Set());
|
||||
|
||||
// Add to hash map
|
||||
if (!indexData.hashMap.has(keyStr)) {
|
||||
indexData.hashMap.set(keyStr, new Set());
|
||||
}
|
||||
indexData.hashMap.get(keyStr)!.add(doc._id.toHexString());
|
||||
|
||||
// Add to B-tree
|
||||
const existing = indexData.btree.get(keyValue);
|
||||
if (existing) {
|
||||
existing.add(doc._id.toHexString());
|
||||
} else {
|
||||
indexData.btree.set(keyValue, new Set([doc._id.toHexString()]));
|
||||
}
|
||||
indexData.entries.get(keyStr)!.add(doc._id.toHexString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +251,8 @@ export class IndexEngine {
|
||||
unique: options?.unique || false,
|
||||
sparse: options?.sparse || false,
|
||||
expireAfterSeconds: options?.expireAfterSeconds,
|
||||
entries: new Map(),
|
||||
btree: new SimpleBTree<any, Set<string>>(undefined, indexKeyComparator),
|
||||
hashMap: new Map(),
|
||||
};
|
||||
|
||||
// Build index from existing documents
|
||||
@@ -115,7 +267,7 @@ export class IndexEngine {
|
||||
|
||||
const keyStr = JSON.stringify(keyValue);
|
||||
|
||||
if (indexData.unique && indexData.entries.has(keyStr)) {
|
||||
if (indexData.unique && indexData.hashMap.has(keyStr)) {
|
||||
throw new TsmdbDuplicateKeyError(
|
||||
`E11000 duplicate key error index: ${this.dbName}.${this.collName}.$${name}`,
|
||||
key as Record<string, 1>,
|
||||
@@ -123,10 +275,19 @@ export class IndexEngine {
|
||||
);
|
||||
}
|
||||
|
||||
if (!indexData.entries.has(keyStr)) {
|
||||
indexData.entries.set(keyStr, new Set());
|
||||
// Add to hash map
|
||||
if (!indexData.hashMap.has(keyStr)) {
|
||||
indexData.hashMap.set(keyStr, new Set());
|
||||
}
|
||||
indexData.hashMap.get(keyStr)!.add(doc._id.toHexString());
|
||||
|
||||
// Add to B-tree
|
||||
const existing = indexData.btree.get(keyValue);
|
||||
if (existing) {
|
||||
existing.add(doc._id.toHexString());
|
||||
} else {
|
||||
indexData.btree.set(keyValue, new Set([doc._id.toHexString()]));
|
||||
}
|
||||
indexData.entries.get(keyStr)!.add(doc._id.toHexString());
|
||||
}
|
||||
|
||||
// Store index
|
||||
@@ -213,7 +374,7 @@ export class IndexEngine {
|
||||
|
||||
// Check unique constraint
|
||||
if (indexData.unique) {
|
||||
const existing = indexData.entries.get(keyStr);
|
||||
const existing = indexData.hashMap.get(keyStr);
|
||||
if (existing && existing.size > 0) {
|
||||
throw new TsmdbDuplicateKeyError(
|
||||
`E11000 duplicate key error collection: ${this.dbName}.${this.collName} index: ${name}`,
|
||||
@@ -223,10 +384,19 @@ export class IndexEngine {
|
||||
}
|
||||
}
|
||||
|
||||
if (!indexData.entries.has(keyStr)) {
|
||||
indexData.entries.set(keyStr, new Set());
|
||||
// Add to hash map
|
||||
if (!indexData.hashMap.has(keyStr)) {
|
||||
indexData.hashMap.set(keyStr, new Set());
|
||||
}
|
||||
indexData.hashMap.get(keyStr)!.add(doc._id.toHexString());
|
||||
|
||||
// Add to B-tree
|
||||
const btreeSet = indexData.btree.get(keyValue);
|
||||
if (btreeSet) {
|
||||
btreeSet.add(doc._id.toHexString());
|
||||
} else {
|
||||
indexData.btree.set(keyValue, new Set([doc._id.toHexString()]));
|
||||
}
|
||||
indexData.entries.get(keyStr)!.add(doc._id.toHexString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -245,11 +415,21 @@ export class IndexEngine {
|
||||
// Remove old entry if key changed
|
||||
if (oldKeyStr !== newKeyStr) {
|
||||
if (oldKeyValue !== null || !indexData.sparse) {
|
||||
const oldSet = indexData.entries.get(oldKeyStr);
|
||||
if (oldSet) {
|
||||
oldSet.delete(oldDoc._id.toHexString());
|
||||
if (oldSet.size === 0) {
|
||||
indexData.entries.delete(oldKeyStr);
|
||||
// Remove from hash map
|
||||
const oldHashSet = indexData.hashMap.get(oldKeyStr);
|
||||
if (oldHashSet) {
|
||||
oldHashSet.delete(oldDoc._id.toHexString());
|
||||
if (oldHashSet.size === 0) {
|
||||
indexData.hashMap.delete(oldKeyStr);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from B-tree
|
||||
const oldBtreeSet = indexData.btree.get(oldKeyValue);
|
||||
if (oldBtreeSet) {
|
||||
oldBtreeSet.delete(oldDoc._id.toHexString());
|
||||
if (oldBtreeSet.size === 0) {
|
||||
indexData.btree.delete(oldKeyValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,7 +438,7 @@ export class IndexEngine {
|
||||
if (newKeyValue !== null || !indexData.sparse) {
|
||||
// Check unique constraint
|
||||
if (indexData.unique) {
|
||||
const existing = indexData.entries.get(newKeyStr);
|
||||
const existing = indexData.hashMap.get(newKeyStr);
|
||||
if (existing && existing.size > 0) {
|
||||
throw new TsmdbDuplicateKeyError(
|
||||
`E11000 duplicate key error collection: ${this.dbName}.${this.collName} index: ${name}`,
|
||||
@@ -268,10 +448,19 @@ export class IndexEngine {
|
||||
}
|
||||
}
|
||||
|
||||
if (!indexData.entries.has(newKeyStr)) {
|
||||
indexData.entries.set(newKeyStr, new Set());
|
||||
// Add to hash map
|
||||
if (!indexData.hashMap.has(newKeyStr)) {
|
||||
indexData.hashMap.set(newKeyStr, new Set());
|
||||
}
|
||||
indexData.hashMap.get(newKeyStr)!.add(newDoc._id.toHexString());
|
||||
|
||||
// Add to B-tree
|
||||
const newBtreeSet = indexData.btree.get(newKeyValue);
|
||||
if (newBtreeSet) {
|
||||
newBtreeSet.add(newDoc._id.toHexString());
|
||||
} else {
|
||||
indexData.btree.set(newKeyValue, new Set([newDoc._id.toHexString()]));
|
||||
}
|
||||
indexData.entries.get(newKeyStr)!.add(newDoc._id.toHexString());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -291,11 +480,22 @@ export class IndexEngine {
|
||||
}
|
||||
|
||||
const keyStr = JSON.stringify(keyValue);
|
||||
const set = indexData.entries.get(keyStr);
|
||||
if (set) {
|
||||
set.delete(doc._id.toHexString());
|
||||
if (set.size === 0) {
|
||||
indexData.entries.delete(keyStr);
|
||||
|
||||
// Remove from hash map
|
||||
const hashSet = indexData.hashMap.get(keyStr);
|
||||
if (hashSet) {
|
||||
hashSet.delete(doc._id.toHexString());
|
||||
if (hashSet.size === 0) {
|
||||
indexData.hashMap.delete(keyStr);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from B-tree
|
||||
const btreeSet = indexData.btree.get(keyValue);
|
||||
if (btreeSet) {
|
||||
btreeSet.delete(doc._id.toHexString());
|
||||
if (btreeSet.size === 0) {
|
||||
indexData.btree.delete(keyValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -309,8 +509,8 @@ export class IndexEngine {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Get filter fields
|
||||
const filterFields = new Set(this.getFilterFields(filter));
|
||||
// Get filter fields and operators
|
||||
const filterInfo = this.analyzeFilter(filter);
|
||||
|
||||
// Score each index
|
||||
let bestIndex: { name: string; data: IIndexData } | null = null;
|
||||
@@ -320,12 +520,21 @@ export class IndexEngine {
|
||||
const indexFields = Object.keys(indexData.key);
|
||||
let score = 0;
|
||||
|
||||
// Count how many index fields are in the filter
|
||||
// Count how many index fields can be used
|
||||
for (const field of indexFields) {
|
||||
if (filterFields.has(field)) {
|
||||
score++;
|
||||
const info = filterInfo.get(field);
|
||||
if (!info) break;
|
||||
|
||||
// Equality is best
|
||||
if (info.equality) {
|
||||
score += 2;
|
||||
} else if (info.range) {
|
||||
// Range queries can use B-tree
|
||||
score += 1;
|
||||
} else if (info.in) {
|
||||
score += 1.5;
|
||||
} else {
|
||||
break; // Index fields must be contiguous
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,7 +553,46 @@ export class IndexEngine {
|
||||
}
|
||||
|
||||
/**
|
||||
* Use index to find candidate document IDs
|
||||
* Analyze filter to extract field operators
|
||||
*/
|
||||
private analyzeFilter(filter: Document): Map<string, { equality: boolean; range: boolean; in: boolean; ops: Record<string, any> }> {
|
||||
const result = new Map<string, { equality: boolean; range: boolean; in: boolean; ops: Record<string, any> }>();
|
||||
|
||||
for (const [key, value] of Object.entries(filter)) {
|
||||
if (key.startsWith('$')) continue;
|
||||
|
||||
const info = { equality: false, range: false, in: false, ops: {} as Record<string, any> };
|
||||
|
||||
if (typeof value !== 'object' || value === null || value instanceof plugins.bson.ObjectId || value instanceof Date) {
|
||||
info.equality = true;
|
||||
info.ops['$eq'] = value;
|
||||
} else {
|
||||
const ops = value as Record<string, any>;
|
||||
if (ops.$eq !== undefined) {
|
||||
info.equality = true;
|
||||
info.ops['$eq'] = ops.$eq;
|
||||
}
|
||||
if (ops.$in !== undefined) {
|
||||
info.in = true;
|
||||
info.ops['$in'] = ops.$in;
|
||||
}
|
||||
if (ops.$gt !== undefined || ops.$gte !== undefined || ops.$lt !== undefined || ops.$lte !== undefined) {
|
||||
info.range = true;
|
||||
if (ops.$gt !== undefined) info.ops['$gt'] = ops.$gt;
|
||||
if (ops.$gte !== undefined) info.ops['$gte'] = ops.$gte;
|
||||
if (ops.$lt !== undefined) info.ops['$lt'] = ops.$lt;
|
||||
if (ops.$lte !== undefined) info.ops['$lte'] = ops.$lte;
|
||||
}
|
||||
}
|
||||
|
||||
result.set(key, info);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use index to find candidate document IDs (supports range queries with B-tree)
|
||||
*/
|
||||
async findCandidateIds(filter: Document): Promise<Set<string> | null> {
|
||||
await this.initialize();
|
||||
@@ -352,25 +600,58 @@ export class IndexEngine {
|
||||
const index = this.selectIndex(filter);
|
||||
if (!index) return null;
|
||||
|
||||
// Try to use the index for equality matches
|
||||
const filterInfo = this.analyzeFilter(filter);
|
||||
const indexFields = Object.keys(index.data.key);
|
||||
const equalityValues: Record<string, any> = {};
|
||||
|
||||
for (const field of indexFields) {
|
||||
const filterValue = this.getFilterValue(filter, field);
|
||||
if (filterValue === undefined) break;
|
||||
// For single-field indexes with range queries, use B-tree
|
||||
if (indexFields.length === 1) {
|
||||
const field = indexFields[0];
|
||||
const info = filterInfo.get(field);
|
||||
|
||||
// Only use equality matches for index lookup
|
||||
if (typeof filterValue === 'object' && filterValue !== null) {
|
||||
if (filterValue.$eq !== undefined) {
|
||||
equalityValues[field] = filterValue.$eq;
|
||||
} else if (filterValue.$in !== undefined) {
|
||||
if (info) {
|
||||
// Handle equality using hash map (faster)
|
||||
if (info.equality) {
|
||||
const keyStr = JSON.stringify(info.ops['$eq']);
|
||||
return index.data.hashMap.get(keyStr) || new Set();
|
||||
}
|
||||
|
||||
// Handle $in using hash map
|
||||
if (info.in) {
|
||||
const results = new Set<string>();
|
||||
for (const val of info.ops['$in']) {
|
||||
const keyStr = JSON.stringify(val);
|
||||
const ids = index.data.hashMap.get(keyStr);
|
||||
if (ids) {
|
||||
for (const id of ids) {
|
||||
results.add(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// Handle range queries using B-tree
|
||||
if (info.range) {
|
||||
return this.findRangeCandidates(index.data, info.ops);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// For compound indexes, use hash map with partial key matching
|
||||
const equalityValues: Record<string, any> = {};
|
||||
|
||||
for (const field of indexFields) {
|
||||
const info = filterInfo.get(field);
|
||||
if (!info) break;
|
||||
|
||||
if (info.equality) {
|
||||
equalityValues[field] = info.ops['$eq'];
|
||||
} else if (info.in) {
|
||||
// Handle $in with multiple lookups
|
||||
const results = new Set<string>();
|
||||
for (const val of filterValue.$in) {
|
||||
for (const val of info.ops['$in']) {
|
||||
equalityValues[field] = val;
|
||||
const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key));
|
||||
const ids = index.data.entries.get(keyStr);
|
||||
const ids = index.data.hashMap.get(keyStr);
|
||||
if (ids) {
|
||||
for (const id of ids) {
|
||||
results.add(id);
|
||||
@@ -379,19 +660,57 @@ export class IndexEngine {
|
||||
}
|
||||
return results;
|
||||
} else {
|
||||
break; // Non-equality operator, stop here
|
||||
break; // Non-equality/in operator, stop here
|
||||
}
|
||||
} else {
|
||||
equalityValues[field] = filterValue;
|
||||
}
|
||||
|
||||
if (Object.keys(equalityValues).length > 0) {
|
||||
const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key));
|
||||
return index.data.hashMap.get(keyStr) || new Set();
|
||||
}
|
||||
}
|
||||
|
||||
if (Object.keys(equalityValues).length === 0) {
|
||||
return null;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find candidates using B-tree range scan
|
||||
*/
|
||||
private findRangeCandidates(indexData: IIndexData, ops: Record<string, any>): Set<string> {
|
||||
const results = new Set<string>();
|
||||
|
||||
let lowKey: any = undefined;
|
||||
let highKey: any = undefined;
|
||||
let lowInclusive = true;
|
||||
let highInclusive = true;
|
||||
|
||||
if (ops['$gt'] !== undefined) {
|
||||
lowKey = ops['$gt'];
|
||||
lowInclusive = false;
|
||||
}
|
||||
if (ops['$gte'] !== undefined) {
|
||||
lowKey = ops['$gte'];
|
||||
lowInclusive = true;
|
||||
}
|
||||
if (ops['$lt'] !== undefined) {
|
||||
highKey = ops['$lt'];
|
||||
highInclusive = false;
|
||||
}
|
||||
if (ops['$lte'] !== undefined) {
|
||||
highKey = ops['$lte'];
|
||||
highInclusive = true;
|
||||
}
|
||||
|
||||
const keyStr = JSON.stringify(this.buildKeyValue(equalityValues, index.data.key));
|
||||
return index.data.entries.get(keyStr) || new Set();
|
||||
// Use B-tree range iteration
|
||||
indexData.btree.forRange(lowKey, highKey, lowInclusive, highInclusive, (value, key) => {
|
||||
if (value) {
|
||||
for (const id of value) {
|
||||
results.add(id);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
393
ts/tsmdb/engine/QueryPlanner.ts
Normal file
393
ts/tsmdb/engine/QueryPlanner.ts
Normal file
@@ -0,0 +1,393 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { Document, IStoredDocument } from '../types/interfaces.js';
|
||||
import { IndexEngine } from './IndexEngine.js';
|
||||
|
||||
/**
|
||||
* Query execution plan types
|
||||
*/
|
||||
export type TQueryPlanType = 'IXSCAN' | 'COLLSCAN' | 'FETCH' | 'IXSCAN_RANGE';
|
||||
|
||||
/**
|
||||
* Represents a query execution plan
|
||||
*/
|
||||
export interface IQueryPlan {
|
||||
/** The type of scan used */
|
||||
type: TQueryPlanType;
|
||||
/** Index name if using an index */
|
||||
indexName?: string;
|
||||
/** Index key specification */
|
||||
indexKey?: Record<string, 1 | -1 | string>;
|
||||
/** Whether the query can be fully satisfied by the index */
|
||||
indexCovering: boolean;
|
||||
/** Estimated selectivity (0-1, lower is more selective) */
|
||||
selectivity: number;
|
||||
/** Whether range operators are used */
|
||||
usesRange: boolean;
|
||||
/** Fields used from the index */
|
||||
indexFieldsUsed: string[];
|
||||
/** Filter conditions that must be applied post-index lookup */
|
||||
residualFilter?: Document;
|
||||
/** Explanation for debugging */
|
||||
explanation: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter operator analysis
|
||||
*/
|
||||
interface IFilterOperatorInfo {
|
||||
field: string;
|
||||
operators: string[];
|
||||
equality: boolean;
|
||||
range: boolean;
|
||||
in: boolean;
|
||||
exists: boolean;
|
||||
regex: boolean;
|
||||
values: Record<string, any>;
|
||||
}
|
||||
|
||||
/**
|
||||
* QueryPlanner - Analyzes queries and selects optimal execution plans
|
||||
*/
|
||||
export class QueryPlanner {
|
||||
private indexEngine: IndexEngine;
|
||||
|
||||
constructor(indexEngine: IndexEngine) {
|
||||
this.indexEngine = indexEngine;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate an execution plan for a query filter
|
||||
*/
|
||||
async plan(filter: Document): Promise<IQueryPlan> {
|
||||
await this.indexEngine['initialize']();
|
||||
|
||||
// Empty filter = full collection scan
|
||||
if (!filter || Object.keys(filter).length === 0) {
|
||||
return {
|
||||
type: 'COLLSCAN',
|
||||
indexCovering: false,
|
||||
selectivity: 1.0,
|
||||
usesRange: false,
|
||||
indexFieldsUsed: [],
|
||||
explanation: 'No filter specified, full collection scan required',
|
||||
};
|
||||
}
|
||||
|
||||
// Analyze the filter
|
||||
const operatorInfo = this.analyzeFilter(filter);
|
||||
|
||||
// Get available indexes
|
||||
const indexes = await this.indexEngine.listIndexes();
|
||||
|
||||
// Score each index
|
||||
let bestPlan: IQueryPlan | null = null;
|
||||
let bestScore = -1;
|
||||
|
||||
for (const index of indexes) {
|
||||
const plan = this.scoreIndex(index, operatorInfo, filter);
|
||||
if (plan.selectivity < 1.0) {
|
||||
const score = this.calculateScore(plan);
|
||||
if (score > bestScore) {
|
||||
bestScore = score;
|
||||
bestPlan = plan;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no suitable index found, fall back to collection scan
|
||||
if (!bestPlan || bestScore <= 0) {
|
||||
return {
|
||||
type: 'COLLSCAN',
|
||||
indexCovering: false,
|
||||
selectivity: 1.0,
|
||||
usesRange: false,
|
||||
indexFieldsUsed: [],
|
||||
explanation: 'No suitable index found for this query',
|
||||
};
|
||||
}
|
||||
|
||||
return bestPlan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Analyze filter to extract operator information per field
|
||||
*/
|
||||
private analyzeFilter(filter: Document, prefix = ''): Map<string, IFilterOperatorInfo> {
|
||||
const result = new Map<string, IFilterOperatorInfo>();
|
||||
|
||||
for (const [key, value] of Object.entries(filter)) {
|
||||
// Skip logical operators at the top level
|
||||
if (key.startsWith('$')) {
|
||||
if (key === '$and' && Array.isArray(value)) {
|
||||
// Merge $and conditions
|
||||
for (const subFilter of value) {
|
||||
const subInfo = this.analyzeFilter(subFilter, prefix);
|
||||
for (const [field, info] of subInfo) {
|
||||
if (result.has(field)) {
|
||||
// Merge operators
|
||||
const existing = result.get(field)!;
|
||||
existing.operators.push(...info.operators);
|
||||
existing.equality = existing.equality || info.equality;
|
||||
existing.range = existing.range || info.range;
|
||||
existing.in = existing.in || info.in;
|
||||
Object.assign(existing.values, info.values);
|
||||
} else {
|
||||
result.set(field, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const fullKey = prefix ? `${prefix}.${key}` : key;
|
||||
const info: IFilterOperatorInfo = {
|
||||
field: fullKey,
|
||||
operators: [],
|
||||
equality: false,
|
||||
range: false,
|
||||
in: false,
|
||||
exists: false,
|
||||
regex: false,
|
||||
values: {},
|
||||
};
|
||||
|
||||
if (typeof value !== 'object' || value === null || value instanceof plugins.bson.ObjectId || value instanceof Date) {
|
||||
// Direct equality
|
||||
info.equality = true;
|
||||
info.operators.push('$eq');
|
||||
info.values['$eq'] = value;
|
||||
} else if (Array.isArray(value)) {
|
||||
// Array equality (rare, but possible)
|
||||
info.equality = true;
|
||||
info.operators.push('$eq');
|
||||
info.values['$eq'] = value;
|
||||
} else {
|
||||
// Operator object
|
||||
for (const [op, opValue] of Object.entries(value)) {
|
||||
if (op.startsWith('$')) {
|
||||
info.operators.push(op);
|
||||
info.values[op] = opValue;
|
||||
|
||||
switch (op) {
|
||||
case '$eq':
|
||||
info.equality = true;
|
||||
break;
|
||||
case '$ne':
|
||||
case '$not':
|
||||
// These can use indexes but with low selectivity
|
||||
break;
|
||||
case '$in':
|
||||
info.in = true;
|
||||
break;
|
||||
case '$nin':
|
||||
// Can't efficiently use indexes
|
||||
break;
|
||||
case '$gt':
|
||||
case '$gte':
|
||||
case '$lt':
|
||||
case '$lte':
|
||||
info.range = true;
|
||||
break;
|
||||
case '$exists':
|
||||
info.exists = true;
|
||||
break;
|
||||
case '$regex':
|
||||
info.regex = true;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Nested object - recurse
|
||||
const nestedInfo = this.analyzeFilter({ [op]: opValue }, fullKey);
|
||||
for (const [nestedField, nestedFieldInfo] of nestedInfo) {
|
||||
result.set(nestedField, nestedFieldInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (info.operators.length > 0) {
|
||||
result.set(fullKey, info);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Score an index for the given filter
|
||||
*/
|
||||
private scoreIndex(
|
||||
index: { name: string; key: Record<string, any>; unique?: boolean; sparse?: boolean },
|
||||
operatorInfo: Map<string, IFilterOperatorInfo>,
|
||||
filter: Document
|
||||
): IQueryPlan {
|
||||
const indexFields = Object.keys(index.key);
|
||||
const usedFields: string[] = [];
|
||||
let usesRange = false;
|
||||
let canUseIndex = true;
|
||||
let selectivity = 1.0;
|
||||
let residualFilter: Document | undefined;
|
||||
|
||||
// Check each index field in order
|
||||
for (const field of indexFields) {
|
||||
const info = operatorInfo.get(field);
|
||||
if (!info) {
|
||||
// Index field not in filter - stop here
|
||||
break;
|
||||
}
|
||||
|
||||
usedFields.push(field);
|
||||
|
||||
// Calculate selectivity based on operator
|
||||
if (info.equality) {
|
||||
// Equality has high selectivity
|
||||
selectivity *= 0.01; // Assume 1% match
|
||||
} else if (info.in) {
|
||||
// $in selectivity depends on array size
|
||||
const inValues = info.values['$in'];
|
||||
if (Array.isArray(inValues)) {
|
||||
selectivity *= Math.min(0.5, inValues.length * 0.01);
|
||||
} else {
|
||||
selectivity *= 0.1;
|
||||
}
|
||||
} else if (info.range) {
|
||||
// Range queries have moderate selectivity
|
||||
selectivity *= 0.25;
|
||||
usesRange = true;
|
||||
// After range, can't use more index fields efficiently
|
||||
break;
|
||||
} else if (info.exists) {
|
||||
// $exists can use sparse indexes
|
||||
selectivity *= 0.5;
|
||||
} else {
|
||||
// Other operators may not be indexable
|
||||
canUseIndex = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!canUseIndex || usedFields.length === 0) {
|
||||
return {
|
||||
type: 'COLLSCAN',
|
||||
indexCovering: false,
|
||||
selectivity: 1.0,
|
||||
usesRange: false,
|
||||
indexFieldsUsed: [],
|
||||
explanation: `Index ${index.name} cannot be used for this query`,
|
||||
};
|
||||
}
|
||||
|
||||
// Build residual filter for conditions not covered by index
|
||||
const coveredFields = new Set(usedFields);
|
||||
const residualConditions: Record<string, any> = {};
|
||||
for (const [field, info] of operatorInfo) {
|
||||
if (!coveredFields.has(field)) {
|
||||
// This field isn't covered by the index
|
||||
if (info.equality) {
|
||||
residualConditions[field] = info.values['$eq'];
|
||||
} else {
|
||||
residualConditions[field] = info.values;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (Object.keys(residualConditions).length > 0) {
|
||||
residualFilter = residualConditions;
|
||||
}
|
||||
|
||||
// Unique indexes have better selectivity for equality
|
||||
if (index.unique && usedFields.length === indexFields.length) {
|
||||
selectivity = Math.min(selectivity, 0.001); // At most 1 document
|
||||
}
|
||||
|
||||
return {
|
||||
type: usesRange ? 'IXSCAN_RANGE' : 'IXSCAN',
|
||||
indexName: index.name,
|
||||
indexKey: index.key,
|
||||
indexCovering: Object.keys(residualConditions).length === 0,
|
||||
selectivity,
|
||||
usesRange,
|
||||
indexFieldsUsed: usedFields,
|
||||
residualFilter,
|
||||
explanation: `Using index ${index.name} on fields [${usedFields.join(', ')}]`,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate overall score for a plan (higher is better)
|
||||
*/
|
||||
private calculateScore(plan: IQueryPlan): number {
|
||||
let score = 0;
|
||||
|
||||
// Lower selectivity is better (fewer documents to fetch)
|
||||
score += (1 - plan.selectivity) * 100;
|
||||
|
||||
// Index covering queries are best
|
||||
if (plan.indexCovering) {
|
||||
score += 50;
|
||||
}
|
||||
|
||||
// More index fields used is better
|
||||
score += plan.indexFieldsUsed.length * 10;
|
||||
|
||||
// Equality scans are better than range scans
|
||||
if (!plan.usesRange) {
|
||||
score += 20;
|
||||
}
|
||||
|
||||
return score;
|
||||
}
|
||||
|
||||
/**
|
||||
* Explain a query - returns detailed plan information
|
||||
*/
|
||||
async explain(filter: Document): Promise<{
|
||||
queryPlanner: {
|
||||
plannerVersion: number;
|
||||
namespace: string;
|
||||
indexFilterSet: boolean;
|
||||
winningPlan: IQueryPlan;
|
||||
rejectedPlans: IQueryPlan[];
|
||||
};
|
||||
}> {
|
||||
await this.indexEngine['initialize']();
|
||||
|
||||
// Analyze the filter
|
||||
const operatorInfo = this.analyzeFilter(filter);
|
||||
|
||||
// Get available indexes
|
||||
const indexes = await this.indexEngine.listIndexes();
|
||||
|
||||
// Score all indexes
|
||||
const plans: IQueryPlan[] = [];
|
||||
|
||||
for (const index of indexes) {
|
||||
const plan = this.scoreIndex(index, operatorInfo, filter);
|
||||
plans.push(plan);
|
||||
}
|
||||
|
||||
// Add collection scan as fallback
|
||||
plans.push({
|
||||
type: 'COLLSCAN',
|
||||
indexCovering: false,
|
||||
selectivity: 1.0,
|
||||
usesRange: false,
|
||||
indexFieldsUsed: [],
|
||||
explanation: 'Full collection scan',
|
||||
});
|
||||
|
||||
// Sort by score (best first)
|
||||
plans.sort((a, b) => this.calculateScore(b) - this.calculateScore(a));
|
||||
|
||||
return {
|
||||
queryPlanner: {
|
||||
plannerVersion: 1,
|
||||
namespace: `${this.indexEngine['dbName']}.${this.indexEngine['collName']}`,
|
||||
indexFilterSet: false,
|
||||
winningPlan: plans[0],
|
||||
rejectedPlans: plans.slice(1),
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
292
ts/tsmdb/engine/SessionEngine.ts
Normal file
292
ts/tsmdb/engine/SessionEngine.ts
Normal file
@@ -0,0 +1,292 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { TransactionEngine } from './TransactionEngine.js';
|
||||
|
||||
/**
|
||||
* Session state
|
||||
*/
|
||||
export interface ISession {
|
||||
/** Session ID (UUID) */
|
||||
id: string;
|
||||
/** Timestamp when the session was created */
|
||||
createdAt: number;
|
||||
/** Timestamp of the last activity */
|
||||
lastActivityAt: number;
|
||||
/** Current transaction ID if any */
|
||||
txnId?: string;
|
||||
/** Transaction number for ordering */
|
||||
txnNumber?: number;
|
||||
/** Whether the session is in a transaction */
|
||||
inTransaction: boolean;
|
||||
/** Session metadata */
|
||||
metadata?: Record<string, any>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Session engine options
|
||||
*/
|
||||
export interface ISessionEngineOptions {
|
||||
/** Session timeout in milliseconds (default: 30 minutes) */
|
||||
sessionTimeoutMs?: number;
|
||||
/** Interval to check for expired sessions in ms (default: 60 seconds) */
|
||||
cleanupIntervalMs?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Session engine for managing client sessions
|
||||
* - Tracks session lifecycle (create, touch, end)
|
||||
* - Links sessions to transactions
|
||||
* - Auto-aborts transactions on session expiry
|
||||
*/
|
||||
export class SessionEngine {
|
||||
private sessions: Map<string, ISession> = new Map();
|
||||
private sessionTimeoutMs: number;
|
||||
private cleanupInterval?: ReturnType<typeof setInterval>;
|
||||
private transactionEngine?: TransactionEngine;
|
||||
|
||||
constructor(options?: ISessionEngineOptions) {
|
||||
this.sessionTimeoutMs = options?.sessionTimeoutMs ?? 30 * 60 * 1000; // 30 minutes default
|
||||
const cleanupIntervalMs = options?.cleanupIntervalMs ?? 60 * 1000; // 1 minute default
|
||||
|
||||
// Start cleanup interval
|
||||
this.cleanupInterval = setInterval(() => {
|
||||
this.cleanupExpiredSessions();
|
||||
}, cleanupIntervalMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the transaction engine to use for auto-abort
|
||||
*/
|
||||
setTransactionEngine(engine: TransactionEngine): void {
|
||||
this.transactionEngine = engine;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new session
|
||||
*/
|
||||
startSession(sessionId?: string, metadata?: Record<string, any>): ISession {
|
||||
const id = sessionId ?? new plugins.bson.UUID().toHexString();
|
||||
const now = Date.now();
|
||||
|
||||
const session: ISession = {
|
||||
id,
|
||||
createdAt: now,
|
||||
lastActivityAt: now,
|
||||
inTransaction: false,
|
||||
metadata,
|
||||
};
|
||||
|
||||
this.sessions.set(id, session);
|
||||
return session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a session by ID
|
||||
*/
|
||||
getSession(sessionId: string): ISession | undefined {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (session && this.isSessionExpired(session)) {
|
||||
// Session expired, clean it up
|
||||
this.endSession(sessionId);
|
||||
return undefined;
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Touch a session to update last activity time
|
||||
*/
|
||||
touchSession(sessionId: string): boolean {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (!session) return false;
|
||||
|
||||
if (this.isSessionExpired(session)) {
|
||||
this.endSession(sessionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
session.lastActivityAt = Date.now();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* End a session explicitly
|
||||
* This will also abort any active transaction
|
||||
*/
|
||||
async endSession(sessionId: string): Promise<boolean> {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (!session) return false;
|
||||
|
||||
// If session has an active transaction, abort it
|
||||
if (session.inTransaction && session.txnId && this.transactionEngine) {
|
||||
try {
|
||||
await this.transactionEngine.abortTransaction(session.txnId);
|
||||
} catch (e) {
|
||||
// Ignore abort errors during cleanup
|
||||
}
|
||||
}
|
||||
|
||||
this.sessions.delete(sessionId);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a transaction in a session
|
||||
*/
|
||||
startTransaction(sessionId: string, txnId: string, txnNumber?: number): boolean {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (!session) return false;
|
||||
|
||||
if (this.isSessionExpired(session)) {
|
||||
this.endSession(sessionId);
|
||||
return false;
|
||||
}
|
||||
|
||||
session.txnId = txnId;
|
||||
session.txnNumber = txnNumber;
|
||||
session.inTransaction = true;
|
||||
session.lastActivityAt = Date.now();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* End a transaction in a session (commit or abort)
|
||||
*/
|
||||
endTransaction(sessionId: string): boolean {
|
||||
const session = this.sessions.get(sessionId);
|
||||
if (!session) return false;
|
||||
|
||||
session.txnId = undefined;
|
||||
session.txnNumber = undefined;
|
||||
session.inTransaction = false;
|
||||
session.lastActivityAt = Date.now();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get transaction ID for a session
|
||||
*/
|
||||
getTransactionId(sessionId: string): string | undefined {
|
||||
const session = this.sessions.get(sessionId);
|
||||
return session?.txnId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if session is in a transaction
|
||||
*/
|
||||
isInTransaction(sessionId: string): boolean {
|
||||
const session = this.sessions.get(sessionId);
|
||||
return session?.inTransaction ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a session is expired
|
||||
*/
|
||||
isSessionExpired(session: ISession): boolean {
|
||||
return Date.now() - session.lastActivityAt > this.sessionTimeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup expired sessions
|
||||
* This is called periodically by the cleanup interval
|
||||
*/
|
||||
private async cleanupExpiredSessions(): Promise<void> {
|
||||
const expiredSessions: string[] = [];
|
||||
|
||||
for (const [id, session] of this.sessions) {
|
||||
if (this.isSessionExpired(session)) {
|
||||
expiredSessions.push(id);
|
||||
}
|
||||
}
|
||||
|
||||
// End all expired sessions (this will also abort their transactions)
|
||||
for (const sessionId of expiredSessions) {
|
||||
await this.endSession(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all active sessions
|
||||
*/
|
||||
listSessions(): ISession[] {
|
||||
const activeSessions: ISession[] = [];
|
||||
for (const session of this.sessions.values()) {
|
||||
if (!this.isSessionExpired(session)) {
|
||||
activeSessions.push(session);
|
||||
}
|
||||
}
|
||||
return activeSessions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get session count
|
||||
*/
|
||||
getSessionCount(): number {
|
||||
return this.sessions.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sessions with active transactions
|
||||
*/
|
||||
getSessionsWithTransactions(): ISession[] {
|
||||
return this.listSessions().filter(s => s.inTransaction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh session timeout
|
||||
*/
|
||||
refreshSession(sessionId: string): boolean {
|
||||
return this.touchSession(sessionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the session engine and cleanup
|
||||
*/
|
||||
close(): void {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
this.cleanupInterval = undefined;
|
||||
}
|
||||
|
||||
// Clear all sessions
|
||||
this.sessions.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a session for a given session ID
|
||||
* Useful for handling MongoDB driver session requests
|
||||
*/
|
||||
getOrCreateSession(sessionId: string): ISession {
|
||||
let session = this.getSession(sessionId);
|
||||
if (!session) {
|
||||
session = this.startSession(sessionId);
|
||||
} else {
|
||||
this.touchSession(sessionId);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract session ID from MongoDB lsid (logical session ID)
|
||||
*/
|
||||
static extractSessionId(lsid: any): string | undefined {
|
||||
if (!lsid) return undefined;
|
||||
|
||||
// MongoDB session ID format: { id: UUID }
|
||||
if (lsid.id) {
|
||||
if (lsid.id instanceof plugins.bson.UUID) {
|
||||
return lsid.id.toHexString();
|
||||
}
|
||||
if (typeof lsid.id === 'string') {
|
||||
return lsid.id;
|
||||
}
|
||||
if (lsid.id.$binary?.base64) {
|
||||
// Binary UUID format
|
||||
return Buffer.from(lsid.id.$binary.base64, 'base64').toString('hex');
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,8 @@ export type { IStorageAdapter } from './storage/IStorageAdapter.js';
|
||||
export { MemoryStorageAdapter } from './storage/MemoryStorageAdapter.js';
|
||||
export { FileStorageAdapter } from './storage/FileStorageAdapter.js';
|
||||
export { OpLog } from './storage/OpLog.js';
|
||||
export { WAL } from './storage/WAL.js';
|
||||
export type { IWalEntry, TWalOperation } from './storage/WAL.js';
|
||||
|
||||
// Export engines
|
||||
export { QueryEngine } from './engine/QueryEngine.js';
|
||||
@@ -26,6 +28,10 @@ export { UpdateEngine } from './engine/UpdateEngine.js';
|
||||
export { AggregationEngine } from './engine/AggregationEngine.js';
|
||||
export { IndexEngine } from './engine/IndexEngine.js';
|
||||
export { TransactionEngine } from './engine/TransactionEngine.js';
|
||||
export { QueryPlanner } from './engine/QueryPlanner.js';
|
||||
export type { IQueryPlan, TQueryPlanType } from './engine/QueryPlanner.js';
|
||||
export { SessionEngine } from './engine/SessionEngine.js';
|
||||
export type { ISession, ISessionEngineOptions } from './engine/SessionEngine.js';
|
||||
|
||||
// Export server (the main entry point for using TsmDB)
|
||||
export { TsmdbServer } from './server/TsmdbServer.js';
|
||||
@@ -35,3 +41,6 @@ export type { ITsmdbServerOptions } from './server/TsmdbServer.js';
|
||||
export { WireProtocol } from './server/WireProtocol.js';
|
||||
export { CommandRouter } from './server/CommandRouter.js';
|
||||
export type { ICommandHandler, IHandlerContext, ICursorState } from './server/CommandRouter.js';
|
||||
|
||||
// Export utilities
|
||||
export * from './utils/checksum.js';
|
||||
|
||||
@@ -2,6 +2,9 @@ import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { IStorageAdapter } from '../storage/IStorageAdapter.js';
|
||||
import type { IParsedCommand } from './WireProtocol.js';
|
||||
import type { TsmdbServer } from './TsmdbServer.js';
|
||||
import { IndexEngine } from '../engine/IndexEngine.js';
|
||||
import { TransactionEngine } from '../engine/TransactionEngine.js';
|
||||
import { SessionEngine } from '../engine/SessionEngine.js';
|
||||
|
||||
// Import handlers
|
||||
import { HelloHandler } from './handlers/HelloHandler.js';
|
||||
@@ -22,6 +25,16 @@ export interface IHandlerContext {
|
||||
database: string;
|
||||
command: plugins.bson.Document;
|
||||
documentSequences?: Map<string, plugins.bson.Document[]>;
|
||||
/** Get or create an IndexEngine for a collection */
|
||||
getIndexEngine: (collName: string) => IndexEngine;
|
||||
/** Transaction engine instance */
|
||||
transactionEngine: TransactionEngine;
|
||||
/** Current transaction ID (if in a transaction) */
|
||||
txnId?: string;
|
||||
/** Session ID (from lsid) */
|
||||
sessionId?: string;
|
||||
/** Session engine instance */
|
||||
sessionEngine: SessionEngine;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -43,12 +56,54 @@ export class CommandRouter {
|
||||
private cursors: Map<bigint, ICursorState> = new Map();
|
||||
private cursorIdCounter: bigint = BigInt(1);
|
||||
|
||||
// Index engine cache: db.collection -> IndexEngine
|
||||
private indexEngines: Map<string, IndexEngine> = new Map();
|
||||
|
||||
// Transaction engine (shared across all handlers)
|
||||
private transactionEngine: TransactionEngine;
|
||||
|
||||
// Session engine (shared across all handlers)
|
||||
private sessionEngine: SessionEngine;
|
||||
|
||||
constructor(storage: IStorageAdapter, server: TsmdbServer) {
|
||||
this.storage = storage;
|
||||
this.server = server;
|
||||
this.transactionEngine = new TransactionEngine(storage);
|
||||
this.sessionEngine = new SessionEngine();
|
||||
// Link session engine to transaction engine for auto-abort on session expiry
|
||||
this.sessionEngine.setTransactionEngine(this.transactionEngine);
|
||||
this.registerHandlers();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create an IndexEngine for a database.collection
|
||||
*/
|
||||
getIndexEngine(dbName: string, collName: string): IndexEngine {
|
||||
const key = `${dbName}.${collName}`;
|
||||
let engine = this.indexEngines.get(key);
|
||||
if (!engine) {
|
||||
engine = new IndexEngine(dbName, collName, this.storage);
|
||||
this.indexEngines.set(key, engine);
|
||||
}
|
||||
return engine;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear index engine cache for a collection (used when collection is dropped)
|
||||
*/
|
||||
clearIndexEngineCache(dbName: string, collName?: string): void {
|
||||
if (collName) {
|
||||
this.indexEngines.delete(`${dbName}.${collName}`);
|
||||
} else {
|
||||
// Clear all engines for the database
|
||||
for (const key of this.indexEngines.keys()) {
|
||||
if (key.startsWith(`${dbName}.`)) {
|
||||
this.indexEngines.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register all command handlers
|
||||
*/
|
||||
@@ -120,6 +175,29 @@ export class CommandRouter {
|
||||
async route(parsedCommand: IParsedCommand): Promise<plugins.bson.Document> {
|
||||
const { commandName, command, database, documentSequences } = parsedCommand;
|
||||
|
||||
// Extract session ID from lsid using SessionEngine helper
|
||||
let sessionId = SessionEngine.extractSessionId(command.lsid);
|
||||
let txnId: string | undefined;
|
||||
|
||||
// If we have a session ID, register/touch the session
|
||||
if (sessionId) {
|
||||
this.sessionEngine.getOrCreateSession(sessionId);
|
||||
}
|
||||
|
||||
// Check if this starts a new transaction
|
||||
if (command.startTransaction && sessionId) {
|
||||
txnId = this.transactionEngine.startTransaction(sessionId);
|
||||
this.sessionEngine.startTransaction(sessionId, txnId, command.txnNumber);
|
||||
} else if (sessionId && this.sessionEngine.isInTransaction(sessionId)) {
|
||||
// Continue existing transaction
|
||||
txnId = this.sessionEngine.getTransactionId(sessionId);
|
||||
// Verify transaction is still active
|
||||
if (txnId && !this.transactionEngine.isActive(txnId)) {
|
||||
this.sessionEngine.endTransaction(sessionId);
|
||||
txnId = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
// Create handler context
|
||||
const context: IHandlerContext = {
|
||||
storage: this.storage,
|
||||
@@ -127,6 +205,11 @@ export class CommandRouter {
|
||||
database,
|
||||
command,
|
||||
documentSequences,
|
||||
getIndexEngine: (collName: string) => this.getIndexEngine(database, collName),
|
||||
transactionEngine: this.transactionEngine,
|
||||
sessionEngine: this.sessionEngine,
|
||||
txnId,
|
||||
sessionId,
|
||||
};
|
||||
|
||||
// Find handler
|
||||
@@ -164,6 +247,32 @@ export class CommandRouter {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the command router and cleanup resources
|
||||
*/
|
||||
close(): void {
|
||||
// Close session engine (stops cleanup interval, clears sessions)
|
||||
this.sessionEngine.close();
|
||||
// Clear cursors
|
||||
this.cursors.clear();
|
||||
// Clear index engine cache
|
||||
this.indexEngines.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get session engine (for administrative purposes)
|
||||
*/
|
||||
getSessionEngine(): SessionEngine {
|
||||
return this.sessionEngine;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get transaction engine (for administrative purposes)
|
||||
*/
|
||||
getTransactionEngine(): TransactionEngine {
|
||||
return this.transactionEngine;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -154,6 +154,9 @@ export class TsmdbServer {
|
||||
}
|
||||
this.connections.clear();
|
||||
|
||||
// Close command router (cleans up session engine, cursors, etc.)
|
||||
this.commandRouter.close();
|
||||
|
||||
// Close storage
|
||||
await this.storage.close();
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from '../../tsmdb.plugins.js';
|
||||
import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js';
|
||||
import { SessionEngine } from '../../engine/SessionEngine.js';
|
||||
|
||||
/**
|
||||
* AdminHandler - Handles administrative commands
|
||||
@@ -237,10 +238,12 @@ export class AdminHandler implements ICommandHandler {
|
||||
* Handle serverStatus command
|
||||
*/
|
||||
private async handleServerStatus(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
const { server } = context;
|
||||
const { server, sessionEngine } = context;
|
||||
|
||||
const uptime = server.getUptime();
|
||||
const connections = server.getConnectionCount();
|
||||
const sessions = sessionEngine.listSessions();
|
||||
const sessionsWithTxn = sessionEngine.getSessionsWithTransactions();
|
||||
|
||||
return {
|
||||
ok: 1,
|
||||
@@ -263,6 +266,26 @@ export class AdminHandler implements ICommandHandler {
|
||||
totalCreated: connections,
|
||||
active: connections,
|
||||
},
|
||||
logicalSessionRecordCache: {
|
||||
activeSessionsCount: sessions.length,
|
||||
sessionsCollectionJobCount: 0,
|
||||
lastSessionsCollectionJobDurationMillis: 0,
|
||||
lastSessionsCollectionJobTimestamp: new Date(),
|
||||
transactionReaperJobCount: 0,
|
||||
lastTransactionReaperJobDurationMillis: 0,
|
||||
lastTransactionReaperJobTimestamp: new Date(),
|
||||
},
|
||||
transactions: {
|
||||
retriedCommandsCount: 0,
|
||||
retriedStatementsCount: 0,
|
||||
transactionsCollectionWriteCount: 0,
|
||||
currentActive: sessionsWithTxn.length,
|
||||
currentInactive: 0,
|
||||
currentOpen: sessionsWithTxn.length,
|
||||
totalStarted: sessionsWithTxn.length,
|
||||
totalCommitted: 0,
|
||||
totalAborted: 0,
|
||||
},
|
||||
network: {
|
||||
bytesIn: 0,
|
||||
bytesOut: 0,
|
||||
@@ -409,6 +432,17 @@ export class AdminHandler implements ICommandHandler {
|
||||
* Handle endSessions command
|
||||
*/
|
||||
private async handleEndSessions(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
const { command, sessionEngine } = context;
|
||||
|
||||
// End each session in the array
|
||||
const sessions = command.endSessions || [];
|
||||
for (const sessionSpec of sessions) {
|
||||
const sessionId = SessionEngine.extractSessionId(sessionSpec);
|
||||
if (sessionId) {
|
||||
await sessionEngine.endSession(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
return { ok: 1 };
|
||||
}
|
||||
|
||||
@@ -416,16 +450,87 @@ export class AdminHandler implements ICommandHandler {
|
||||
* Handle abortTransaction command
|
||||
*/
|
||||
private async handleAbortTransaction(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
// Transactions are not fully supported, but acknowledge the command
|
||||
return { ok: 1 };
|
||||
const { transactionEngine, sessionEngine, txnId, sessionId } = context;
|
||||
|
||||
if (!txnId) {
|
||||
return {
|
||||
ok: 0,
|
||||
errmsg: 'No transaction started',
|
||||
code: 251,
|
||||
codeName: 'NoSuchTransaction',
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
await transactionEngine.abortTransaction(txnId);
|
||||
transactionEngine.endTransaction(txnId);
|
||||
// Update session state
|
||||
if (sessionId) {
|
||||
sessionEngine.endTransaction(sessionId);
|
||||
}
|
||||
return { ok: 1 };
|
||||
} catch (error: any) {
|
||||
return {
|
||||
ok: 0,
|
||||
errmsg: error.message || 'Abort transaction failed',
|
||||
code: error.code || 1,
|
||||
codeName: error.codeName || 'UnknownError',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle commitTransaction command
|
||||
*/
|
||||
private async handleCommitTransaction(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
// Transactions are not fully supported, but acknowledge the command
|
||||
return { ok: 1 };
|
||||
const { transactionEngine, sessionEngine, txnId, sessionId } = context;
|
||||
|
||||
if (!txnId) {
|
||||
return {
|
||||
ok: 0,
|
||||
errmsg: 'No transaction started',
|
||||
code: 251,
|
||||
codeName: 'NoSuchTransaction',
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
await transactionEngine.commitTransaction(txnId);
|
||||
transactionEngine.endTransaction(txnId);
|
||||
// Update session state
|
||||
if (sessionId) {
|
||||
sessionEngine.endTransaction(sessionId);
|
||||
}
|
||||
return { ok: 1 };
|
||||
} catch (error: any) {
|
||||
// If commit fails, transaction should be aborted
|
||||
try {
|
||||
await transactionEngine.abortTransaction(txnId);
|
||||
transactionEngine.endTransaction(txnId);
|
||||
if (sessionId) {
|
||||
sessionEngine.endTransaction(sessionId);
|
||||
}
|
||||
} catch {
|
||||
// Ignore abort errors
|
||||
}
|
||||
|
||||
if (error.code === 112) {
|
||||
// Write conflict
|
||||
return {
|
||||
ok: 0,
|
||||
errmsg: error.message || 'Write conflict during commit',
|
||||
code: 112,
|
||||
codeName: 'WriteConflict',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
ok: 0,
|
||||
errmsg: error.message || 'Commit transaction failed',
|
||||
code: error.code || 1,
|
||||
codeName: error.codeName || 'UnknownError',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from '../../tsmdb.plugins.js';
|
||||
import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js';
|
||||
import type { IStoredDocument } from '../../types/interfaces.js';
|
||||
import { QueryEngine } from '../../engine/QueryEngine.js';
|
||||
|
||||
/**
|
||||
@@ -47,6 +48,8 @@ export class DeleteHandler implements ICommandHandler {
|
||||
return { ok: 1, n: 0 };
|
||||
}
|
||||
|
||||
const indexEngine = context.getIndexEngine(collection);
|
||||
|
||||
for (let i = 0; i < deletes.length; i++) {
|
||||
const deleteSpec = deletes[i];
|
||||
const filter = deleteSpec.q || deleteSpec.filter || {};
|
||||
@@ -56,8 +59,15 @@ export class DeleteHandler implements ICommandHandler {
|
||||
const deleteAll = limit === 0;
|
||||
|
||||
try {
|
||||
// Get all documents
|
||||
const documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const candidateIds = await indexEngine.findCandidateIds(filter);
|
||||
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
} else {
|
||||
documents = await storage.findAll(database, collection);
|
||||
}
|
||||
|
||||
// Apply filter
|
||||
const matchingDocs = QueryEngine.filter(documents, filter);
|
||||
@@ -69,6 +79,11 @@ export class DeleteHandler implements ICommandHandler {
|
||||
// Determine which documents to delete
|
||||
const docsToDelete = deleteAll ? matchingDocs : matchingDocs.slice(0, 1);
|
||||
|
||||
// Update indexes for deleted documents
|
||||
for (const doc of docsToDelete) {
|
||||
await indexEngine.onDelete(doc as any);
|
||||
}
|
||||
|
||||
// Delete the documents
|
||||
const idsToDelete = docsToDelete.map(doc => doc._id);
|
||||
const deleted = await storage.deleteByIds(database, collection, idsToDelete);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from '../../tsmdb.plugins.js';
|
||||
import type { ICommandHandler, IHandlerContext, ICursorState } from '../CommandRouter.js';
|
||||
import type { IStoredDocument } from '../../types/interfaces.js';
|
||||
import { QueryEngine } from '../../engine/QueryEngine.js';
|
||||
|
||||
/**
|
||||
@@ -45,7 +46,7 @@ export class FindHandler implements ICommandHandler {
|
||||
* Handle find command
|
||||
*/
|
||||
private async handleFind(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
const { storage, database, command } = context;
|
||||
const { storage, database, command, getIndexEngine } = context;
|
||||
|
||||
const collection = command.find;
|
||||
const filter = command.filter || {};
|
||||
@@ -70,11 +71,22 @@ export class FindHandler implements ICommandHandler {
|
||||
};
|
||||
}
|
||||
|
||||
// Get all documents
|
||||
let documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const indexEngine = getIndexEngine(collection);
|
||||
const candidateIds = await indexEngine.findCandidateIds(filter);
|
||||
|
||||
// Apply filter
|
||||
documents = QueryEngine.filter(documents, filter);
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
// Index hit - fetch only candidate documents
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
// Still apply filter for any conditions the index couldn't fully satisfy
|
||||
documents = QueryEngine.filter(documents, filter);
|
||||
} else {
|
||||
// No suitable index - full collection scan
|
||||
documents = await storage.findAll(database, collection);
|
||||
// Apply filter
|
||||
documents = QueryEngine.filter(documents, filter);
|
||||
}
|
||||
|
||||
// Apply sort
|
||||
if (sort) {
|
||||
@@ -233,7 +245,7 @@ export class FindHandler implements ICommandHandler {
|
||||
* Handle count command
|
||||
*/
|
||||
private async handleCount(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
const { storage, database, command } = context;
|
||||
const { storage, database, command, getIndexEngine } = context;
|
||||
|
||||
const collection = command.count;
|
||||
const query = command.query || {};
|
||||
@@ -246,11 +258,20 @@ export class FindHandler implements ICommandHandler {
|
||||
return { ok: 1, n: 0 };
|
||||
}
|
||||
|
||||
// Get all documents
|
||||
let documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const indexEngine = getIndexEngine(collection);
|
||||
const candidateIds = await indexEngine.findCandidateIds(query);
|
||||
|
||||
// Apply filter
|
||||
documents = QueryEngine.filter(documents, query);
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
// Index hit - fetch only candidate documents
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
documents = QueryEngine.filter(documents, query);
|
||||
} else {
|
||||
// No suitable index - full collection scan
|
||||
documents = await storage.findAll(database, collection);
|
||||
documents = QueryEngine.filter(documents, query);
|
||||
}
|
||||
|
||||
// Apply skip
|
||||
if (skip > 0) {
|
||||
@@ -269,7 +290,7 @@ export class FindHandler implements ICommandHandler {
|
||||
* Handle distinct command
|
||||
*/
|
||||
private async handleDistinct(context: IHandlerContext): Promise<plugins.bson.Document> {
|
||||
const { storage, database, command } = context;
|
||||
const { storage, database, command, getIndexEngine } = context;
|
||||
|
||||
const collection = command.distinct;
|
||||
const key = command.key;
|
||||
@@ -290,8 +311,16 @@ export class FindHandler implements ICommandHandler {
|
||||
return { ok: 1, values: [] };
|
||||
}
|
||||
|
||||
// Get all documents
|
||||
const documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const indexEngine = getIndexEngine(collection);
|
||||
const candidateIds = await indexEngine.findCandidateIds(query);
|
||||
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
} else {
|
||||
documents = await storage.findAll(database, collection);
|
||||
}
|
||||
|
||||
// Get distinct values
|
||||
const values = QueryEngine.distinct(documents, key, query);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from '../../tsmdb.plugins.js';
|
||||
import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js';
|
||||
import type { IStoredDocument } from '../../types/interfaces.js';
|
||||
|
||||
/**
|
||||
* InsertHandler - Handles insert commands
|
||||
@@ -42,6 +43,8 @@ export class InsertHandler implements ICommandHandler {
|
||||
// Ensure collection exists
|
||||
await storage.createCollection(database, collection);
|
||||
|
||||
const indexEngine = context.getIndexEngine(collection);
|
||||
|
||||
// Insert documents
|
||||
for (let i = 0; i < documents.length; i++) {
|
||||
const doc = documents[i];
|
||||
@@ -52,6 +55,9 @@ export class InsertHandler implements ICommandHandler {
|
||||
doc._id = new plugins.bson.ObjectId();
|
||||
}
|
||||
|
||||
// Check index constraints before insert (doc now has _id)
|
||||
await indexEngine.onInsert(doc as IStoredDocument);
|
||||
|
||||
await storage.insertOne(database, collection, doc);
|
||||
insertedCount++;
|
||||
} catch (error: any) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import * as plugins from '../../tsmdb.plugins.js';
|
||||
import type { ICommandHandler, IHandlerContext } from '../CommandRouter.js';
|
||||
import type { IStoredDocument } from '../../types/interfaces.js';
|
||||
import { QueryEngine } from '../../engine/QueryEngine.js';
|
||||
import { UpdateEngine } from '../../engine/UpdateEngine.js';
|
||||
|
||||
@@ -69,6 +70,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
// Ensure collection exists
|
||||
await storage.createCollection(database, collection);
|
||||
|
||||
const indexEngine = context.getIndexEngine(collection);
|
||||
|
||||
for (let i = 0; i < updates.length; i++) {
|
||||
const updateSpec = updates[i];
|
||||
const filter = updateSpec.q || updateSpec.filter || {};
|
||||
@@ -78,8 +81,15 @@ export class UpdateHandler implements ICommandHandler {
|
||||
const arrayFilters = updateSpec.arrayFilters;
|
||||
|
||||
try {
|
||||
// Get all documents
|
||||
let documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const candidateIds = await indexEngine.findCandidateIds(filter);
|
||||
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
} else {
|
||||
documents = await storage.findAll(database, collection);
|
||||
}
|
||||
|
||||
// Apply filter
|
||||
let matchingDocs = QueryEngine.filter(documents, filter);
|
||||
@@ -99,6 +109,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
Object.assign(updatedDoc, update.$setOnInsert);
|
||||
}
|
||||
|
||||
// Update index for the new document
|
||||
await indexEngine.onInsert(updatedDoc);
|
||||
await storage.insertOne(database, collection, updatedDoc);
|
||||
totalUpserted++;
|
||||
upserted.push({ index: i, _id: updatedDoc._id });
|
||||
@@ -113,6 +125,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
// Check if document actually changed
|
||||
const changed = JSON.stringify(doc) !== JSON.stringify(updatedDoc);
|
||||
if (changed) {
|
||||
// Update index
|
||||
await indexEngine.onUpdate(doc as any, updatedDoc);
|
||||
await storage.updateById(database, collection, doc._id, updatedDoc);
|
||||
totalModified++;
|
||||
}
|
||||
@@ -186,8 +200,17 @@ export class UpdateHandler implements ICommandHandler {
|
||||
// Ensure collection exists
|
||||
await storage.createCollection(database, collection);
|
||||
|
||||
// Get matching documents
|
||||
let documents = await storage.findAll(database, collection);
|
||||
// Try to use index-accelerated query
|
||||
const indexEngine = context.getIndexEngine(collection);
|
||||
const candidateIds = await indexEngine.findCandidateIds(query);
|
||||
|
||||
let documents: IStoredDocument[];
|
||||
if (candidateIds !== null) {
|
||||
documents = await storage.findByIds(database, collection, candidateIds);
|
||||
} else {
|
||||
documents = await storage.findAll(database, collection);
|
||||
}
|
||||
|
||||
let matchingDocs = QueryEngine.filter(documents, query);
|
||||
|
||||
// Apply sort if specified
|
||||
@@ -203,6 +226,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
return { ok: 1, value: null };
|
||||
}
|
||||
|
||||
// Update index for delete
|
||||
await indexEngine.onDelete(doc as any);
|
||||
await storage.deleteById(database, collection, doc._id);
|
||||
|
||||
let result = doc;
|
||||
@@ -231,6 +256,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
// Update existing
|
||||
originalDoc = { ...doc };
|
||||
resultDoc = UpdateEngine.applyUpdate(doc, update, arrayFilters);
|
||||
// Update index
|
||||
await indexEngine.onUpdate(doc as any, resultDoc as any);
|
||||
await storage.updateById(database, collection, doc._id, resultDoc as any);
|
||||
} else {
|
||||
// Upsert
|
||||
@@ -243,6 +270,8 @@ export class UpdateHandler implements ICommandHandler {
|
||||
Object.assign(resultDoc, update.$setOnInsert);
|
||||
}
|
||||
|
||||
// Update index for insert
|
||||
await indexEngine.onInsert(resultDoc as any);
|
||||
await storage.insertOne(database, collection, resultDoc);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,17 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { IStorageAdapter } from './IStorageAdapter.js';
|
||||
import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js';
|
||||
import { calculateDocumentChecksum, verifyChecksum } from '../utils/checksum.js';
|
||||
|
||||
/**
|
||||
* File storage adapter options
|
||||
*/
|
||||
export interface IFileStorageAdapterOptions {
|
||||
/** Enable checksum verification for data integrity */
|
||||
enableChecksums?: boolean;
|
||||
/** Throw error on checksum mismatch (default: false, just log warning) */
|
||||
strictChecksums?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* File-based storage adapter for TsmDB
|
||||
@@ -11,9 +22,13 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
private opLogCounter = 0;
|
||||
private initialized = false;
|
||||
private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());
|
||||
private enableChecksums: boolean;
|
||||
private strictChecksums: boolean;
|
||||
|
||||
constructor(basePath: string) {
|
||||
constructor(basePath: string, options?: IFileStorageAdapterOptions) {
|
||||
this.basePath = basePath;
|
||||
this.enableChecksums = options?.enableChecksums ?? false;
|
||||
this.strictChecksums = options?.strictChecksums ?? false;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
@@ -68,6 +83,45 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
return doc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify document checksum and handle errors
|
||||
*/
|
||||
private verifyDocumentChecksum(doc: any): boolean {
|
||||
if (!this.enableChecksums || !doc._checksum) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const isValid = verifyChecksum(doc);
|
||||
if (!isValid) {
|
||||
const errorMsg = `Checksum mismatch for document ${doc._id}`;
|
||||
if (this.strictChecksums) {
|
||||
throw new Error(errorMsg);
|
||||
} else {
|
||||
console.warn(`WARNING: ${errorMsg}`);
|
||||
}
|
||||
}
|
||||
return isValid;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add checksum to document before storing
|
||||
*/
|
||||
private prepareDocumentForStorage(doc: any): any {
|
||||
if (!this.enableChecksums) {
|
||||
return doc;
|
||||
}
|
||||
const checksum = calculateDocumentChecksum(doc);
|
||||
return { ...doc, _checksum: checksum };
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove internal checksum field before returning to user
|
||||
*/
|
||||
private cleanDocumentForReturn(doc: any): IStoredDocument {
|
||||
const { _checksum, ...cleanDoc } = doc;
|
||||
return this.restoreObjectIds(cleanDoc);
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Initialization
|
||||
// ============================================================================
|
||||
@@ -233,7 +287,9 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
throw new Error(`Duplicate key error: _id ${idStr}`);
|
||||
}
|
||||
|
||||
docs.push(storedDoc);
|
||||
// Add checksum if enabled
|
||||
const docToStore = this.prepareDocumentForStorage(storedDoc);
|
||||
docs.push(docToStore);
|
||||
await this.writeJsonFile(collPath, docs);
|
||||
return storedDoc;
|
||||
}
|
||||
@@ -258,7 +314,9 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
}
|
||||
|
||||
existingIds.add(idStr);
|
||||
docs.push(storedDoc);
|
||||
// Add checksum if enabled
|
||||
const docToStore = this.prepareDocumentForStorage(storedDoc);
|
||||
docs.push(docToStore);
|
||||
results.push(storedDoc);
|
||||
}
|
||||
|
||||
@@ -270,10 +328,33 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
await this.createCollection(dbName, collName);
|
||||
const collPath = this.getCollectionPath(dbName, collName);
|
||||
const docs = await this.readJsonFile<any[]>(collPath, []);
|
||||
return docs.map(doc => this.restoreObjectIds(doc));
|
||||
return docs.map(doc => {
|
||||
// Verify checksum if enabled
|
||||
this.verifyDocumentChecksum(doc);
|
||||
// Clean and return document without internal checksum field
|
||||
return this.cleanDocumentForReturn(doc);
|
||||
});
|
||||
}
|
||||
|
||||
async findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]> {
|
||||
await this.createCollection(dbName, collName);
|
||||
const collPath = this.getCollectionPath(dbName, collName);
|
||||
const docs = await this.readJsonFile<any[]>(collPath, []);
|
||||
const results: IStoredDocument[] = [];
|
||||
for (const doc of docs) {
|
||||
// Verify checksum if enabled
|
||||
this.verifyDocumentChecksum(doc);
|
||||
// Clean and restore document
|
||||
const cleaned = this.cleanDocumentForReturn(doc);
|
||||
if (ids.has(cleaned._id.toHexString())) {
|
||||
results.push(cleaned);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null> {
|
||||
// Use findAll which already handles checksum verification
|
||||
const docs = await this.findAll(dbName, collName);
|
||||
const idStr = id.toHexString();
|
||||
return docs.find(d => d._id.toHexString() === idStr) || null;
|
||||
@@ -291,7 +372,9 @@ export class FileStorageAdapter implements IStorageAdapter {
|
||||
|
||||
if (idx === -1) return false;
|
||||
|
||||
docs[idx] = doc;
|
||||
// Add checksum if enabled
|
||||
const docToStore = this.prepareDocumentForStorage(doc);
|
||||
docs[idx] = docToStore;
|
||||
await this.writeJsonFile(collPath, docs);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -90,6 +90,12 @@ export interface IStorageAdapter {
|
||||
*/
|
||||
findAll(dbName: string, collName: string): Promise<IStoredDocument[]>;
|
||||
|
||||
/**
|
||||
* Find documents by a set of _id strings (hex format)
|
||||
* Used for index-accelerated queries
|
||||
*/
|
||||
findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]>;
|
||||
|
||||
/**
|
||||
* Find a document by _id
|
||||
*/
|
||||
|
||||
@@ -196,6 +196,18 @@ export class MemoryStorageAdapter implements IStorageAdapter {
|
||||
return Array.from(collection.values());
|
||||
}
|
||||
|
||||
async findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]> {
|
||||
const collection = this.ensureCollection(dbName, collName);
|
||||
const results: IStoredDocument[] = [];
|
||||
for (const id of ids) {
|
||||
const doc = collection.get(id);
|
||||
if (doc) {
|
||||
results.push(doc);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null> {
|
||||
const collection = this.ensureCollection(dbName, collName);
|
||||
return collection.get(id.toHexString()) || null;
|
||||
|
||||
375
ts/tsmdb/storage/WAL.ts
Normal file
375
ts/tsmdb/storage/WAL.ts
Normal file
@@ -0,0 +1,375 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { Document, IStoredDocument } from '../types/interfaces.js';
|
||||
|
||||
/**
|
||||
* WAL entry operation types
|
||||
*/
|
||||
export type TWalOperation = 'insert' | 'update' | 'delete' | 'checkpoint' | 'begin' | 'commit' | 'abort';
|
||||
|
||||
/**
|
||||
* WAL entry structure
|
||||
*/
|
||||
export interface IWalEntry {
|
||||
/** Log Sequence Number - monotonically increasing */
|
||||
lsn: number;
|
||||
/** Timestamp of the operation */
|
||||
timestamp: number;
|
||||
/** Operation type */
|
||||
operation: TWalOperation;
|
||||
/** Database name */
|
||||
dbName: string;
|
||||
/** Collection name */
|
||||
collName: string;
|
||||
/** Document ID (hex string) */
|
||||
documentId: string;
|
||||
/** Document data (BSON serialized, base64 encoded) */
|
||||
data?: string;
|
||||
/** Previous document data for updates (for rollback) */
|
||||
previousData?: string;
|
||||
/** Transaction ID if part of a transaction */
|
||||
txnId?: string;
|
||||
/** CRC32 checksum of the entry (excluding this field) */
|
||||
checksum: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checkpoint record
|
||||
*/
|
||||
interface ICheckpointRecord {
|
||||
lsn: number;
|
||||
timestamp: number;
|
||||
lastCommittedLsn: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write-Ahead Log (WAL) for durability and crash recovery
|
||||
*
|
||||
* The WAL ensures durability by writing operations to a log file before
|
||||
* they are applied to the main storage. On crash recovery, uncommitted
|
||||
* operations can be replayed to restore the database to a consistent state.
|
||||
*/
|
||||
export class WAL {
|
||||
private walPath: string;
|
||||
private currentLsn: number = 0;
|
||||
private lastCheckpointLsn: number = 0;
|
||||
private entries: IWalEntry[] = [];
|
||||
private isInitialized: boolean = false;
|
||||
private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());
|
||||
|
||||
// In-memory uncommitted entries per transaction
|
||||
private uncommittedTxns: Map<string, IWalEntry[]> = new Map();
|
||||
|
||||
// Checkpoint interval (number of entries between checkpoints)
|
||||
private checkpointInterval: number = 1000;
|
||||
|
||||
constructor(walPath: string, options?: { checkpointInterval?: number }) {
|
||||
this.walPath = walPath;
|
||||
if (options?.checkpointInterval) {
|
||||
this.checkpointInterval = options.checkpointInterval;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the WAL, loading existing entries and recovering if needed
|
||||
*/
|
||||
async initialize(): Promise<{ recoveredEntries: IWalEntry[] }> {
|
||||
if (this.isInitialized) {
|
||||
return { recoveredEntries: [] };
|
||||
}
|
||||
|
||||
// Ensure WAL directory exists
|
||||
const walDir = this.walPath.substring(0, this.walPath.lastIndexOf('/'));
|
||||
if (walDir) {
|
||||
await this.fs.directory(walDir).recursive().create();
|
||||
}
|
||||
|
||||
// Try to load existing WAL
|
||||
const exists = await this.fs.file(this.walPath).exists();
|
||||
if (exists) {
|
||||
const content = await this.fs.file(this.walPath).encoding('utf8').read();
|
||||
const lines = (content as string).split('\n').filter(line => line.trim());
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const entry = JSON.parse(line) as IWalEntry;
|
||||
// Verify checksum
|
||||
if (this.verifyChecksum(entry)) {
|
||||
this.entries.push(entry);
|
||||
if (entry.lsn > this.currentLsn) {
|
||||
this.currentLsn = entry.lsn;
|
||||
}
|
||||
if (entry.operation === 'checkpoint') {
|
||||
this.lastCheckpointLsn = entry.lsn;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Skip corrupted entries
|
||||
console.warn('Skipping corrupted WAL entry');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.isInitialized = true;
|
||||
|
||||
// Return entries after last checkpoint that need recovery
|
||||
const recoveredEntries = this.entries.filter(
|
||||
e => e.lsn > this.lastCheckpointLsn &&
|
||||
(e.operation === 'insert' || e.operation === 'update' || e.operation === 'delete')
|
||||
);
|
||||
|
||||
return { recoveredEntries };
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an insert operation
|
||||
*/
|
||||
async logInsert(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise<number> {
|
||||
return this.appendEntry({
|
||||
operation: 'insert',
|
||||
dbName,
|
||||
collName,
|
||||
documentId: doc._id.toHexString(),
|
||||
data: this.serializeDocument(doc),
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an update operation
|
||||
*/
|
||||
async logUpdate(
|
||||
dbName: string,
|
||||
collName: string,
|
||||
oldDoc: IStoredDocument,
|
||||
newDoc: IStoredDocument,
|
||||
txnId?: string
|
||||
): Promise<number> {
|
||||
return this.appendEntry({
|
||||
operation: 'update',
|
||||
dbName,
|
||||
collName,
|
||||
documentId: oldDoc._id.toHexString(),
|
||||
data: this.serializeDocument(newDoc),
|
||||
previousData: this.serializeDocument(oldDoc),
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log a delete operation
|
||||
*/
|
||||
async logDelete(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise<number> {
|
||||
return this.appendEntry({
|
||||
operation: 'delete',
|
||||
dbName,
|
||||
collName,
|
||||
documentId: doc._id.toHexString(),
|
||||
previousData: this.serializeDocument(doc),
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log transaction begin
|
||||
*/
|
||||
async logBeginTransaction(txnId: string): Promise<number> {
|
||||
this.uncommittedTxns.set(txnId, []);
|
||||
return this.appendEntry({
|
||||
operation: 'begin',
|
||||
dbName: '',
|
||||
collName: '',
|
||||
documentId: '',
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log transaction commit
|
||||
*/
|
||||
async logCommitTransaction(txnId: string): Promise<number> {
|
||||
this.uncommittedTxns.delete(txnId);
|
||||
return this.appendEntry({
|
||||
operation: 'commit',
|
||||
dbName: '',
|
||||
collName: '',
|
||||
documentId: '',
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Log transaction abort
|
||||
*/
|
||||
async logAbortTransaction(txnId: string): Promise<number> {
|
||||
this.uncommittedTxns.delete(txnId);
|
||||
return this.appendEntry({
|
||||
operation: 'abort',
|
||||
dbName: '',
|
||||
collName: '',
|
||||
documentId: '',
|
||||
txnId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get entries to roll back for an aborted transaction
|
||||
*/
|
||||
getTransactionEntries(txnId: string): IWalEntry[] {
|
||||
return this.entries.filter(e => e.txnId === txnId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a checkpoint - marks a consistent point in the log
|
||||
*/
|
||||
async checkpoint(): Promise<number> {
|
||||
const lsn = await this.appendEntry({
|
||||
operation: 'checkpoint',
|
||||
dbName: '',
|
||||
collName: '',
|
||||
documentId: '',
|
||||
});
|
||||
this.lastCheckpointLsn = lsn;
|
||||
|
||||
// Truncate old entries (keep only entries after checkpoint)
|
||||
await this.truncate();
|
||||
|
||||
return lsn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Truncate the WAL file, removing entries before the last checkpoint
|
||||
*/
|
||||
private async truncate(): Promise<void> {
|
||||
// Keep entries after last checkpoint
|
||||
const newEntries = this.entries.filter(e => e.lsn >= this.lastCheckpointLsn);
|
||||
this.entries = newEntries;
|
||||
|
||||
// Rewrite the WAL file
|
||||
const lines = this.entries.map(e => JSON.stringify(e)).join('\n');
|
||||
await this.fs.file(this.walPath).encoding('utf8').write(lines);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current LSN
|
||||
*/
|
||||
getCurrentLsn(): number {
|
||||
return this.currentLsn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get entries after a specific LSN (for recovery)
|
||||
*/
|
||||
getEntriesAfter(lsn: number): IWalEntry[] {
|
||||
return this.entries.filter(e => e.lsn > lsn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the WAL
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
if (this.isInitialized) {
|
||||
// Final checkpoint before close
|
||||
await this.checkpoint();
|
||||
}
|
||||
this.isInitialized = false;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Private Methods
|
||||
// ============================================================================
|
||||
|
||||
private async appendEntry(
|
||||
partial: Omit<IWalEntry, 'lsn' | 'timestamp' | 'checksum'>
|
||||
): Promise<number> {
|
||||
await this.initialize();
|
||||
|
||||
this.currentLsn++;
|
||||
const entry: IWalEntry = {
|
||||
...partial,
|
||||
lsn: this.currentLsn,
|
||||
timestamp: Date.now(),
|
||||
checksum: 0, // Will be calculated
|
||||
};
|
||||
|
||||
// Calculate checksum
|
||||
entry.checksum = this.calculateChecksum(entry);
|
||||
|
||||
// Track in transaction if applicable
|
||||
if (partial.txnId && this.uncommittedTxns.has(partial.txnId)) {
|
||||
this.uncommittedTxns.get(partial.txnId)!.push(entry);
|
||||
}
|
||||
|
||||
// Add to in-memory log
|
||||
this.entries.push(entry);
|
||||
|
||||
// Append to file (append mode for durability)
|
||||
await this.fs.file(this.walPath).encoding('utf8').append(JSON.stringify(entry) + '\n');
|
||||
|
||||
// Check if we need a checkpoint
|
||||
if (this.entries.length - this.lastCheckpointLsn >= this.checkpointInterval) {
|
||||
await this.checkpoint();
|
||||
}
|
||||
|
||||
return entry.lsn;
|
||||
}
|
||||
|
||||
private serializeDocument(doc: Document): string {
|
||||
// Serialize document to BSON and encode as base64
|
||||
const bsonData = plugins.bson.serialize(doc);
|
||||
return Buffer.from(bsonData).toString('base64');
|
||||
}
|
||||
|
||||
private deserializeDocument(data: string): Document {
|
||||
// Decode base64 and deserialize from BSON
|
||||
const buffer = Buffer.from(data, 'base64');
|
||||
return plugins.bson.deserialize(buffer);
|
||||
}
|
||||
|
||||
private calculateChecksum(entry: IWalEntry): number {
|
||||
// Simple CRC32-like checksum
|
||||
const str = JSON.stringify({
|
||||
lsn: entry.lsn,
|
||||
timestamp: entry.timestamp,
|
||||
operation: entry.operation,
|
||||
dbName: entry.dbName,
|
||||
collName: entry.collName,
|
||||
documentId: entry.documentId,
|
||||
data: entry.data,
|
||||
previousData: entry.previousData,
|
||||
txnId: entry.txnId,
|
||||
});
|
||||
|
||||
let crc = 0xFFFFFFFF;
|
||||
for (let i = 0; i < str.length; i++) {
|
||||
crc ^= str.charCodeAt(i);
|
||||
for (let j = 0; j < 8; j++) {
|
||||
crc = (crc >>> 1) ^ (crc & 1 ? 0xEDB88320 : 0);
|
||||
}
|
||||
}
|
||||
return (~crc) >>> 0;
|
||||
}
|
||||
|
||||
private verifyChecksum(entry: IWalEntry): boolean {
|
||||
const savedChecksum = entry.checksum;
|
||||
entry.checksum = 0;
|
||||
const calculatedChecksum = this.calculateChecksum(entry);
|
||||
entry.checksum = savedChecksum;
|
||||
return calculatedChecksum === savedChecksum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover document from WAL entry
|
||||
*/
|
||||
recoverDocument(entry: IWalEntry): IStoredDocument | null {
|
||||
if (!entry.data) return null;
|
||||
return this.deserializeDocument(entry.data) as IStoredDocument;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover previous document state from WAL entry (for rollback)
|
||||
*/
|
||||
recoverPreviousDocument(entry: IWalEntry): IStoredDocument | null {
|
||||
if (!entry.previousData) return null;
|
||||
return this.deserializeDocument(entry.previousData) as IStoredDocument;
|
||||
}
|
||||
}
|
||||
88
ts/tsmdb/utils/checksum.ts
Normal file
88
ts/tsmdb/utils/checksum.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
/**
|
||||
* CRC32 checksum utilities for data integrity
|
||||
*/
|
||||
|
||||
// CRC32 lookup table
|
||||
const CRC32_TABLE: number[] = [];
|
||||
|
||||
// Initialize the CRC32 table
|
||||
function initCRC32Table(): void {
|
||||
if (CRC32_TABLE.length > 0) return;
|
||||
|
||||
for (let i = 0; i < 256; i++) {
|
||||
let crc = i;
|
||||
for (let j = 0; j < 8; j++) {
|
||||
crc = (crc & 1) ? (0xEDB88320 ^ (crc >>> 1)) : (crc >>> 1);
|
||||
}
|
||||
CRC32_TABLE[i] = crc >>> 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate CRC32 checksum for a string
|
||||
*/
|
||||
export function calculateCRC32(data: string): number {
|
||||
initCRC32Table();
|
||||
|
||||
let crc = 0xFFFFFFFF;
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const byte = data.charCodeAt(i) & 0xFF;
|
||||
crc = CRC32_TABLE[(crc ^ byte) & 0xFF] ^ (crc >>> 8);
|
||||
}
|
||||
return (~crc) >>> 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate CRC32 checksum for a Buffer
|
||||
*/
|
||||
export function calculateCRC32Buffer(data: Buffer): number {
|
||||
initCRC32Table();
|
||||
|
||||
let crc = 0xFFFFFFFF;
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
crc = CRC32_TABLE[(crc ^ data[i]) & 0xFF] ^ (crc >>> 8);
|
||||
}
|
||||
return (~crc) >>> 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate checksum for a document (serialized as JSON)
|
||||
*/
|
||||
export function calculateDocumentChecksum(doc: Record<string, any>): number {
|
||||
// Exclude _checksum field from calculation
|
||||
const { _checksum, ...docWithoutChecksum } = doc;
|
||||
const json = JSON.stringify(docWithoutChecksum);
|
||||
return calculateCRC32(json);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add checksum to a document
|
||||
*/
|
||||
export function addChecksum<T extends Record<string, any>>(doc: T): T & { _checksum: number } {
|
||||
const checksum = calculateDocumentChecksum(doc);
|
||||
return { ...doc, _checksum: checksum };
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify checksum of a document
|
||||
* Returns true if checksum is valid or if document has no checksum
|
||||
*/
|
||||
export function verifyChecksum(doc: Record<string, any>): boolean {
|
||||
if (!('_checksum' in doc)) {
|
||||
// No checksum to verify
|
||||
return true;
|
||||
}
|
||||
|
||||
const storedChecksum = doc._checksum;
|
||||
const calculatedChecksum = calculateDocumentChecksum(doc);
|
||||
|
||||
return storedChecksum === calculatedChecksum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove checksum from a document
|
||||
*/
|
||||
export function removeChecksum<T extends Record<string, any>>(doc: T): Omit<T, '_checksum'> {
|
||||
const { _checksum, ...docWithoutChecksum } = doc;
|
||||
return docWithoutChecksum as Omit<T, '_checksum'>;
|
||||
}
|
||||
1
ts/tsmdb/utils/index.ts
Normal file
1
ts/tsmdb/utils/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from './checksum.js';
|
||||
Reference in New Issue
Block a user