BREAKING CHANGE(tsmdb): rename CongoDB to TsmDB and relocate/rename wire-protocol server implementation and public exports
This commit is contained in:
282
ts/tsmdb/storage/OpLog.ts
Normal file
282
ts/tsmdb/storage/OpLog.ts
Normal file
@@ -0,0 +1,282 @@
|
||||
import * as plugins from '../tsmdb.plugins.js';
|
||||
import type { IStorageAdapter } from './IStorageAdapter.js';
|
||||
import type { IOpLogEntry, Document, IResumeToken, ChangeStreamOperationType } from '../types/interfaces.js';
|
||||
|
||||
/**
|
||||
* Operation Log for tracking all mutations
|
||||
* Used primarily for change stream support
|
||||
*/
|
||||
export class OpLog {
|
||||
private storage: IStorageAdapter;
|
||||
private counter = 0;
|
||||
private listeners: Array<(entry: IOpLogEntry) => void> = [];
|
||||
|
||||
constructor(storage: IStorageAdapter) {
|
||||
this.storage = storage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a new timestamp for oplog entries
|
||||
*/
|
||||
generateTimestamp(): plugins.bson.Timestamp {
|
||||
this.counter++;
|
||||
return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.counter });
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a resume token from a timestamp
|
||||
*/
|
||||
generateResumeToken(ts: plugins.bson.Timestamp): IResumeToken {
|
||||
// Create a resume token similar to MongoDB's format
|
||||
// It's a base64-encoded BSON document containing the timestamp
|
||||
const tokenData = {
|
||||
_data: Buffer.from(JSON.stringify({
|
||||
ts: { t: ts.high, i: ts.low },
|
||||
version: 1,
|
||||
})).toString('base64'),
|
||||
};
|
||||
return tokenData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a resume token to get the timestamp
|
||||
*/
|
||||
parseResumeToken(token: IResumeToken): plugins.bson.Timestamp {
|
||||
try {
|
||||
const data = JSON.parse(Buffer.from(token._data, 'base64').toString('utf-8'));
|
||||
return new plugins.bson.Timestamp({ t: data.ts.t, i: data.ts.i });
|
||||
} catch {
|
||||
throw new Error('Invalid resume token');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Record an insert operation
|
||||
*/
|
||||
async recordInsert(
|
||||
dbName: string,
|
||||
collName: string,
|
||||
document: Document,
|
||||
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
|
||||
): Promise<IOpLogEntry> {
|
||||
const entry: IOpLogEntry = {
|
||||
ts: this.generateTimestamp(),
|
||||
op: 'i',
|
||||
ns: `${dbName}.${collName}`,
|
||||
o: document,
|
||||
...txnInfo,
|
||||
};
|
||||
|
||||
await this.storage.appendOpLog(entry);
|
||||
this.notifyListeners(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record an update operation
|
||||
*/
|
||||
async recordUpdate(
|
||||
dbName: string,
|
||||
collName: string,
|
||||
filter: Document,
|
||||
update: Document,
|
||||
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
|
||||
): Promise<IOpLogEntry> {
|
||||
const entry: IOpLogEntry = {
|
||||
ts: this.generateTimestamp(),
|
||||
op: 'u',
|
||||
ns: `${dbName}.${collName}`,
|
||||
o: update,
|
||||
o2: filter,
|
||||
...txnInfo,
|
||||
};
|
||||
|
||||
await this.storage.appendOpLog(entry);
|
||||
this.notifyListeners(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a delete operation
|
||||
*/
|
||||
async recordDelete(
|
||||
dbName: string,
|
||||
collName: string,
|
||||
filter: Document,
|
||||
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
|
||||
): Promise<IOpLogEntry> {
|
||||
const entry: IOpLogEntry = {
|
||||
ts: this.generateTimestamp(),
|
||||
op: 'd',
|
||||
ns: `${dbName}.${collName}`,
|
||||
o: filter,
|
||||
...txnInfo,
|
||||
};
|
||||
|
||||
await this.storage.appendOpLog(entry);
|
||||
this.notifyListeners(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a command (drop, rename, etc.)
|
||||
*/
|
||||
async recordCommand(
|
||||
dbName: string,
|
||||
command: Document
|
||||
): Promise<IOpLogEntry> {
|
||||
const entry: IOpLogEntry = {
|
||||
ts: this.generateTimestamp(),
|
||||
op: 'c',
|
||||
ns: `${dbName}.$cmd`,
|
||||
o: command,
|
||||
};
|
||||
|
||||
await this.storage.appendOpLog(entry);
|
||||
this.notifyListeners(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get oplog entries after a timestamp
|
||||
*/
|
||||
async getEntriesAfter(ts: plugins.bson.Timestamp, limit?: number): Promise<IOpLogEntry[]> {
|
||||
return this.storage.getOpLogAfter(ts, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the latest timestamp
|
||||
*/
|
||||
async getLatestTimestamp(): Promise<plugins.bson.Timestamp | null> {
|
||||
return this.storage.getLatestOpLogTimestamp();
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to oplog changes (for change streams)
|
||||
*/
|
||||
subscribe(listener: (entry: IOpLogEntry) => void): () => void {
|
||||
this.listeners.push(listener);
|
||||
return () => {
|
||||
const idx = this.listeners.indexOf(listener);
|
||||
if (idx >= 0) {
|
||||
this.listeners.splice(idx, 1);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify all listeners of a new entry
|
||||
*/
|
||||
private notifyListeners(entry: IOpLogEntry): void {
|
||||
for (const listener of this.listeners) {
|
||||
try {
|
||||
listener(entry);
|
||||
} catch (error) {
|
||||
console.error('Error in oplog listener:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an oplog entry to a change stream document
|
||||
*/
|
||||
opLogEntryToChangeEvent(
|
||||
entry: IOpLogEntry,
|
||||
fullDocument?: Document,
|
||||
fullDocumentBeforeChange?: Document
|
||||
): {
|
||||
_id: IResumeToken;
|
||||
operationType: ChangeStreamOperationType;
|
||||
fullDocument?: Document;
|
||||
fullDocumentBeforeChange?: Document;
|
||||
ns: { db: string; coll?: string };
|
||||
documentKey?: { _id: plugins.bson.ObjectId };
|
||||
updateDescription?: {
|
||||
updatedFields?: Document;
|
||||
removedFields?: string[];
|
||||
};
|
||||
clusterTime: plugins.bson.Timestamp;
|
||||
} {
|
||||
const [db, coll] = entry.ns.split('.');
|
||||
const resumeToken = this.generateResumeToken(entry.ts);
|
||||
|
||||
const baseEvent = {
|
||||
_id: resumeToken,
|
||||
ns: { db, coll: coll === '$cmd' ? undefined : coll },
|
||||
clusterTime: entry.ts,
|
||||
};
|
||||
|
||||
switch (entry.op) {
|
||||
case 'i':
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'insert' as ChangeStreamOperationType,
|
||||
fullDocument: fullDocument || entry.o,
|
||||
documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
|
||||
};
|
||||
|
||||
case 'u':
|
||||
const updateEvent: any = {
|
||||
...baseEvent,
|
||||
operationType: 'update' as ChangeStreamOperationType,
|
||||
documentKey: entry.o2?._id ? { _id: entry.o2._id } : undefined,
|
||||
};
|
||||
|
||||
if (fullDocument) {
|
||||
updateEvent.fullDocument = fullDocument;
|
||||
}
|
||||
if (fullDocumentBeforeChange) {
|
||||
updateEvent.fullDocumentBeforeChange = fullDocumentBeforeChange;
|
||||
}
|
||||
|
||||
// Parse update description
|
||||
if (entry.o.$set || entry.o.$unset) {
|
||||
updateEvent.updateDescription = {
|
||||
updatedFields: entry.o.$set || {},
|
||||
removedFields: entry.o.$unset ? Object.keys(entry.o.$unset) : [],
|
||||
};
|
||||
}
|
||||
|
||||
return updateEvent;
|
||||
|
||||
case 'd':
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'delete' as ChangeStreamOperationType,
|
||||
documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
|
||||
fullDocumentBeforeChange,
|
||||
};
|
||||
|
||||
case 'c':
|
||||
if (entry.o.drop) {
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'drop' as ChangeStreamOperationType,
|
||||
ns: { db, coll: entry.o.drop },
|
||||
};
|
||||
}
|
||||
if (entry.o.dropDatabase) {
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'dropDatabase' as ChangeStreamOperationType,
|
||||
};
|
||||
}
|
||||
if (entry.o.renameCollection) {
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'rename' as ChangeStreamOperationType,
|
||||
};
|
||||
}
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'invalidate' as ChangeStreamOperationType,
|
||||
};
|
||||
|
||||
default:
|
||||
return {
|
||||
...baseEvent,
|
||||
operationType: 'invalidate' as ChangeStreamOperationType,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user