import * as plugins from '../tsmdb.plugins.js'; import type { IStorageAdapter } from './IStorageAdapter.js'; import type { IOpLogEntry, Document, IResumeToken, ChangeStreamOperationType } from '../types/interfaces.js'; /** * Operation Log for tracking all mutations * Used primarily for change stream support */ export class OpLog { private storage: IStorageAdapter; private counter = 0; private listeners: Array<(entry: IOpLogEntry) => void> = []; constructor(storage: IStorageAdapter) { this.storage = storage; } /** * Generate a new timestamp for oplog entries */ generateTimestamp(): plugins.bson.Timestamp { this.counter++; return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.counter }); } /** * Generate a resume token from a timestamp */ generateResumeToken(ts: plugins.bson.Timestamp): IResumeToken { // Create a resume token similar to MongoDB's format // It's a base64-encoded BSON document containing the timestamp const tokenData = { _data: Buffer.from(JSON.stringify({ ts: { t: ts.high, i: ts.low }, version: 1, })).toString('base64'), }; return tokenData; } /** * Parse a resume token to get the timestamp */ parseResumeToken(token: IResumeToken): plugins.bson.Timestamp { try { const data = JSON.parse(Buffer.from(token._data, 'base64').toString('utf-8')); return new plugins.bson.Timestamp({ t: data.ts.t, i: data.ts.i }); } catch { throw new Error('Invalid resume token'); } } /** * Record an insert operation */ async recordInsert( dbName: string, collName: string, document: Document, txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } } ): Promise { const entry: IOpLogEntry = { ts: this.generateTimestamp(), op: 'i', ns: `${dbName}.${collName}`, o: document, ...txnInfo, }; await this.storage.appendOpLog(entry); this.notifyListeners(entry); return entry; } /** * Record an update operation */ async recordUpdate( dbName: string, collName: string, filter: Document, update: Document, txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } } ): Promise { const entry: IOpLogEntry = { ts: this.generateTimestamp(), op: 'u', ns: `${dbName}.${collName}`, o: update, o2: filter, ...txnInfo, }; await this.storage.appendOpLog(entry); this.notifyListeners(entry); return entry; } /** * Record a delete operation */ async recordDelete( dbName: string, collName: string, filter: Document, txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } } ): Promise { const entry: IOpLogEntry = { ts: this.generateTimestamp(), op: 'd', ns: `${dbName}.${collName}`, o: filter, ...txnInfo, }; await this.storage.appendOpLog(entry); this.notifyListeners(entry); return entry; } /** * Record a command (drop, rename, etc.) */ async recordCommand( dbName: string, command: Document ): Promise { const entry: IOpLogEntry = { ts: this.generateTimestamp(), op: 'c', ns: `${dbName}.$cmd`, o: command, }; await this.storage.appendOpLog(entry); this.notifyListeners(entry); return entry; } /** * Get oplog entries after a timestamp */ async getEntriesAfter(ts: plugins.bson.Timestamp, limit?: number): Promise { return this.storage.getOpLogAfter(ts, limit); } /** * Get the latest timestamp */ async getLatestTimestamp(): Promise { return this.storage.getLatestOpLogTimestamp(); } /** * Subscribe to oplog changes (for change streams) */ subscribe(listener: (entry: IOpLogEntry) => void): () => void { this.listeners.push(listener); return () => { const idx = this.listeners.indexOf(listener); if (idx >= 0) { this.listeners.splice(idx, 1); } }; } /** * Notify all listeners of a new entry */ private notifyListeners(entry: IOpLogEntry): void { for (const listener of this.listeners) { try { listener(entry); } catch (error) { console.error('Error in oplog listener:', error); } } } /** * Convert an oplog entry to a change stream document */ opLogEntryToChangeEvent( entry: IOpLogEntry, fullDocument?: Document, fullDocumentBeforeChange?: Document ): { _id: IResumeToken; operationType: ChangeStreamOperationType; fullDocument?: Document; fullDocumentBeforeChange?: Document; ns: { db: string; coll?: string }; documentKey?: { _id: plugins.bson.ObjectId }; updateDescription?: { updatedFields?: Document; removedFields?: string[]; }; clusterTime: plugins.bson.Timestamp; } { const [db, coll] = entry.ns.split('.'); const resumeToken = this.generateResumeToken(entry.ts); const baseEvent = { _id: resumeToken, ns: { db, coll: coll === '$cmd' ? undefined : coll }, clusterTime: entry.ts, }; switch (entry.op) { case 'i': return { ...baseEvent, operationType: 'insert' as ChangeStreamOperationType, fullDocument: fullDocument || entry.o, documentKey: entry.o._id ? { _id: entry.o._id } : undefined, }; case 'u': const updateEvent: any = { ...baseEvent, operationType: 'update' as ChangeStreamOperationType, documentKey: entry.o2?._id ? { _id: entry.o2._id } : undefined, }; if (fullDocument) { updateEvent.fullDocument = fullDocument; } if (fullDocumentBeforeChange) { updateEvent.fullDocumentBeforeChange = fullDocumentBeforeChange; } // Parse update description if (entry.o.$set || entry.o.$unset) { updateEvent.updateDescription = { updatedFields: entry.o.$set || {}, removedFields: entry.o.$unset ? Object.keys(entry.o.$unset) : [], }; } return updateEvent; case 'd': return { ...baseEvent, operationType: 'delete' as ChangeStreamOperationType, documentKey: entry.o._id ? { _id: entry.o._id } : undefined, fullDocumentBeforeChange, }; case 'c': if (entry.o.drop) { return { ...baseEvent, operationType: 'drop' as ChangeStreamOperationType, ns: { db, coll: entry.o.drop }, }; } if (entry.o.dropDatabase) { return { ...baseEvent, operationType: 'dropDatabase' as ChangeStreamOperationType, }; } if (entry.o.renameCollection) { return { ...baseEvent, operationType: 'rename' as ChangeStreamOperationType, }; } return { ...baseEvent, operationType: 'invalidate' as ChangeStreamOperationType, }; default: return { ...baseEvent, operationType: 'invalidate' as ChangeStreamOperationType, }; } } }