// Source viewer metadata: 283 lines, 7.2 KiB, TypeScript
import * as plugins from '../plugins.js';
|
|
import type { IStorageAdapter } from './IStorageAdapter.js';
|
|
import type { IOpLogEntry, Document, IResumeToken, ChangeStreamOperationType } from '../types/interfaces.js';
|
|
|
|
/**
 * Operation log that records mutations (inserts, updates, deletes, commands).
 *
 * Every recorded entry is appended through the storage adapter and then pushed
 * to in-process subscribers. Used primarily for change stream support.
 */
export class OpLog {
  private storage: IStorageAdapter;
  /** Per-instance counter used as the increment (`i`) part of generated timestamps. */
  private counter = 0;
  /** Callbacks registered through `subscribe`. */
  private listeners: Array<(entry: IOpLogEntry) => void> = [];

  /**
   * @param storage Adapter used to append and read back oplog entries.
   */
  constructor(storage: IStorageAdapter) {
    this.storage = storage;
  }

  /**
   * Generate a new timestamp for oplog entries.
   *
   * @returns A timestamp whose `t` is the current wall-clock time in whole
   *   seconds and whose `i` is this instance's incremented counter.
   */
  generateTimestamp(): plugins.bson.Timestamp {
    this.counter++;
    return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.counter });
  }

  /**
   * Generate a resume token from a timestamp.
   *
   * The token's `_data` is a base64-encoded JSON document of the form
   * `{ ts: { t, i }, version: 1 }`.
   *
   * @param ts Timestamp of the oplog entry the token should point at.
   * @returns The resume token wrapping the encoded timestamp.
   */
  generateResumeToken(ts: plugins.bson.Timestamp): IResumeToken {
    // `high`/`low` are the signed 32-bit halves of the timestamp. `>>> 0`
    // reinterprets them as unsigned so values >= 2^31 are not written as
    // negative numbers, which the `{ t, i }` constructor rejects on parse.
    const payload = JSON.stringify({
      ts: { t: ts.high >>> 0, i: ts.low >>> 0 },
      version: 1,
    });
    return { _data: Buffer.from(payload).toString('base64') };
  }

  /**
   * Parse a resume token to get the timestamp.
   *
   * @param token Token previously produced by `generateResumeToken`.
   * @returns The timestamp encoded in the token.
   * @throws Error with message `Invalid resume token` when the token cannot be
   *   decoded or does not carry integer `ts.t` / `ts.i` values.
   */
  parseResumeToken(token: IResumeToken): plugins.bson.Timestamp {
    try {
      const decoded = JSON.parse(Buffer.from(token._data, 'base64').toString('utf-8'));
      const t: unknown = decoded?.ts?.t;
      const i: unknown = decoded?.ts?.i;
      if (
        typeof t !== 'number' || !Number.isInteger(t) ||
        typeof i !== 'number' || !Number.isInteger(i)
      ) {
        throw new TypeError('resume token timestamp parts must be integers');
      }
      // Normalise to unsigned so tokens written with signed halves still parse.
      return new plugins.bson.Timestamp({ t: t >>> 0, i: i >>> 0 });
    } catch {
      throw new Error('Invalid resume token');
    }
  }

  /**
   * Record an insert operation.
   *
   * @param dbName Database name.
   * @param collName Collection name.
   * @param document The inserted document (stored as `o`).
   * @param txnInfo Optional transaction fields spread onto the entry.
   * @returns The entry that was appended.
   */
  async recordInsert(
    dbName: string,
    collName: string,
    document: Document,
    txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
  ): Promise<IOpLogEntry> {
    const entry: IOpLogEntry = {
      ts: this.generateTimestamp(),
      op: 'i',
      ns: `${dbName}.${collName}`,
      o: document,
      ...txnInfo,
    };
    return this.appendAndNotify(entry);
  }

  /**
   * Record an update operation.
   *
   * @param dbName Database name.
   * @param collName Collection name.
   * @param filter The update filter (stored as `o2`).
   * @param update The update specification (stored as `o`).
   * @param txnInfo Optional transaction fields spread onto the entry.
   * @returns The entry that was appended.
   */
  async recordUpdate(
    dbName: string,
    collName: string,
    filter: Document,
    update: Document,
    txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
  ): Promise<IOpLogEntry> {
    const entry: IOpLogEntry = {
      ts: this.generateTimestamp(),
      op: 'u',
      ns: `${dbName}.${collName}`,
      o: update,
      o2: filter,
      ...txnInfo,
    };
    return this.appendAndNotify(entry);
  }

  /**
   * Record a delete operation.
   *
   * @param dbName Database name.
   * @param collName Collection name.
   * @param filter The delete filter (stored as `o`).
   * @param txnInfo Optional transaction fields spread onto the entry.
   * @returns The entry that was appended.
   */
  async recordDelete(
    dbName: string,
    collName: string,
    filter: Document,
    txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
  ): Promise<IOpLogEntry> {
    const entry: IOpLogEntry = {
      ts: this.generateTimestamp(),
      op: 'd',
      ns: `${dbName}.${collName}`,
      o: filter,
      ...txnInfo,
    };
    return this.appendAndNotify(entry);
  }

  /**
   * Record a command (drop, rename, etc.).
   *
   * @param dbName Database name; the entry namespace becomes `<dbName>.$cmd`.
   * @param command The command document (stored as `o`).
   * @returns The entry that was appended.
   */
  async recordCommand(
    dbName: string,
    command: Document
  ): Promise<IOpLogEntry> {
    const entry: IOpLogEntry = {
      ts: this.generateTimestamp(),
      op: 'c',
      ns: `${dbName}.$cmd`,
      o: command,
    };
    return this.appendAndNotify(entry);
  }

  /**
   * Get oplog entries after a timestamp.
   *
   * @param ts Timestamp passed through to the storage adapter.
   * @param limit Optional limit passed through to the storage adapter.
   */
  async getEntriesAfter(ts: plugins.bson.Timestamp, limit?: number): Promise<IOpLogEntry[]> {
    return this.storage.getOpLogAfter(ts, limit);
  }

  /**
   * Get the latest timestamp as reported by the storage adapter.
   */
  async getLatestTimestamp(): Promise<plugins.bson.Timestamp | null> {
    return this.storage.getLatestOpLogTimestamp();
  }

  /**
   * Subscribe to oplog changes (for change streams).
   *
   * @param listener Callback invoked with every entry recorded after subscribing.
   * @returns A function that removes the listener; calling it more than once is harmless.
   */
  subscribe(listener: (entry: IOpLogEntry) => void): () => void {
    this.listeners.push(listener);
    return () => {
      const idx = this.listeners.indexOf(listener);
      if (idx >= 0) {
        this.listeners.splice(idx, 1);
      }
    };
  }

  /**
   * Append an entry through the storage adapter, then notify subscribers.
   * Subscribers are only notified once the append has resolved.
   */
  private async appendAndNotify(entry: IOpLogEntry): Promise<IOpLogEntry> {
    await this.storage.appendOpLog(entry);
    this.notifyListeners(entry);
    return entry;
  }

  /**
   * Notify all listeners of a new entry.
   *
   * A throwing listener is logged and does not prevent the remaining
   * listeners from being called.
   */
  private notifyListeners(entry: IOpLogEntry): void {
    // Iterate a snapshot: a listener may unsubscribe (splice the array) while
    // being notified, which would otherwise cause the next listener to be skipped.
    for (const listener of [...this.listeners]) {
      try {
        listener(entry);
      } catch (error) {
        console.error('Error in oplog listener:', error);
      }
    }
  }

  /**
   * Convert an oplog entry to a change stream document.
   *
   * @param entry The oplog entry to convert.
   * @param fullDocument Optional post-image; for inserts it falls back to `entry.o`.
   * @param fullDocumentBeforeChange Optional pre-image for updates and deletes.
   * @returns The change event. Command entries map to `drop`, `dropDatabase`
   *   or `rename`; any other command or unknown op maps to `invalidate`.
   */
  opLogEntryToChangeEvent(
    entry: IOpLogEntry,
    fullDocument?: Document,
    fullDocumentBeforeChange?: Document
  ): {
    _id: IResumeToken;
    operationType: ChangeStreamOperationType;
    fullDocument?: Document;
    fullDocumentBeforeChange?: Document;
    ns: { db: string; coll?: string };
    documentKey?: { _id: plugins.bson.ObjectId };
    updateDescription?: {
      updatedFields?: Document;
      removedFields?: string[];
    };
    clusterTime: plugins.bson.Timestamp;
  } {
    // Split on the FIRST dot only: the namespace is `<db>.<coll>` and the
    // collection part may itself contain dots (e.g. `system.profile`).
    const dotIdx = entry.ns.indexOf('.');
    const db = dotIdx === -1 ? entry.ns : entry.ns.slice(0, dotIdx);
    const coll = dotIdx === -1 ? undefined : entry.ns.slice(dotIdx + 1);

    const baseEvent = {
      _id: this.generateResumeToken(entry.ts),
      ns: { db, coll: coll === '$cmd' ? undefined : coll },
      clusterTime: entry.ts,
    };

    switch (entry.op) {
      case 'i':
        return {
          ...baseEvent,
          operationType: 'insert' as ChangeStreamOperationType,
          fullDocument: fullDocument ?? entry.o,
          documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
        };

      case 'u': {
        const hasDescription = Boolean(entry.o.$set || entry.o.$unset);
        return {
          ...baseEvent,
          operationType: 'update' as ChangeStreamOperationType,
          documentKey: entry.o2?._id ? { _id: entry.o2._id } : undefined,
          // Optional parts are only added as keys when they are present.
          ...(fullDocument ? { fullDocument } : {}),
          ...(fullDocumentBeforeChange ? { fullDocumentBeforeChange } : {}),
          ...(hasDescription
            ? {
                updateDescription: {
                  updatedFields: entry.o.$set || {},
                  removedFields: entry.o.$unset ? Object.keys(entry.o.$unset) : [],
                },
              }
            : {}),
        };
      }

      case 'd':
        return {
          ...baseEvent,
          operationType: 'delete' as ChangeStreamOperationType,
          documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
          fullDocumentBeforeChange,
        };

      case 'c':
        if (entry.o.drop) {
          return {
            ...baseEvent,
            operationType: 'drop' as ChangeStreamOperationType,
            // The dropped collection name comes from the command document.
            ns: { db, coll: entry.o.drop },
          };
        }
        if (entry.o.dropDatabase) {
          return {
            ...baseEvent,
            operationType: 'dropDatabase' as ChangeStreamOperationType,
          };
        }
        if (entry.o.renameCollection) {
          return {
            ...baseEvent,
            operationType: 'rename' as ChangeStreamOperationType,
          };
        }
        return {
          ...baseEvent,
          operationType: 'invalidate' as ChangeStreamOperationType,
        };

      default:
        return {
          ...baseEvent,
          operationType: 'invalidate' as ChangeStreamOperationType,
        };
    }
  }
}
|