feat(tsmdb): implement TsmDB Mongo-wire-compatible server, add storage/engine modules and reorganize exports

This commit is contained in:
2026-02-01 23:33:35 +00:00
parent 678bf15eb4
commit fff77fbd8e
40 changed files with 261 additions and 95 deletions

View File

@@ -0,0 +1,562 @@
import * as plugins from '../plugins.js';
import type { IStorageAdapter } from './IStorageAdapter.js';
import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js';
import { calculateDocumentChecksum, verifyChecksum } from '../utils/checksum.js';
/**
* File storage adapter options
*/
export interface IFileStorageAdapterOptions {
/** Enable checksum verification for data integrity */
enableChecksums?: boolean;
/** Throw error on checksum mismatch (default: false, just log warning) */
strictChecksums?: boolean;
}
/**
* File-based storage adapter for TsmDB
* Stores data in JSON files on disk for persistence
*/
export class FileStorageAdapter implements IStorageAdapter {
private basePath: string;
private opLogCounter = 0;
private initialized = false;
private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());
private enableChecksums: boolean;
private strictChecksums: boolean;
constructor(basePath: string, options?: IFileStorageAdapterOptions) {
this.basePath = basePath;
this.enableChecksums = options?.enableChecksums ?? false;
this.strictChecksums = options?.strictChecksums ?? false;
}
// ============================================================================
// Helper Methods
// ============================================================================
private getDbPath(dbName: string): string {
return plugins.smartpath.join(this.basePath, dbName);
}
private getCollectionPath(dbName: string, collName: string): string {
return plugins.smartpath.join(this.basePath, dbName, `${collName}.json`);
}
private getIndexPath(dbName: string, collName: string): string {
return plugins.smartpath.join(this.basePath, dbName, `${collName}.indexes.json`);
}
private getOpLogPath(): string {
return plugins.smartpath.join(this.basePath, '_oplog.json');
}
private getMetaPath(): string {
return plugins.smartpath.join(this.basePath, '_meta.json');
}
private async readJsonFile<T>(filePath: string, defaultValue: T): Promise<T> {
try {
const exists = await this.fs.file(filePath).exists();
if (!exists) return defaultValue;
const content = await this.fs.file(filePath).encoding('utf8').read();
return JSON.parse(content as string);
} catch {
return defaultValue;
}
}
private async writeJsonFile(filePath: string, data: any): Promise<void> {
const dir = filePath.substring(0, filePath.lastIndexOf('/'));
await this.fs.directory(dir).recursive().create();
await this.fs.file(filePath).encoding('utf8').write(JSON.stringify(data, null, 2));
}
private restoreObjectIds(doc: any): IStoredDocument {
if (doc._id) {
if (typeof doc._id === 'string') {
doc._id = new plugins.bson.ObjectId(doc._id);
} else if (typeof doc._id === 'object' && doc._id.$oid) {
doc._id = new plugins.bson.ObjectId(doc._id.$oid);
}
}
return doc;
}
/**
* Verify document checksum and handle errors
*/
private verifyDocumentChecksum(doc: any): boolean {
if (!this.enableChecksums || !doc._checksum) {
return true;
}
const isValid = verifyChecksum(doc);
if (!isValid) {
const errorMsg = `Checksum mismatch for document ${doc._id}`;
if (this.strictChecksums) {
throw new Error(errorMsg);
} else {
console.warn(`WARNING: ${errorMsg}`);
}
}
return isValid;
}
/**
* Add checksum to document before storing
*/
private prepareDocumentForStorage(doc: any): any {
if (!this.enableChecksums) {
return doc;
}
const checksum = calculateDocumentChecksum(doc);
return { ...doc, _checksum: checksum };
}
/**
* Remove internal checksum field before returning to user
*/
private cleanDocumentForReturn(doc: any): IStoredDocument {
const { _checksum, ...cleanDoc } = doc;
return this.restoreObjectIds(cleanDoc);
}
// ============================================================================
// Initialization
// ============================================================================
async initialize(): Promise<void> {
if (this.initialized) return;
await this.fs.directory(this.basePath).recursive().create();
// Load metadata
const meta = await this.readJsonFile(this.getMetaPath(), { opLogCounter: 0 });
this.opLogCounter = meta.opLogCounter || 0;
this.initialized = true;
}
async close(): Promise<void> {
// Save metadata
await this.writeJsonFile(this.getMetaPath(), { opLogCounter: this.opLogCounter });
this.initialized = false;
}
// ============================================================================
// Database Operations
// ============================================================================
async listDatabases(): Promise<string[]> {
await this.initialize();
try {
const entries = await this.fs.directory(this.basePath).list();
return entries
.filter(entry => entry.isDirectory && !entry.name.startsWith('_'))
.map(entry => entry.name);
} catch {
return [];
}
}
async createDatabase(dbName: string): Promise<void> {
await this.initialize();
const dbPath = this.getDbPath(dbName);
await this.fs.directory(dbPath).recursive().create();
}
async dropDatabase(dbName: string): Promise<boolean> {
await this.initialize();
const dbPath = this.getDbPath(dbName);
try {
const exists = await this.fs.directory(dbPath).exists();
if (exists) {
await this.fs.directory(dbPath).recursive().delete();
return true;
}
return false;
} catch {
return false;
}
}
async databaseExists(dbName: string): Promise<boolean> {
await this.initialize();
const dbPath = this.getDbPath(dbName);
return this.fs.directory(dbPath).exists();
}
// ============================================================================
// Collection Operations
// ============================================================================
async listCollections(dbName: string): Promise<string[]> {
await this.initialize();
const dbPath = this.getDbPath(dbName);
try {
const entries = await this.fs.directory(dbPath).list();
return entries
.filter(entry => entry.isFile && entry.name.endsWith('.json') && !entry.name.endsWith('.indexes.json'))
.map(entry => entry.name.replace('.json', ''));
} catch {
return [];
}
}
async createCollection(dbName: string, collName: string): Promise<void> {
await this.createDatabase(dbName);
const collPath = this.getCollectionPath(dbName, collName);
const exists = await this.fs.file(collPath).exists();
if (!exists) {
await this.writeJsonFile(collPath, []);
// Create default _id index
await this.writeJsonFile(this.getIndexPath(dbName, collName), [
{ name: '_id_', key: { _id: 1 }, unique: true }
]);
}
}
async dropCollection(dbName: string, collName: string): Promise<boolean> {
await this.initialize();
const collPath = this.getCollectionPath(dbName, collName);
const indexPath = this.getIndexPath(dbName, collName);
try {
const exists = await this.fs.file(collPath).exists();
if (exists) {
await this.fs.file(collPath).delete();
try {
await this.fs.file(indexPath).delete();
} catch {}
return true;
}
return false;
} catch {
return false;
}
}
async collectionExists(dbName: string, collName: string): Promise<boolean> {
await this.initialize();
const collPath = this.getCollectionPath(dbName, collName);
return this.fs.file(collPath).exists();
}
async renameCollection(dbName: string, oldName: string, newName: string): Promise<void> {
await this.initialize();
const oldPath = this.getCollectionPath(dbName, oldName);
const newPath = this.getCollectionPath(dbName, newName);
const oldIndexPath = this.getIndexPath(dbName, oldName);
const newIndexPath = this.getIndexPath(dbName, newName);
const exists = await this.fs.file(oldPath).exists();
if (!exists) {
throw new Error(`Collection ${oldName} not found`);
}
// Read, write to new, delete old
const docs = await this.readJsonFile<any[]>(oldPath, []);
await this.writeJsonFile(newPath, docs);
await this.fs.file(oldPath).delete();
// Handle indexes
const indexes = await this.readJsonFile<any[]>(oldIndexPath, []);
await this.writeJsonFile(newIndexPath, indexes);
try {
await this.fs.file(oldIndexPath).delete();
} catch {}
}
// ============================================================================
// Document Operations
// ============================================================================
async insertOne(dbName: string, collName: string, doc: Document): Promise<IStoredDocument> {
await this.createCollection(dbName, collName);
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const storedDoc: IStoredDocument = {
...doc,
_id: doc._id ? (doc._id instanceof plugins.bson.ObjectId ? doc._id : new plugins.bson.ObjectId(doc._id)) : new plugins.bson.ObjectId(),
};
// Check for duplicate
const idStr = storedDoc._id.toHexString();
if (docs.some(d => d._id === idStr || (d._id && d._id.toString() === idStr))) {
throw new Error(`Duplicate key error: _id ${idStr}`);
}
// Add checksum if enabled
const docToStore = this.prepareDocumentForStorage(storedDoc);
docs.push(docToStore);
await this.writeJsonFile(collPath, docs);
return storedDoc;
}
async insertMany(dbName: string, collName: string, docsToInsert: Document[]): Promise<IStoredDocument[]> {
await this.createCollection(dbName, collName);
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const results: IStoredDocument[] = [];
const existingIds = new Set(docs.map(d => d._id?.toString?.() || d._id));
for (const doc of docsToInsert) {
const storedDoc: IStoredDocument = {
...doc,
_id: doc._id ? (doc._id instanceof plugins.bson.ObjectId ? doc._id : new plugins.bson.ObjectId(doc._id)) : new plugins.bson.ObjectId(),
};
const idStr = storedDoc._id.toHexString();
if (existingIds.has(idStr)) {
throw new Error(`Duplicate key error: _id ${idStr}`);
}
existingIds.add(idStr);
// Add checksum if enabled
const docToStore = this.prepareDocumentForStorage(storedDoc);
docs.push(docToStore);
results.push(storedDoc);
}
await this.writeJsonFile(collPath, docs);
return results;
}
async findAll(dbName: string, collName: string): Promise<IStoredDocument[]> {
await this.createCollection(dbName, collName);
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
return docs.map(doc => {
// Verify checksum if enabled
this.verifyDocumentChecksum(doc);
// Clean and return document without internal checksum field
return this.cleanDocumentForReturn(doc);
});
}
async findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]> {
await this.createCollection(dbName, collName);
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const results: IStoredDocument[] = [];
for (const doc of docs) {
// Verify checksum if enabled
this.verifyDocumentChecksum(doc);
// Clean and restore document
const cleaned = this.cleanDocumentForReturn(doc);
if (ids.has(cleaned._id.toHexString())) {
results.push(cleaned);
}
}
return results;
}
async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null> {
// Use findAll which already handles checksum verification
const docs = await this.findAll(dbName, collName);
const idStr = id.toHexString();
return docs.find(d => d._id.toHexString() === idStr) || null;
}
async updateById(dbName: string, collName: string, id: plugins.bson.ObjectId, doc: IStoredDocument): Promise<boolean> {
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const idStr = id.toHexString();
const idx = docs.findIndex(d => {
const docId = d._id?.toHexString?.() || d._id?.toString?.() || d._id;
return docId === idStr;
});
if (idx === -1) return false;
// Add checksum if enabled
const docToStore = this.prepareDocumentForStorage(doc);
docs[idx] = docToStore;
await this.writeJsonFile(collPath, docs);
return true;
}
async deleteById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<boolean> {
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const idStr = id.toHexString();
const idx = docs.findIndex(d => {
const docId = d._id?.toHexString?.() || d._id?.toString?.() || d._id;
return docId === idStr;
});
if (idx === -1) return false;
docs.splice(idx, 1);
await this.writeJsonFile(collPath, docs);
return true;
}
async deleteByIds(dbName: string, collName: string, ids: plugins.bson.ObjectId[]): Promise<number> {
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
const idStrs = new Set(ids.map(id => id.toHexString()));
const originalLength = docs.length;
const filtered = docs.filter(d => {
const docId = d._id?.toHexString?.() || d._id?.toString?.() || d._id;
return !idStrs.has(docId);
});
await this.writeJsonFile(collPath, filtered);
return originalLength - filtered.length;
}
async count(dbName: string, collName: string): Promise<number> {
const collPath = this.getCollectionPath(dbName, collName);
const docs = await this.readJsonFile<any[]>(collPath, []);
return docs.length;
}
// ============================================================================
// Index Operations
// ============================================================================
async saveIndex(
dbName: string,
collName: string,
indexName: string,
indexSpec: { key: Record<string, any>; unique?: boolean; sparse?: boolean; expireAfterSeconds?: number }
): Promise<void> {
await this.createCollection(dbName, collName);
const indexPath = this.getIndexPath(dbName, collName);
const indexes = await this.readJsonFile<any[]>(indexPath, [
{ name: '_id_', key: { _id: 1 }, unique: true }
]);
const existingIdx = indexes.findIndex(i => i.name === indexName);
if (existingIdx >= 0) {
indexes[existingIdx] = { name: indexName, ...indexSpec };
} else {
indexes.push({ name: indexName, ...indexSpec });
}
await this.writeJsonFile(indexPath, indexes);
}
async getIndexes(dbName: string, collName: string): Promise<Array<{
name: string;
key: Record<string, any>;
unique?: boolean;
sparse?: boolean;
expireAfterSeconds?: number;
}>> {
const indexPath = this.getIndexPath(dbName, collName);
return this.readJsonFile(indexPath, [{ name: '_id_', key: { _id: 1 }, unique: true }]);
}
async dropIndex(dbName: string, collName: string, indexName: string): Promise<boolean> {
if (indexName === '_id_') {
throw new Error('Cannot drop _id index');
}
const indexPath = this.getIndexPath(dbName, collName);
const indexes = await this.readJsonFile<any[]>(indexPath, []);
const idx = indexes.findIndex(i => i.name === indexName);
if (idx >= 0) {
indexes.splice(idx, 1);
await this.writeJsonFile(indexPath, indexes);
return true;
}
return false;
}
// ============================================================================
// OpLog Operations
// ============================================================================
async appendOpLog(entry: IOpLogEntry): Promise<void> {
const opLogPath = this.getOpLogPath();
const opLog = await this.readJsonFile<IOpLogEntry[]>(opLogPath, []);
opLog.push(entry);
// Trim oplog if it gets too large
if (opLog.length > 10000) {
opLog.splice(0, opLog.length - 10000);
}
await this.writeJsonFile(opLogPath, opLog);
}
async getOpLogAfter(ts: plugins.bson.Timestamp, limit: number = 1000): Promise<IOpLogEntry[]> {
const opLogPath = this.getOpLogPath();
const opLog = await this.readJsonFile<any[]>(opLogPath, []);
const tsValue = ts.toNumber();
const entries = opLog.filter(e => {
const entryTs = e.ts.toNumber ? e.ts.toNumber() : (e.ts.t * 4294967296 + e.ts.i);
return entryTs > tsValue;
});
return entries.slice(0, limit);
}
async getLatestOpLogTimestamp(): Promise<plugins.bson.Timestamp | null> {
const opLogPath = this.getOpLogPath();
const opLog = await this.readJsonFile<any[]>(opLogPath, []);
if (opLog.length === 0) return null;
const last = opLog[opLog.length - 1];
if (last.ts instanceof plugins.bson.Timestamp) {
return last.ts;
}
return new plugins.bson.Timestamp({ t: last.ts.t, i: last.ts.i });
}
generateTimestamp(): plugins.bson.Timestamp {
this.opLogCounter++;
return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.opLogCounter });
}
// ============================================================================
// Transaction Support
// ============================================================================
async createSnapshot(dbName: string, collName: string): Promise<IStoredDocument[]> {
const docs = await this.findAll(dbName, collName);
return docs.map(doc => JSON.parse(JSON.stringify(doc)));
}
async hasConflicts(
dbName: string,
collName: string,
ids: plugins.bson.ObjectId[],
snapshotTime: plugins.bson.Timestamp
): Promise<boolean> {
const opLogPath = this.getOpLogPath();
const opLog = await this.readJsonFile<any[]>(opLogPath, []);
const ns = `${dbName}.${collName}`;
const snapshotTs = snapshotTime.toNumber();
const modifiedIds = new Set<string>();
for (const entry of opLog) {
const entryTs = entry.ts.toNumber ? entry.ts.toNumber() : (entry.ts.t * 4294967296 + entry.ts.i);
if (entryTs > snapshotTs && entry.ns === ns) {
if (entry.o._id) {
modifiedIds.add(entry.o._id.toString());
}
if (entry.o2?._id) {
modifiedIds.add(entry.o2._id.toString());
}
}
}
for (const id of ids) {
if (modifiedIds.has(id.toString())) {
return true;
}
}
return false;
}
}

View File

@@ -0,0 +1,208 @@
import type * as plugins from '../plugins.js';
import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js';
/**
* Storage adapter interface for TsmDB
* Implementations can provide different storage backends (memory, file, etc.)
*/
export interface IStorageAdapter {
/**
* Initialize the storage adapter
*/
initialize(): Promise<void>;
/**
* Close the storage adapter and release resources
*/
close(): Promise<void>;
// ============================================================================
// Database Operations
// ============================================================================
/**
* List all database names
*/
listDatabases(): Promise<string[]>;
/**
* Create a database (typically lazy - just marks it as existing)
*/
createDatabase(dbName: string): Promise<void>;
/**
* Drop a database and all its collections
*/
dropDatabase(dbName: string): Promise<boolean>;
/**
* Check if a database exists
*/
databaseExists(dbName: string): Promise<boolean>;
// ============================================================================
// Collection Operations
// ============================================================================
/**
* List all collection names in a database
*/
listCollections(dbName: string): Promise<string[]>;
/**
* Create a collection
*/
createCollection(dbName: string, collName: string): Promise<void>;
/**
* Drop a collection
*/
dropCollection(dbName: string, collName: string): Promise<boolean>;
/**
* Check if a collection exists
*/
collectionExists(dbName: string, collName: string): Promise<boolean>;
/**
* Rename a collection
*/
renameCollection(dbName: string, oldName: string, newName: string): Promise<void>;
// ============================================================================
// Document Operations
// ============================================================================
/**
* Insert a single document
* @returns The inserted document with _id
*/
insertOne(dbName: string, collName: string, doc: Document): Promise<IStoredDocument>;
/**
* Insert multiple documents
* @returns Array of inserted documents with _ids
*/
insertMany(dbName: string, collName: string, docs: Document[]): Promise<IStoredDocument[]>;
/**
* Find all documents in a collection
*/
findAll(dbName: string, collName: string): Promise<IStoredDocument[]>;
/**
* Find documents by a set of _id strings (hex format)
* Used for index-accelerated queries
*/
findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]>;
/**
* Find a document by _id
*/
findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null>;
/**
* Update a document by _id
* @returns true if document was updated
*/
updateById(dbName: string, collName: string, id: plugins.bson.ObjectId, doc: IStoredDocument): Promise<boolean>;
/**
* Delete a document by _id
* @returns true if document was deleted
*/
deleteById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<boolean>;
/**
* Delete multiple documents by _id
* @returns Number of deleted documents
*/
deleteByIds(dbName: string, collName: string, ids: plugins.bson.ObjectId[]): Promise<number>;
/**
* Get the count of documents in a collection
*/
count(dbName: string, collName: string): Promise<number>;
// ============================================================================
// Index Operations
// ============================================================================
/**
* Store index metadata
*/
saveIndex(
dbName: string,
collName: string,
indexName: string,
indexSpec: { key: Record<string, any>; unique?: boolean; sparse?: boolean; expireAfterSeconds?: number }
): Promise<void>;
/**
* Get all index metadata for a collection
*/
getIndexes(dbName: string, collName: string): Promise<Array<{
name: string;
key: Record<string, any>;
unique?: boolean;
sparse?: boolean;
expireAfterSeconds?: number;
}>>;
/**
* Drop an index
*/
dropIndex(dbName: string, collName: string, indexName: string): Promise<boolean>;
// ============================================================================
// OpLog Operations (for change streams)
// ============================================================================
/**
* Append an operation to the oplog
*/
appendOpLog(entry: IOpLogEntry): Promise<void>;
/**
* Get oplog entries after a timestamp
*/
getOpLogAfter(ts: plugins.bson.Timestamp, limit?: number): Promise<IOpLogEntry[]>;
/**
* Get the latest oplog timestamp
*/
getLatestOpLogTimestamp(): Promise<plugins.bson.Timestamp | null>;
// ============================================================================
// Transaction Support
// ============================================================================
/**
* Create a snapshot of current data for transaction isolation
*/
createSnapshot(dbName: string, collName: string): Promise<IStoredDocument[]>;
/**
* Check if any documents have been modified since the snapshot
*/
hasConflicts(
dbName: string,
collName: string,
ids: plugins.bson.ObjectId[],
snapshotTime: plugins.bson.Timestamp
): Promise<boolean>;
// ============================================================================
// Persistence (optional, for MemoryStorageAdapter with file backup)
// ============================================================================
/**
* Persist current state to disk (if supported)
*/
persist?(): Promise<void>;
/**
* Load state from disk (if supported)
*/
restore?(): Promise<void>;
}

View File

@@ -0,0 +1,455 @@
import * as plugins from '../plugins.js';
import type { IStorageAdapter } from './IStorageAdapter.js';
import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js';
/**
* In-memory storage adapter for TsmDB
* Optionally supports persistence to a file
*/
export class MemoryStorageAdapter implements IStorageAdapter {
// Database -> Collection -> Documents
private databases: Map<string, Map<string, Map<string, IStoredDocument>>> = new Map();
// Database -> Collection -> Indexes
private indexes: Map<string, Map<string, Array<{
name: string;
key: Record<string, any>;
unique?: boolean;
sparse?: boolean;
expireAfterSeconds?: number;
}>>> = new Map();
// OpLog entries
private opLog: IOpLogEntry[] = [];
private opLogCounter = 0;
// Persistence settings
private persistPath?: string;
private persistInterval?: ReturnType<typeof setInterval>;
private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());
constructor(options?: { persistPath?: string; persistIntervalMs?: number }) {
this.persistPath = options?.persistPath;
if (this.persistPath && options?.persistIntervalMs) {
this.persistInterval = setInterval(() => {
this.persist().catch(console.error);
}, options.persistIntervalMs);
}
}
async initialize(): Promise<void> {
if (this.persistPath) {
await this.restore();
}
}
async close(): Promise<void> {
if (this.persistInterval) {
clearInterval(this.persistInterval);
}
if (this.persistPath) {
await this.persist();
}
}
// ============================================================================
// Database Operations
// ============================================================================
async listDatabases(): Promise<string[]> {
return Array.from(this.databases.keys());
}
async createDatabase(dbName: string): Promise<void> {
if (!this.databases.has(dbName)) {
this.databases.set(dbName, new Map());
this.indexes.set(dbName, new Map());
}
}
async dropDatabase(dbName: string): Promise<boolean> {
const existed = this.databases.has(dbName);
this.databases.delete(dbName);
this.indexes.delete(dbName);
return existed;
}
async databaseExists(dbName: string): Promise<boolean> {
return this.databases.has(dbName);
}
// ============================================================================
// Collection Operations
// ============================================================================
async listCollections(dbName: string): Promise<string[]> {
const db = this.databases.get(dbName);
return db ? Array.from(db.keys()) : [];
}
async createCollection(dbName: string, collName: string): Promise<void> {
await this.createDatabase(dbName);
const db = this.databases.get(dbName)!;
if (!db.has(collName)) {
db.set(collName, new Map());
// Initialize default _id index
const dbIndexes = this.indexes.get(dbName)!;
dbIndexes.set(collName, [{ name: '_id_', key: { _id: 1 }, unique: true }]);
}
}
async dropCollection(dbName: string, collName: string): Promise<boolean> {
const db = this.databases.get(dbName);
if (!db) return false;
const existed = db.has(collName);
db.delete(collName);
const dbIndexes = this.indexes.get(dbName);
if (dbIndexes) {
dbIndexes.delete(collName);
}
return existed;
}
async collectionExists(dbName: string, collName: string): Promise<boolean> {
const db = this.databases.get(dbName);
return db ? db.has(collName) : false;
}
async renameCollection(dbName: string, oldName: string, newName: string): Promise<void> {
const db = this.databases.get(dbName);
if (!db || !db.has(oldName)) {
throw new Error(`Collection ${oldName} not found`);
}
const collection = db.get(oldName)!;
db.set(newName, collection);
db.delete(oldName);
// Also rename indexes
const dbIndexes = this.indexes.get(dbName);
if (dbIndexes && dbIndexes.has(oldName)) {
const collIndexes = dbIndexes.get(oldName)!;
dbIndexes.set(newName, collIndexes);
dbIndexes.delete(oldName);
}
}
// ============================================================================
// Document Operations
// ============================================================================
private getCollection(dbName: string, collName: string): Map<string, IStoredDocument> {
const db = this.databases.get(dbName);
if (!db) {
throw new Error(`Database ${dbName} not found`);
}
const collection = db.get(collName);
if (!collection) {
throw new Error(`Collection ${collName} not found`);
}
return collection;
}
private ensureCollection(dbName: string, collName: string): Map<string, IStoredDocument> {
if (!this.databases.has(dbName)) {
this.databases.set(dbName, new Map());
this.indexes.set(dbName, new Map());
}
const db = this.databases.get(dbName)!;
if (!db.has(collName)) {
db.set(collName, new Map());
const dbIndexes = this.indexes.get(dbName)!;
dbIndexes.set(collName, [{ name: '_id_', key: { _id: 1 }, unique: true }]);
}
return db.get(collName)!;
}
async insertOne(dbName: string, collName: string, doc: Document): Promise<IStoredDocument> {
const collection = this.ensureCollection(dbName, collName);
const storedDoc: IStoredDocument = {
...doc,
_id: doc._id instanceof plugins.bson.ObjectId ? doc._id : new plugins.bson.ObjectId(doc._id),
};
if (!storedDoc._id) {
storedDoc._id = new plugins.bson.ObjectId();
}
const idStr = storedDoc._id.toHexString();
if (collection.has(idStr)) {
throw new Error(`Duplicate key error: _id ${idStr}`);
}
collection.set(idStr, storedDoc);
return storedDoc;
}
async insertMany(dbName: string, collName: string, docs: Document[]): Promise<IStoredDocument[]> {
const results: IStoredDocument[] = [];
for (const doc of docs) {
results.push(await this.insertOne(dbName, collName, doc));
}
return results;
}
async findAll(dbName: string, collName: string): Promise<IStoredDocument[]> {
const collection = this.ensureCollection(dbName, collName);
return Array.from(collection.values());
}
async findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]> {
const collection = this.ensureCollection(dbName, collName);
const results: IStoredDocument[] = [];
for (const id of ids) {
const doc = collection.get(id);
if (doc) {
results.push(doc);
}
}
return results;
}
async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null> {
const collection = this.ensureCollection(dbName, collName);
return collection.get(id.toHexString()) || null;
}
async updateById(dbName: string, collName: string, id: plugins.bson.ObjectId, doc: IStoredDocument): Promise<boolean> {
const collection = this.ensureCollection(dbName, collName);
const idStr = id.toHexString();
if (!collection.has(idStr)) {
return false;
}
collection.set(idStr, doc);
return true;
}
async deleteById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<boolean> {
const collection = this.ensureCollection(dbName, collName);
return collection.delete(id.toHexString());
}
async deleteByIds(dbName: string, collName: string, ids: plugins.bson.ObjectId[]): Promise<number> {
let count = 0;
for (const id of ids) {
if (await this.deleteById(dbName, collName, id)) {
count++;
}
}
return count;
}
async count(dbName: string, collName: string): Promise<number> {
const collection = this.ensureCollection(dbName, collName);
return collection.size;
}
// ============================================================================
// Index Operations
// ============================================================================
async saveIndex(
dbName: string,
collName: string,
indexName: string,
indexSpec: { key: Record<string, any>; unique?: boolean; sparse?: boolean; expireAfterSeconds?: number }
): Promise<void> {
await this.createCollection(dbName, collName);
const dbIndexes = this.indexes.get(dbName)!;
let collIndexes = dbIndexes.get(collName);
if (!collIndexes) {
collIndexes = [{ name: '_id_', key: { _id: 1 }, unique: true }];
dbIndexes.set(collName, collIndexes);
}
// Check if index already exists
const existingIndex = collIndexes.findIndex(i => i.name === indexName);
if (existingIndex >= 0) {
collIndexes[existingIndex] = { name: indexName, ...indexSpec };
} else {
collIndexes.push({ name: indexName, ...indexSpec });
}
}
async getIndexes(dbName: string, collName: string): Promise<Array<{
name: string;
key: Record<string, any>;
unique?: boolean;
sparse?: boolean;
expireAfterSeconds?: number;
}>> {
const dbIndexes = this.indexes.get(dbName);
if (!dbIndexes) return [{ name: '_id_', key: { _id: 1 }, unique: true }];
const collIndexes = dbIndexes.get(collName);
return collIndexes || [{ name: '_id_', key: { _id: 1 }, unique: true }];
}
async dropIndex(dbName: string, collName: string, indexName: string): Promise<boolean> {
if (indexName === '_id_') {
throw new Error('Cannot drop _id index');
}
const dbIndexes = this.indexes.get(dbName);
if (!dbIndexes) return false;
const collIndexes = dbIndexes.get(collName);
if (!collIndexes) return false;
const idx = collIndexes.findIndex(i => i.name === indexName);
if (idx >= 0) {
collIndexes.splice(idx, 1);
return true;
}
return false;
}
// ============================================================================
// OpLog Operations
// ============================================================================
async appendOpLog(entry: IOpLogEntry): Promise<void> {
this.opLog.push(entry);
// Trim oplog if it gets too large (keep last 10000 entries)
if (this.opLog.length > 10000) {
this.opLog = this.opLog.slice(-10000);
}
}
async getOpLogAfter(ts: plugins.bson.Timestamp, limit: number = 1000): Promise<IOpLogEntry[]> {
const tsValue = ts.toNumber();
const entries = this.opLog.filter(e => e.ts.toNumber() > tsValue);
return entries.slice(0, limit);
}
async getLatestOpLogTimestamp(): Promise<plugins.bson.Timestamp | null> {
if (this.opLog.length === 0) return null;
return this.opLog[this.opLog.length - 1].ts;
}
/**
* Generate a new timestamp for oplog entries
*/
generateTimestamp(): plugins.bson.Timestamp {
this.opLogCounter++;
return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.opLogCounter });
}
// ============================================================================
// Transaction Support
// ============================================================================
async createSnapshot(dbName: string, collName: string): Promise<IStoredDocument[]> {
const docs = await this.findAll(dbName, collName);
// Deep clone the documents for snapshot isolation
return docs.map(doc => JSON.parse(JSON.stringify(doc)));
}
async hasConflicts(
dbName: string,
collName: string,
ids: plugins.bson.ObjectId[],
snapshotTime: plugins.bson.Timestamp
): Promise<boolean> {
// Check if any of the given document IDs have been modified after snapshotTime
const ns = `${dbName}.${collName}`;
const modifiedIds = new Set<string>();
for (const entry of this.opLog) {
if (entry.ts.greaterThan(snapshotTime) && entry.ns === ns) {
if (entry.o._id) {
modifiedIds.add(entry.o._id.toString());
}
if (entry.o2?._id) {
modifiedIds.add(entry.o2._id.toString());
}
}
}
for (const id of ids) {
if (modifiedIds.has(id.toString())) {
return true;
}
}
return false;
}
// ============================================================================
// Persistence
// ============================================================================
async persist(): Promise<void> {
if (!this.persistPath) return;
const data = {
databases: {} as Record<string, Record<string, IStoredDocument[]>>,
indexes: {} as Record<string, Record<string, any[]>>,
opLogCounter: this.opLogCounter,
};
for (const [dbName, collections] of this.databases) {
data.databases[dbName] = {};
for (const [collName, docs] of collections) {
data.databases[dbName][collName] = Array.from(docs.values());
}
}
for (const [dbName, collIndexes] of this.indexes) {
data.indexes[dbName] = {};
for (const [collName, indexes] of collIndexes) {
data.indexes[dbName][collName] = indexes;
}
}
// Ensure parent directory exists
const dir = this.persistPath.substring(0, this.persistPath.lastIndexOf('/'));
if (dir) {
await this.fs.directory(dir).recursive().create();
}
await this.fs.file(this.persistPath).encoding('utf8').write(JSON.stringify(data, null, 2));
}
async restore(): Promise<void> {
if (!this.persistPath) return;
try {
const exists = await this.fs.file(this.persistPath).exists();
if (!exists) return;
const content = await this.fs.file(this.persistPath).encoding('utf8').read();
const data = JSON.parse(content as string);
this.databases.clear();
this.indexes.clear();
for (const [dbName, collections] of Object.entries(data.databases || {})) {
const dbMap = new Map<string, Map<string, IStoredDocument>>();
this.databases.set(dbName, dbMap);
for (const [collName, docs] of Object.entries(collections as Record<string, any[]>)) {
const collMap = new Map<string, IStoredDocument>();
for (const doc of docs) {
// Restore ObjectId
if (doc._id && typeof doc._id === 'string') {
doc._id = new plugins.bson.ObjectId(doc._id);
} else if (doc._id && typeof doc._id === 'object' && doc._id.$oid) {
doc._id = new plugins.bson.ObjectId(doc._id.$oid);
}
collMap.set(doc._id.toHexString(), doc);
}
dbMap.set(collName, collMap);
}
}
for (const [dbName, collIndexes] of Object.entries(data.indexes || {})) {
const indexMap = new Map<string, any[]>();
this.indexes.set(dbName, indexMap);
for (const [collName, indexes] of Object.entries(collIndexes as Record<string, any[]>)) {
indexMap.set(collName, indexes);
}
}
this.opLogCounter = data.opLogCounter || 0;
} catch (error) {
// If restore fails, start fresh
console.warn('Failed to restore from persistence:', error);
}
}
}

View File

@@ -0,0 +1,282 @@
import * as plugins from '../plugins.js';
import type { IStorageAdapter } from './IStorageAdapter.js';
import type { IOpLogEntry, Document, IResumeToken, ChangeStreamOperationType } from '../types/interfaces.js';
/**
* Operation Log for tracking all mutations
* Used primarily for change stream support
*/
export class OpLog {
private storage: IStorageAdapter;
private counter = 0;
private listeners: Array<(entry: IOpLogEntry) => void> = [];
constructor(storage: IStorageAdapter) {
this.storage = storage;
}
/**
* Generate a new timestamp for oplog entries
*/
generateTimestamp(): plugins.bson.Timestamp {
this.counter++;
return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.counter });
}
/**
* Generate a resume token from a timestamp
*/
generateResumeToken(ts: plugins.bson.Timestamp): IResumeToken {
// Create a resume token similar to MongoDB's format
// It's a base64-encoded BSON document containing the timestamp
const tokenData = {
_data: Buffer.from(JSON.stringify({
ts: { t: ts.high, i: ts.low },
version: 1,
})).toString('base64'),
};
return tokenData;
}
/**
* Parse a resume token to get the timestamp
*/
parseResumeToken(token: IResumeToken): plugins.bson.Timestamp {
try {
const data = JSON.parse(Buffer.from(token._data, 'base64').toString('utf-8'));
return new plugins.bson.Timestamp({ t: data.ts.t, i: data.ts.i });
} catch {
throw new Error('Invalid resume token');
}
}
/**
* Record an insert operation
*/
async recordInsert(
dbName: string,
collName: string,
document: Document,
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
): Promise<IOpLogEntry> {
const entry: IOpLogEntry = {
ts: this.generateTimestamp(),
op: 'i',
ns: `${dbName}.${collName}`,
o: document,
...txnInfo,
};
await this.storage.appendOpLog(entry);
this.notifyListeners(entry);
return entry;
}
/**
* Record an update operation
*/
async recordUpdate(
dbName: string,
collName: string,
filter: Document,
update: Document,
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
): Promise<IOpLogEntry> {
const entry: IOpLogEntry = {
ts: this.generateTimestamp(),
op: 'u',
ns: `${dbName}.${collName}`,
o: update,
o2: filter,
...txnInfo,
};
await this.storage.appendOpLog(entry);
this.notifyListeners(entry);
return entry;
}
/**
* Record a delete operation
*/
async recordDelete(
dbName: string,
collName: string,
filter: Document,
txnInfo?: { txnNumber?: number; lsid?: { id: plugins.bson.Binary } }
): Promise<IOpLogEntry> {
const entry: IOpLogEntry = {
ts: this.generateTimestamp(),
op: 'd',
ns: `${dbName}.${collName}`,
o: filter,
...txnInfo,
};
await this.storage.appendOpLog(entry);
this.notifyListeners(entry);
return entry;
}
/**
* Record a command (drop, rename, etc.)
*/
async recordCommand(
dbName: string,
command: Document
): Promise<IOpLogEntry> {
const entry: IOpLogEntry = {
ts: this.generateTimestamp(),
op: 'c',
ns: `${dbName}.$cmd`,
o: command,
};
await this.storage.appendOpLog(entry);
this.notifyListeners(entry);
return entry;
}
/**
* Get oplog entries after a timestamp
*/
async getEntriesAfter(ts: plugins.bson.Timestamp, limit?: number): Promise<IOpLogEntry[]> {
return this.storage.getOpLogAfter(ts, limit);
}
/**
* Get the latest timestamp
*/
async getLatestTimestamp(): Promise<plugins.bson.Timestamp | null> {
return this.storage.getLatestOpLogTimestamp();
}
/**
* Subscribe to oplog changes (for change streams)
*/
subscribe(listener: (entry: IOpLogEntry) => void): () => void {
this.listeners.push(listener);
return () => {
const idx = this.listeners.indexOf(listener);
if (idx >= 0) {
this.listeners.splice(idx, 1);
}
};
}
/**
* Notify all listeners of a new entry
*/
private notifyListeners(entry: IOpLogEntry): void {
for (const listener of this.listeners) {
try {
listener(entry);
} catch (error) {
console.error('Error in oplog listener:', error);
}
}
}
/**
* Convert an oplog entry to a change stream document
*/
opLogEntryToChangeEvent(
entry: IOpLogEntry,
fullDocument?: Document,
fullDocumentBeforeChange?: Document
): {
_id: IResumeToken;
operationType: ChangeStreamOperationType;
fullDocument?: Document;
fullDocumentBeforeChange?: Document;
ns: { db: string; coll?: string };
documentKey?: { _id: plugins.bson.ObjectId };
updateDescription?: {
updatedFields?: Document;
removedFields?: string[];
};
clusterTime: plugins.bson.Timestamp;
} {
const [db, coll] = entry.ns.split('.');
const resumeToken = this.generateResumeToken(entry.ts);
const baseEvent = {
_id: resumeToken,
ns: { db, coll: coll === '$cmd' ? undefined : coll },
clusterTime: entry.ts,
};
switch (entry.op) {
case 'i':
return {
...baseEvent,
operationType: 'insert' as ChangeStreamOperationType,
fullDocument: fullDocument || entry.o,
documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
};
case 'u':
const updateEvent: any = {
...baseEvent,
operationType: 'update' as ChangeStreamOperationType,
documentKey: entry.o2?._id ? { _id: entry.o2._id } : undefined,
};
if (fullDocument) {
updateEvent.fullDocument = fullDocument;
}
if (fullDocumentBeforeChange) {
updateEvent.fullDocumentBeforeChange = fullDocumentBeforeChange;
}
// Parse update description
if (entry.o.$set || entry.o.$unset) {
updateEvent.updateDescription = {
updatedFields: entry.o.$set || {},
removedFields: entry.o.$unset ? Object.keys(entry.o.$unset) : [],
};
}
return updateEvent;
case 'd':
return {
...baseEvent,
operationType: 'delete' as ChangeStreamOperationType,
documentKey: entry.o._id ? { _id: entry.o._id } : undefined,
fullDocumentBeforeChange,
};
case 'c':
if (entry.o.drop) {
return {
...baseEvent,
operationType: 'drop' as ChangeStreamOperationType,
ns: { db, coll: entry.o.drop },
};
}
if (entry.o.dropDatabase) {
return {
...baseEvent,
operationType: 'dropDatabase' as ChangeStreamOperationType,
};
}
if (entry.o.renameCollection) {
return {
...baseEvent,
operationType: 'rename' as ChangeStreamOperationType,
};
}
return {
...baseEvent,
operationType: 'invalidate' as ChangeStreamOperationType,
};
default:
return {
...baseEvent,
operationType: 'invalidate' as ChangeStreamOperationType,
};
}
}
}

375
ts/ts_tsmdb/storage/WAL.ts Normal file
View File

@@ -0,0 +1,375 @@
import * as plugins from '../plugins.js';
import type { Document, IStoredDocument } from '../types/interfaces.js';
/**
* WAL entry operation types
*/
export type TWalOperation = 'insert' | 'update' | 'delete' | 'checkpoint' | 'begin' | 'commit' | 'abort';
/**
* WAL entry structure
*/
export interface IWalEntry {
/** Log Sequence Number - monotonically increasing */
lsn: number;
/** Timestamp of the operation */
timestamp: number;
/** Operation type */
operation: TWalOperation;
/** Database name */
dbName: string;
/** Collection name */
collName: string;
/** Document ID (hex string) */
documentId: string;
/** Document data (BSON serialized, base64 encoded) */
data?: string;
/** Previous document data for updates (for rollback) */
previousData?: string;
/** Transaction ID if part of a transaction */
txnId?: string;
/** CRC32 checksum of the entry (excluding this field) */
checksum: number;
}
/**
* Checkpoint record
*/
interface ICheckpointRecord {
lsn: number;
timestamp: number;
lastCommittedLsn: number;
}
/**
* Write-Ahead Log (WAL) for durability and crash recovery
*
* The WAL ensures durability by writing operations to a log file before
* they are applied to the main storage. On crash recovery, uncommitted
* operations can be replayed to restore the database to a consistent state.
*/
export class WAL {
private walPath: string;
private currentLsn: number = 0;
private lastCheckpointLsn: number = 0;
private entries: IWalEntry[] = [];
private isInitialized: boolean = false;
private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());
// In-memory uncommitted entries per transaction
private uncommittedTxns: Map<string, IWalEntry[]> = new Map();
// Checkpoint interval (number of entries between checkpoints)
private checkpointInterval: number = 1000;
constructor(walPath: string, options?: { checkpointInterval?: number }) {
this.walPath = walPath;
if (options?.checkpointInterval) {
this.checkpointInterval = options.checkpointInterval;
}
}
/**
* Initialize the WAL, loading existing entries and recovering if needed
*/
async initialize(): Promise<{ recoveredEntries: IWalEntry[] }> {
if (this.isInitialized) {
return { recoveredEntries: [] };
}
// Ensure WAL directory exists
const walDir = this.walPath.substring(0, this.walPath.lastIndexOf('/'));
if (walDir) {
await this.fs.directory(walDir).recursive().create();
}
// Try to load existing WAL
const exists = await this.fs.file(this.walPath).exists();
if (exists) {
const content = await this.fs.file(this.walPath).encoding('utf8').read();
const lines = (content as string).split('\n').filter(line => line.trim());
for (const line of lines) {
try {
const entry = JSON.parse(line) as IWalEntry;
// Verify checksum
if (this.verifyChecksum(entry)) {
this.entries.push(entry);
if (entry.lsn > this.currentLsn) {
this.currentLsn = entry.lsn;
}
if (entry.operation === 'checkpoint') {
this.lastCheckpointLsn = entry.lsn;
}
}
} catch {
// Skip corrupted entries
console.warn('Skipping corrupted WAL entry');
}
}
}
this.isInitialized = true;
// Return entries after last checkpoint that need recovery
const recoveredEntries = this.entries.filter(
e => e.lsn > this.lastCheckpointLsn &&
(e.operation === 'insert' || e.operation === 'update' || e.operation === 'delete')
);
return { recoveredEntries };
}
/**
* Log an insert operation
*/
async logInsert(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise<number> {
return this.appendEntry({
operation: 'insert',
dbName,
collName,
documentId: doc._id.toHexString(),
data: this.serializeDocument(doc),
txnId,
});
}
/**
* Log an update operation
*/
async logUpdate(
dbName: string,
collName: string,
oldDoc: IStoredDocument,
newDoc: IStoredDocument,
txnId?: string
): Promise<number> {
return this.appendEntry({
operation: 'update',
dbName,
collName,
documentId: oldDoc._id.toHexString(),
data: this.serializeDocument(newDoc),
previousData: this.serializeDocument(oldDoc),
txnId,
});
}
/**
* Log a delete operation
*/
async logDelete(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise<number> {
return this.appendEntry({
operation: 'delete',
dbName,
collName,
documentId: doc._id.toHexString(),
previousData: this.serializeDocument(doc),
txnId,
});
}
/**
* Log transaction begin
*/
async logBeginTransaction(txnId: string): Promise<number> {
this.uncommittedTxns.set(txnId, []);
return this.appendEntry({
operation: 'begin',
dbName: '',
collName: '',
documentId: '',
txnId,
});
}
/**
* Log transaction commit
*/
async logCommitTransaction(txnId: string): Promise<number> {
this.uncommittedTxns.delete(txnId);
return this.appendEntry({
operation: 'commit',
dbName: '',
collName: '',
documentId: '',
txnId,
});
}
/**
* Log transaction abort
*/
async logAbortTransaction(txnId: string): Promise<number> {
this.uncommittedTxns.delete(txnId);
return this.appendEntry({
operation: 'abort',
dbName: '',
collName: '',
documentId: '',
txnId,
});
}
/**
* Get entries to roll back for an aborted transaction
*/
getTransactionEntries(txnId: string): IWalEntry[] {
return this.entries.filter(e => e.txnId === txnId);
}
/**
* Create a checkpoint - marks a consistent point in the log
*/
async checkpoint(): Promise<number> {
const lsn = await this.appendEntry({
operation: 'checkpoint',
dbName: '',
collName: '',
documentId: '',
});
this.lastCheckpointLsn = lsn;
// Truncate old entries (keep only entries after checkpoint)
await this.truncate();
return lsn;
}
/**
* Truncate the WAL file, removing entries before the last checkpoint
*/
private async truncate(): Promise<void> {
// Keep entries after last checkpoint
const newEntries = this.entries.filter(e => e.lsn >= this.lastCheckpointLsn);
this.entries = newEntries;
// Rewrite the WAL file
const lines = this.entries.map(e => JSON.stringify(e)).join('\n');
await this.fs.file(this.walPath).encoding('utf8').write(lines);
}
/**
* Get current LSN
*/
getCurrentLsn(): number {
return this.currentLsn;
}
/**
* Get entries after a specific LSN (for recovery)
*/
getEntriesAfter(lsn: number): IWalEntry[] {
return this.entries.filter(e => e.lsn > lsn);
}
/**
* Close the WAL
*/
async close(): Promise<void> {
if (this.isInitialized) {
// Final checkpoint before close
await this.checkpoint();
}
this.isInitialized = false;
}
// ============================================================================
// Private Methods
// ============================================================================
private async appendEntry(
partial: Omit<IWalEntry, 'lsn' | 'timestamp' | 'checksum'>
): Promise<number> {
await this.initialize();
this.currentLsn++;
const entry: IWalEntry = {
...partial,
lsn: this.currentLsn,
timestamp: Date.now(),
checksum: 0, // Will be calculated
};
// Calculate checksum
entry.checksum = this.calculateChecksum(entry);
// Track in transaction if applicable
if (partial.txnId && this.uncommittedTxns.has(partial.txnId)) {
this.uncommittedTxns.get(partial.txnId)!.push(entry);
}
// Add to in-memory log
this.entries.push(entry);
// Append to file (append mode for durability)
await this.fs.file(this.walPath).encoding('utf8').append(JSON.stringify(entry) + '\n');
// Check if we need a checkpoint
if (this.entries.length - this.lastCheckpointLsn >= this.checkpointInterval) {
await this.checkpoint();
}
return entry.lsn;
}
private serializeDocument(doc: Document): string {
// Serialize document to BSON and encode as base64
const bsonData = plugins.bson.serialize(doc);
return Buffer.from(bsonData).toString('base64');
}
private deserializeDocument(data: string): Document {
// Decode base64 and deserialize from BSON
const buffer = Buffer.from(data, 'base64');
return plugins.bson.deserialize(buffer);
}
private calculateChecksum(entry: IWalEntry): number {
// Simple CRC32-like checksum
const str = JSON.stringify({
lsn: entry.lsn,
timestamp: entry.timestamp,
operation: entry.operation,
dbName: entry.dbName,
collName: entry.collName,
documentId: entry.documentId,
data: entry.data,
previousData: entry.previousData,
txnId: entry.txnId,
});
let crc = 0xFFFFFFFF;
for (let i = 0; i < str.length; i++) {
crc ^= str.charCodeAt(i);
for (let j = 0; j < 8; j++) {
crc = (crc >>> 1) ^ (crc & 1 ? 0xEDB88320 : 0);
}
}
return (~crc) >>> 0;
}
private verifyChecksum(entry: IWalEntry): boolean {
const savedChecksum = entry.checksum;
entry.checksum = 0;
const calculatedChecksum = this.calculateChecksum(entry);
entry.checksum = savedChecksum;
return calculatedChecksum === savedChecksum;
}
/**
* Recover document from WAL entry
*/
recoverDocument(entry: IWalEntry): IStoredDocument | null {
if (!entry.data) return null;
return this.deserializeDocument(entry.data) as IStoredDocument;
}
/**
* Recover previous document state from WAL entry (for rollback)
*/
recoverPreviousDocument(entry: IWalEntry): IStoredDocument | null {
if (!entry.previousData) return null;
return this.deserializeDocument(entry.previousData) as IStoredDocument;
}
}