// Files
//
// 290 lines
// 9.3 KiB
// TypeScript
// Raw Permalink Normal View History

import * as plugins from '../plugins.js';
import type { IStorageAdapter } from '../storage/IStorageAdapter.js';
import type { IParsedCommand } from './WireProtocol.js';
import type { TsmdbServer } from './TsmdbServer.js';
import { IndexEngine } from '../engine/IndexEngine.js';
import { TransactionEngine } from '../engine/TransactionEngine.js';
import { SessionEngine } from '../engine/SessionEngine.js';
// Import handlers
import { HelloHandler } from './handlers/HelloHandler.js';
import { InsertHandler } from './handlers/InsertHandler.js';
import { FindHandler } from './handlers/FindHandler.js';
import { UpdateHandler } from './handlers/UpdateHandler.js';
import { DeleteHandler } from './handlers/DeleteHandler.js';
import { AggregateHandler } from './handlers/AggregateHandler.js';
import { IndexHandler } from './handlers/IndexHandler.js';
import { AdminHandler } from './handlers/AdminHandler.js';
/**
 * Per-command context object handed to every command handler.
 *
 * CommandRouter.route() assembles one of these for each incoming command from
 * the parsed command plus the router's shared engines.
 */
export interface IHandlerContext {
  // --- the command being executed ---

  /** Database taken from the parsed command */
  database: string;
  /** The command document itself */
  command: plugins.bson.Document;
  /** Document sequences taken from the parsed command, if any were supplied */
  documentSequences?: Map<string, plugins.bson.Document[]>;

  // --- server-wide collaborators ---

  /** Storage adapter the router was constructed with */
  storage: IStorageAdapter;
  /** Server instance the router was constructed with */
  server: TsmdbServer;
  /** Get or create an IndexEngine for a collection */
  getIndexEngine: (collName: string) => IndexEngine;

  // --- session / transaction state ---

  /** Session engine instance */
  sessionEngine: SessionEngine;
  /** Session ID (from lsid) */
  sessionId?: string;
  /** Transaction engine instance */
  transactionEngine: TransactionEngine;
  /** Current transaction ID (if in a transaction) */
  txnId?: string;
}
/**
* Command handler interface.
*
* CommandRouter stores implementations in a name -> handler map; one handler
* instance may be registered under several command names.
*/
export interface ICommandHandler {
/**
* Execute the command described by `context`.
*
* @param context - per-command context built by the router
* @returns the reply document for the command
*/
handle(context: IHandlerContext): Promise<plugins.bson.Document>;
}
/**
 * CommandRouter - Routes incoming commands to appropriate handlers.
 *
 * The router owns the state that is shared between handlers: the cursor table,
 * the cursor id counter, a cache of IndexEngine instances, and the
 * transaction/session engines. `route()` never rejects because of a handler
 * failure; failures are converted into `{ ok: 0, ... }` reply documents.
 */
export class CommandRouter {
  private readonly storage: IStorageAdapter;
  private readonly server: TsmdbServer;

  // Command name -> handler. Several names may map to the same handler instance.
  private readonly handlers: Map<string, ICommandHandler> = new Map();

  // Cursor state for getMore operations. This same Map instance is passed to the
  // find and aggregate handlers.
  private readonly cursors: Map<bigint, ICursorState> = new Map();
  private cursorIdCounter: bigint = BigInt(1);

  // Index engine cache: db.collection -> IndexEngine
  private readonly indexEngines: Map<string, IndexEngine> = new Map();

  // Transaction engine (shared across all handlers)
  private readonly transactionEngine: TransactionEngine;

  // Session engine (shared across all handlers)
  private readonly sessionEngine: SessionEngine;

  /**
   * @param storage - storage adapter passed to the engines and to every handler context
   * @param server - owning server instance, exposed to handlers through the context
   */
  constructor(storage: IStorageAdapter, server: TsmdbServer) {
    this.storage = storage;
    this.server = server;
    this.transactionEngine = new TransactionEngine(storage);
    this.sessionEngine = new SessionEngine();
    // Link session engine to transaction engine for auto-abort on session expiry
    this.sessionEngine.setTransactionEngine(this.transactionEngine);
    this.registerHandlers();
  }

  /**
   * Get or create an IndexEngine for a database.collection.
   *
   * Engines are cached under the key `<dbName>.<collName>`, so repeated calls
   * with the same names return the same instance until the cache entry is cleared.
   */
  getIndexEngine(dbName: string, collName: string): IndexEngine {
    const key = `${dbName}.${collName}`;
    let engine = this.indexEngines.get(key);
    if (!engine) {
      engine = new IndexEngine(dbName, collName, this.storage);
      this.indexEngines.set(key, engine);
    }
    return engine;
  }

  /**
   * Clear index engine cache for a collection (used when collection is dropped).
   *
   * @param dbName - database whose cache entries should be removed
   * @param collName - when given, only that collection's entry is removed;
   *                   when omitted, every entry of the database is removed
   */
  clearIndexEngineCache(dbName: string, collName?: string): void {
    if (collName) {
      this.indexEngines.delete(`${dbName}.${collName}`);
      return;
    }
    // Clear all engines for the database
    const prefix = `${dbName}.`;
    for (const key of this.indexEngines.keys()) {
      if (key.startsWith(prefix)) {
        this.indexEngines.delete(key);
      }
    }
  }

  /**
   * Register all command handlers.
   *
   * One instance per handler class is created and registered under every
   * command name it serves.
   */
  private registerHandlers(): void {
    // Find and aggregate draw cursor ids from the same counter.
    const nextCursorId = (): bigint => this.cursorIdCounter++;

    // Create handler instances with shared state
    const helloHandler = new HelloHandler();
    const findHandler = new FindHandler(this.cursors, nextCursorId);
    const insertHandler = new InsertHandler();
    const updateHandler = new UpdateHandler();
    const deleteHandler = new DeleteHandler();
    const aggregateHandler = new AggregateHandler(this.cursors, nextCursorId);
    const indexHandler = new IndexHandler();
    const adminHandler = new AdminHandler();

    const register = (handler: ICommandHandler, names: readonly string[]): void => {
      for (const name of names) {
        this.handlers.set(name, handler);
      }
    };

    // Handshake commands
    register(helloHandler, ['hello', 'ismaster', 'isMaster']);

    // CRUD commands (count and distinct are served by the find handler)
    register(findHandler, ['find', 'getMore', 'killCursors', 'count', 'distinct']);
    register(insertHandler, ['insert']);
    register(updateHandler, ['update', 'findAndModify']);
    register(deleteHandler, ['delete']);

    // Aggregation
    register(aggregateHandler, ['aggregate']);

    // Index operations
    register(indexHandler, ['createIndexes', 'dropIndexes', 'listIndexes']);

    // Admin/Database operations
    register(adminHandler, [
      'ping',
      'listDatabases',
      'listCollections',
      'drop',
      'dropDatabase',
      'create',
      'serverStatus',
      'buildInfo',
      'whatsmyuri',
      'getLog',
      'hostInfo',
      'replSetGetStatus',
      'saslStart',
      'saslContinue',
      'endSessions',
      'abortTransaction',
      'commitTransaction',
      'collStats',
      'dbStats',
      'connectionStatus',
      'currentOp',
      'collMod',
      'renameCollection',
    ]);
  }

  /**
   * Route a command to its handler.
   *
   * Steps:
   *  1. derive the session id from `command.lsid` and register/touch the session;
   *  2. start a transaction when `command.startTransaction` is set, otherwise
   *     pick up the session's current transaction if it is still active;
   *  3. build the handler context and dispatch by command name.
   *
   * @param parsedCommand - command name, command document, database and
   *                        optional document sequences
   * @returns the handler's reply, a CommandNotFound (code 59) document for an
   *          unregistered command name, or an `{ ok: 0 }` error document when
   *          the handler throws
   */
  async route(parsedCommand: IParsedCommand): Promise<plugins.bson.Document> {
    const { commandName, command, database, documentSequences } = parsedCommand;

    // Extract session ID from lsid using SessionEngine helper
    const sessionId = SessionEngine.extractSessionId(command.lsid);
    let txnId: string | undefined;

    // If we have a session ID, register/touch the session
    if (sessionId) {
      this.sessionEngine.getOrCreateSession(sessionId);
    }

    // Check if this starts a new transaction
    if (command.startTransaction && sessionId) {
      txnId = this.transactionEngine.startTransaction(sessionId);
      this.sessionEngine.startTransaction(sessionId, txnId, command.txnNumber);
    } else if (sessionId && this.sessionEngine.isInTransaction(sessionId)) {
      // Continue existing transaction
      txnId = this.sessionEngine.getTransactionId(sessionId);
      // Verify transaction is still active; if not, detach it from the session
      if (txnId && !this.transactionEngine.isActive(txnId)) {
        this.sessionEngine.endTransaction(sessionId);
        txnId = undefined;
      }
    }

    // Create handler context
    const context: IHandlerContext = {
      storage: this.storage,
      server: this.server,
      database,
      command,
      documentSequences,
      getIndexEngine: (collName: string) => this.getIndexEngine(database, collName),
      transactionEngine: this.transactionEngine,
      sessionEngine: this.sessionEngine,
      txnId,
      sessionId,
    };

    // Find handler
    const handler = this.handlers.get(commandName);
    if (!handler) {
      // Unknown command
      return {
        ok: 0,
        errmsg: `no such command: '${commandName}'`,
        code: 59,
        codeName: 'CommandNotFound',
      };
    }

    try {
      return await handler.handle(context);
    } catch (error: unknown) {
      return CommandRouter.toErrorResponse(error);
    }
  }

  /**
   * Convert anything a handler threw into an `{ ok: 0 }` reply document.
   *
   * JavaScript allows throwing any value, including `null`, `undefined` and
   * plain strings, so the thrown value is inspected defensively instead of
   * being assumed to be an Error.
   */
  private static toErrorResponse(error: unknown): plugins.bson.Document {
    // Property reads are safe on every value except null/undefined.
    const details: { code?: unknown; codeName?: unknown; message?: unknown } =
      error == null ? {} : (error as Record<string, unknown>);
    // A thrown string is its own message.
    const message = typeof error === 'string' ? error : details.message;

    // Handle known error types (anything carrying a truthy `code`)
    if (details.code) {
      return {
        ok: 0,
        errmsg: message,
        code: details.code,
        codeName: details.codeName || 'UnknownError',
      };
    }

    // Generic error
    return {
      ok: 0,
      errmsg: message || 'Internal error',
      code: 1,
      codeName: 'InternalError',
    };
  }

  /**
   * Close the command router and cleanup resources.
   */
  close(): void {
    // Close session engine (stops cleanup interval, clears sessions)
    this.sessionEngine.close();
    // Clear cursors
    this.cursors.clear();
    // Clear index engine cache
    this.indexEngines.clear();
  }

  /**
   * Get session engine (for administrative purposes)
   */
  getSessionEngine(): SessionEngine {
    return this.sessionEngine;
  }

  /**
   * Get transaction engine (for administrative purposes)
   */
  getTransactionEngine(): TransactionEngine {
    return this.transactionEngine;
  }
}
/**
 * Cursor state for multi-batch queries.
 *
 * CommandRouter keeps these in a Map keyed by a bigint cursor id and shares
 * that Map with the find and aggregate handlers.
 */
export interface ICursorState {
  /** Cursor identifier */
  id: bigint;
  /** When the cursor was created */
  createdAt: Date;

  /** Database the cursor belongs to */
  database: string;
  /** Collection the cursor belongs to */
  collection: string;

  /** Documents held by the cursor */
  documents: plugins.bson.Document[];
  /** NOTE(review): presumably the index into `documents` of the next document to return — confirm against the handlers */
  position: number;
  /** NOTE(review): presumably the number of documents returned per batch — confirm against the handlers */
  batchSize: number;
}