import * as plugins from '../tsmdb.plugins.js'; import type { Document, IStoredDocument } from '../types/interfaces.js'; /** * WAL entry operation types */ export type TWalOperation = 'insert' | 'update' | 'delete' | 'checkpoint' | 'begin' | 'commit' | 'abort'; /** * WAL entry structure */ export interface IWalEntry { /** Log Sequence Number - monotonically increasing */ lsn: number; /** Timestamp of the operation */ timestamp: number; /** Operation type */ operation: TWalOperation; /** Database name */ dbName: string; /** Collection name */ collName: string; /** Document ID (hex string) */ documentId: string; /** Document data (BSON serialized, base64 encoded) */ data?: string; /** Previous document data for updates (for rollback) */ previousData?: string; /** Transaction ID if part of a transaction */ txnId?: string; /** CRC32 checksum of the entry (excluding this field) */ checksum: number; } /** * Checkpoint record */ interface ICheckpointRecord { lsn: number; timestamp: number; lastCommittedLsn: number; } /** * Write-Ahead Log (WAL) for durability and crash recovery * * The WAL ensures durability by writing operations to a log file before * they are applied to the main storage. On crash recovery, uncommitted * operations can be replayed to restore the database to a consistent state. */ export class WAL { private walPath: string; private currentLsn: number = 0; private lastCheckpointLsn: number = 0; private entries: IWalEntry[] = []; private isInitialized: boolean = false; private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode()); // In-memory uncommitted entries per transaction private uncommittedTxns: Map = new Map(); // Checkpoint interval (number of entries between checkpoints) private checkpointInterval: number = 1000; constructor(walPath: string, options?: { checkpointInterval?: number }) { this.walPath = walPath; if (options?.checkpointInterval) { this.checkpointInterval = options.checkpointInterval; } } /** * Initialize the WAL, loading existing entries and recovering if needed */ async initialize(): Promise<{ recoveredEntries: IWalEntry[] }> { if (this.isInitialized) { return { recoveredEntries: [] }; } // Ensure WAL directory exists const walDir = this.walPath.substring(0, this.walPath.lastIndexOf('/')); if (walDir) { await this.fs.directory(walDir).recursive().create(); } // Try to load existing WAL const exists = await this.fs.file(this.walPath).exists(); if (exists) { const content = await this.fs.file(this.walPath).encoding('utf8').read(); const lines = (content as string).split('\n').filter(line => line.trim()); for (const line of lines) { try { const entry = JSON.parse(line) as IWalEntry; // Verify checksum if (this.verifyChecksum(entry)) { this.entries.push(entry); if (entry.lsn > this.currentLsn) { this.currentLsn = entry.lsn; } if (entry.operation === 'checkpoint') { this.lastCheckpointLsn = entry.lsn; } } } catch { // Skip corrupted entries console.warn('Skipping corrupted WAL entry'); } } } this.isInitialized = true; // Return entries after last checkpoint that need recovery const recoveredEntries = this.entries.filter( e => e.lsn > this.lastCheckpointLsn && (e.operation === 'insert' || e.operation === 'update' || e.operation === 'delete') ); return { recoveredEntries }; } /** * Log an insert operation */ async logInsert(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise { return this.appendEntry({ operation: 'insert', dbName, collName, documentId: doc._id.toHexString(), data: this.serializeDocument(doc), txnId, }); } /** * Log an update operation */ async logUpdate( dbName: string, collName: string, oldDoc: IStoredDocument, newDoc: IStoredDocument, txnId?: string ): Promise { return this.appendEntry({ operation: 'update', dbName, collName, documentId: oldDoc._id.toHexString(), data: this.serializeDocument(newDoc), previousData: this.serializeDocument(oldDoc), txnId, }); } /** * Log a delete operation */ async logDelete(dbName: string, collName: string, doc: IStoredDocument, txnId?: string): Promise { return this.appendEntry({ operation: 'delete', dbName, collName, documentId: doc._id.toHexString(), previousData: this.serializeDocument(doc), txnId, }); } /** * Log transaction begin */ async logBeginTransaction(txnId: string): Promise { this.uncommittedTxns.set(txnId, []); return this.appendEntry({ operation: 'begin', dbName: '', collName: '', documentId: '', txnId, }); } /** * Log transaction commit */ async logCommitTransaction(txnId: string): Promise { this.uncommittedTxns.delete(txnId); return this.appendEntry({ operation: 'commit', dbName: '', collName: '', documentId: '', txnId, }); } /** * Log transaction abort */ async logAbortTransaction(txnId: string): Promise { this.uncommittedTxns.delete(txnId); return this.appendEntry({ operation: 'abort', dbName: '', collName: '', documentId: '', txnId, }); } /** * Get entries to roll back for an aborted transaction */ getTransactionEntries(txnId: string): IWalEntry[] { return this.entries.filter(e => e.txnId === txnId); } /** * Create a checkpoint - marks a consistent point in the log */ async checkpoint(): Promise { const lsn = await this.appendEntry({ operation: 'checkpoint', dbName: '', collName: '', documentId: '', }); this.lastCheckpointLsn = lsn; // Truncate old entries (keep only entries after checkpoint) await this.truncate(); return lsn; } /** * Truncate the WAL file, removing entries before the last checkpoint */ private async truncate(): Promise { // Keep entries after last checkpoint const newEntries = this.entries.filter(e => e.lsn >= this.lastCheckpointLsn); this.entries = newEntries; // Rewrite the WAL file const lines = this.entries.map(e => JSON.stringify(e)).join('\n'); await this.fs.file(this.walPath).encoding('utf8').write(lines); } /** * Get current LSN */ getCurrentLsn(): number { return this.currentLsn; } /** * Get entries after a specific LSN (for recovery) */ getEntriesAfter(lsn: number): IWalEntry[] { return this.entries.filter(e => e.lsn > lsn); } /** * Close the WAL */ async close(): Promise { if (this.isInitialized) { // Final checkpoint before close await this.checkpoint(); } this.isInitialized = false; } // ============================================================================ // Private Methods // ============================================================================ private async appendEntry( partial: Omit ): Promise { await this.initialize(); this.currentLsn++; const entry: IWalEntry = { ...partial, lsn: this.currentLsn, timestamp: Date.now(), checksum: 0, // Will be calculated }; // Calculate checksum entry.checksum = this.calculateChecksum(entry); // Track in transaction if applicable if (partial.txnId && this.uncommittedTxns.has(partial.txnId)) { this.uncommittedTxns.get(partial.txnId)!.push(entry); } // Add to in-memory log this.entries.push(entry); // Append to file (append mode for durability) await this.fs.file(this.walPath).encoding('utf8').append(JSON.stringify(entry) + '\n'); // Check if we need a checkpoint if (this.entries.length - this.lastCheckpointLsn >= this.checkpointInterval) { await this.checkpoint(); } return entry.lsn; } private serializeDocument(doc: Document): string { // Serialize document to BSON and encode as base64 const bsonData = plugins.bson.serialize(doc); return Buffer.from(bsonData).toString('base64'); } private deserializeDocument(data: string): Document { // Decode base64 and deserialize from BSON const buffer = Buffer.from(data, 'base64'); return plugins.bson.deserialize(buffer); } private calculateChecksum(entry: IWalEntry): number { // Simple CRC32-like checksum const str = JSON.stringify({ lsn: entry.lsn, timestamp: entry.timestamp, operation: entry.operation, dbName: entry.dbName, collName: entry.collName, documentId: entry.documentId, data: entry.data, previousData: entry.previousData, txnId: entry.txnId, }); let crc = 0xFFFFFFFF; for (let i = 0; i < str.length; i++) { crc ^= str.charCodeAt(i); for (let j = 0; j < 8; j++) { crc = (crc >>> 1) ^ (crc & 1 ? 0xEDB88320 : 0); } } return (~crc) >>> 0; } private verifyChecksum(entry: IWalEntry): boolean { const savedChecksum = entry.checksum; entry.checksum = 0; const calculatedChecksum = this.calculateChecksum(entry); entry.checksum = savedChecksum; return calculatedChecksum === savedChecksum; } /** * Recover document from WAL entry */ recoverDocument(entry: IWalEntry): IStoredDocument | null { if (!entry.data) return null; return this.deserializeDocument(entry.data) as IStoredDocument; } /** * Recover previous document state from WAL entry (for rollback) */ recoverPreviousDocument(entry: IWalEntry): IStoredDocument | null { if (!entry.previousData) return null; return this.deserializeDocument(entry.previousData) as IStoredDocument; } }