import * as plugins from '../tsmdb.plugins.js'; import type { IStorageAdapter } from '../storage/IStorageAdapter.js'; import type { IParsedCommand } from './WireProtocol.js'; import type { TsmdbServer } from './TsmdbServer.js'; import { IndexEngine } from '../engine/IndexEngine.js'; import { TransactionEngine } from '../engine/TransactionEngine.js'; import { SessionEngine } from '../engine/SessionEngine.js'; // Import handlers import { HelloHandler } from './handlers/HelloHandler.js'; import { InsertHandler } from './handlers/InsertHandler.js'; import { FindHandler } from './handlers/FindHandler.js'; import { UpdateHandler } from './handlers/UpdateHandler.js'; import { DeleteHandler } from './handlers/DeleteHandler.js'; import { AggregateHandler } from './handlers/AggregateHandler.js'; import { IndexHandler } from './handlers/IndexHandler.js'; import { AdminHandler } from './handlers/AdminHandler.js'; /** * Handler context passed to command handlers */ export interface IHandlerContext { storage: IStorageAdapter; server: TsmdbServer; database: string; command: plugins.bson.Document; documentSequences?: Map; /** Get or create an IndexEngine for a collection */ getIndexEngine: (collName: string) => IndexEngine; /** Transaction engine instance */ transactionEngine: TransactionEngine; /** Current transaction ID (if in a transaction) */ txnId?: string; /** Session ID (from lsid) */ sessionId?: string; /** Session engine instance */ sessionEngine: SessionEngine; } /** * Command handler interface */ export interface ICommandHandler { handle(context: IHandlerContext): Promise; } /** * CommandRouter - Routes incoming commands to appropriate handlers */ export class CommandRouter { private storage: IStorageAdapter; private server: TsmdbServer; private handlers: Map = new Map(); // Cursor state for getMore operations private cursors: Map = new Map(); private cursorIdCounter: bigint = BigInt(1); // Index engine cache: db.collection -> IndexEngine private indexEngines: Map = new Map(); // Transaction engine (shared across all handlers) private transactionEngine: TransactionEngine; // Session engine (shared across all handlers) private sessionEngine: SessionEngine; constructor(storage: IStorageAdapter, server: TsmdbServer) { this.storage = storage; this.server = server; this.transactionEngine = new TransactionEngine(storage); this.sessionEngine = new SessionEngine(); // Link session engine to transaction engine for auto-abort on session expiry this.sessionEngine.setTransactionEngine(this.transactionEngine); this.registerHandlers(); } /** * Get or create an IndexEngine for a database.collection */ getIndexEngine(dbName: string, collName: string): IndexEngine { const key = `${dbName}.${collName}`; let engine = this.indexEngines.get(key); if (!engine) { engine = new IndexEngine(dbName, collName, this.storage); this.indexEngines.set(key, engine); } return engine; } /** * Clear index engine cache for a collection (used when collection is dropped) */ clearIndexEngineCache(dbName: string, collName?: string): void { if (collName) { this.indexEngines.delete(`${dbName}.${collName}`); } else { // Clear all engines for the database for (const key of this.indexEngines.keys()) { if (key.startsWith(`${dbName}.`)) { this.indexEngines.delete(key); } } } } /** * Register all command handlers */ private registerHandlers(): void { // Create handler instances with shared state const helloHandler = new HelloHandler(); const findHandler = new FindHandler(this.cursors, () => this.cursorIdCounter++); const insertHandler = new InsertHandler(); const updateHandler = new UpdateHandler(); const deleteHandler = new DeleteHandler(); const aggregateHandler = new AggregateHandler(this.cursors, () => this.cursorIdCounter++); const indexHandler = new IndexHandler(); const adminHandler = new AdminHandler(); // Handshake commands this.handlers.set('hello', helloHandler); this.handlers.set('ismaster', helloHandler); this.handlers.set('isMaster', helloHandler); // CRUD commands this.handlers.set('find', findHandler); this.handlers.set('insert', insertHandler); this.handlers.set('update', updateHandler); this.handlers.set('delete', deleteHandler); this.handlers.set('findAndModify', updateHandler); this.handlers.set('getMore', findHandler); this.handlers.set('killCursors', findHandler); // Aggregation this.handlers.set('aggregate', aggregateHandler); this.handlers.set('count', findHandler); this.handlers.set('distinct', findHandler); // Index operations this.handlers.set('createIndexes', indexHandler); this.handlers.set('dropIndexes', indexHandler); this.handlers.set('listIndexes', indexHandler); // Admin/Database operations this.handlers.set('ping', adminHandler); this.handlers.set('listDatabases', adminHandler); this.handlers.set('listCollections', adminHandler); this.handlers.set('drop', adminHandler); this.handlers.set('dropDatabase', adminHandler); this.handlers.set('create', adminHandler); this.handlers.set('serverStatus', adminHandler); this.handlers.set('buildInfo', adminHandler); this.handlers.set('whatsmyuri', adminHandler); this.handlers.set('getLog', adminHandler); this.handlers.set('hostInfo', adminHandler); this.handlers.set('replSetGetStatus', adminHandler); this.handlers.set('isMaster', helloHandler); this.handlers.set('saslStart', adminHandler); this.handlers.set('saslContinue', adminHandler); this.handlers.set('endSessions', adminHandler); this.handlers.set('abortTransaction', adminHandler); this.handlers.set('commitTransaction', adminHandler); this.handlers.set('collStats', adminHandler); this.handlers.set('dbStats', adminHandler); this.handlers.set('connectionStatus', adminHandler); this.handlers.set('currentOp', adminHandler); this.handlers.set('collMod', adminHandler); this.handlers.set('renameCollection', adminHandler); } /** * Route a command to its handler */ async route(parsedCommand: IParsedCommand): Promise { const { commandName, command, database, documentSequences } = parsedCommand; // Extract session ID from lsid using SessionEngine helper let sessionId = SessionEngine.extractSessionId(command.lsid); let txnId: string | undefined; // If we have a session ID, register/touch the session if (sessionId) { this.sessionEngine.getOrCreateSession(sessionId); } // Check if this starts a new transaction if (command.startTransaction && sessionId) { txnId = this.transactionEngine.startTransaction(sessionId); this.sessionEngine.startTransaction(sessionId, txnId, command.txnNumber); } else if (sessionId && this.sessionEngine.isInTransaction(sessionId)) { // Continue existing transaction txnId = this.sessionEngine.getTransactionId(sessionId); // Verify transaction is still active if (txnId && !this.transactionEngine.isActive(txnId)) { this.sessionEngine.endTransaction(sessionId); txnId = undefined; } } // Create handler context const context: IHandlerContext = { storage: this.storage, server: this.server, database, command, documentSequences, getIndexEngine: (collName: string) => this.getIndexEngine(database, collName), transactionEngine: this.transactionEngine, sessionEngine: this.sessionEngine, txnId, sessionId, }; // Find handler const handler = this.handlers.get(commandName); if (!handler) { // Unknown command return { ok: 0, errmsg: `no such command: '${commandName}'`, code: 59, codeName: 'CommandNotFound', }; } try { return await handler.handle(context); } catch (error: any) { // Handle known error types if (error.code) { return { ok: 0, errmsg: error.message, code: error.code, codeName: error.codeName || 'UnknownError', }; } // Generic error return { ok: 0, errmsg: error.message || 'Internal error', code: 1, codeName: 'InternalError', }; } } /** * Close the command router and cleanup resources */ close(): void { // Close session engine (stops cleanup interval, clears sessions) this.sessionEngine.close(); // Clear cursors this.cursors.clear(); // Clear index engine cache this.indexEngines.clear(); } /** * Get session engine (for administrative purposes) */ getSessionEngine(): SessionEngine { return this.sessionEngine; } /** * Get transaction engine (for administrative purposes) */ getTransactionEngine(): TransactionEngine { return this.transactionEngine; } } /** * Cursor state for multi-batch queries */ export interface ICursorState { id: bigint; database: string; collection: string; documents: plugins.bson.Document[]; position: number; batchSize: number; createdAt: Date; }