/**
 * Transaction Manager
 *
 * Manages distributed transactions with ACID-like semantics
 */

import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
import { Logger, defaultLogger } from '../../core/observability/logger.js';
import { MetricsCollector, defaultMetricsCollector } from '../../core/observability/metrics.js';
import { DocumentConflictError } from '../../core/errors/index.js';
import type {
  TransactionConfig,
  TransactionContext,
  TransactionOperation,
  TransactionResult,
  TransactionStats,
  TransactionState,
  TransactionManagerConfig,
  TransactionCallbacks,
  ConflictInfo,
  ConflictResolutionStrategy,
  Savepoint,
} from './types.js';

/**
 * Default configuration
 */
const DEFAULT_CONFIG: Required<TransactionManagerConfig> = {
  defaultIsolationLevel: 'read_committed',
  defaultLockingStrategy: 'optimistic',
  defaultTimeout: 30000, // 30 seconds
  enableCleanup: true,
  cleanupInterval: 60000, // 1 minute
  maxConcurrentTransactions: 1000,
  conflictResolution: 'retry',
  enableLogging: true,
  enableMetrics: true,
};

/**
 * Normalize an arbitrary thrown value into an `Error` so that `message` and
 * `name` can be read safely.
 */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Check whether a thrown value is a `ResponseError` carrying the given HTTP
 * status code in `meta.statusCode`.
 */
function hasResponseStatus(error: unknown, statusCode: number): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const candidate = error as { name?: unknown; meta?: { statusCode?: unknown } };
  return candidate.name === 'ResponseError' && candidate.meta?.statusCode === statusCode;
}

/**
 * Transaction Manager
 *
 * Keeps a registry of in-flight transactions, executes their queued operations
 * on commit, runs compensation operations on rollback, and tracks statistics.
 */
export class TransactionManager {
  private config: Required<TransactionManagerConfig>;
  private transactions: Map<string, TransactionContext> = new Map();
  private stats: TransactionStats;
  private cleanupTimer?: NodeJS.Timeout;
  private logger: Logger;
  private metrics: MetricsCollector;
  private transactionCounter = 0;

  /**
   * @param config - Overrides merged on top of the default configuration.
   */
  constructor(config: TransactionManagerConfig = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
    this.logger = defaultLogger;
    this.metrics = defaultMetricsCollector;
    this.stats = {
      totalStarted: 0,
      totalCommitted: 0,
      totalRolledBack: 0,
      totalFailed: 0,
      totalOperations: 0,
      totalConflicts: 0,
      totalRetries: 0,
      avgDuration: 0,
      avgOperationsPerTransaction: 0,
      successRate: 0,
      activeTransactions: 0,
    };
  }

  /**
   * Initialize transaction manager
   *
   * Starts the expired-transaction cleanup timer when `enableCleanup` is set.
   */
  async initialize(): Promise<void> {
    if (this.config.enableCleanup) {
      this.startCleanupTimer();
    }

    this.logger.info('TransactionManager initialized', {
      defaultIsolationLevel: this.config.defaultIsolationLevel,
      defaultLockingStrategy: this.config.defaultLockingStrategy,
      maxConcurrentTransactions: this.config.maxConcurrentTransactions,
    });
  }

  /**
   * Begin a new transaction
   *
   * @param config - Per-transaction settings; missing values fall back to the manager defaults.
   * @param callbacks - Optional lifecycle hooks, also used later by the returned handle.
   * @returns A fluent handle bound to the new transaction.
   * @throws Error when the concurrent-transaction limit is reached or the
   *   requested id is already registered. Errors thrown by `onBegin` are rethrown.
   */
  async begin(
    config: TransactionConfig = {},
    callbacks?: TransactionCallbacks
  ): Promise<Transaction> {
    // Check concurrent transaction limit
    if (this.transactions.size >= this.config.maxConcurrentTransactions) {
      throw new Error(
        `Maximum concurrent transactions limit reached (${this.config.maxConcurrentTransactions})`
      );
    }

    // Generate transaction ID
    const transactionId = config.id || this.generateTransactionId();

    // A caller-supplied id must not silently replace a live transaction.
    if (this.transactions.has(transactionId)) {
      throw new Error(`Transaction ${transactionId} already exists`);
    }

    // Create transaction context
    const context: TransactionContext = {
      id: transactionId,
      state: 'active',
      config: {
        id: transactionId,
        isolationLevel: config.isolationLevel ?? this.config.defaultIsolationLevel,
        lockingStrategy: config.lockingStrategy ?? this.config.defaultLockingStrategy,
        timeout: config.timeout ?? this.config.defaultTimeout,
        autoRollback: config.autoRollback ?? true,
        maxRetries: config.maxRetries ?? 3,
        retryDelay: config.retryDelay ?? 100,
        strictOrdering: config.strictOrdering ?? false,
        metadata: config.metadata ?? {},
      },
      operations: [],
      readSet: new Map(),
      writeSet: new Set(),
      startTime: new Date(),
      retryAttempts: 0,
    };

    this.transactions.set(transactionId, context);
    this.stats.totalStarted++;
    this.stats.activeTransactions++;

    if (this.config.enableLogging) {
      this.logger.info('Transaction started', {
        transactionId,
        isolationLevel: context.config.isolationLevel,
        lockingStrategy: context.config.lockingStrategy,
      });
    }

    if (this.config.enableMetrics) {
      this.metrics.recordCounter('transactions.started', 1);
      this.metrics.recordGauge('transactions.active', this.stats.activeTransactions);
    }

    // Call onBegin callback
    if (callbacks?.onBegin) {
      try {
        await callbacks.onBegin(context);
      } catch (error: unknown) {
        // The caller never receives a handle for this transaction, so it must
        // not stay registered or keep counting as active.
        context.state = 'failed';
        context.error = toError(error);
        this.transactions.delete(transactionId);
        this.stats.activeTransactions--;
        this.stats.totalFailed++;
        this.updateSuccessRate();

        if (this.config.enableMetrics) {
          this.metrics.recordGauge('transactions.active', this.stats.activeTransactions);
        }

        throw error;
      }
    }

    return new Transaction(this, context, callbacks);
  }

  /**
   * Get transaction context
   *
   * @returns The registered context, or `undefined` when the id is unknown
   *   (including transactions that have already finished and been removed).
   */
  getTransaction(transactionId: string): TransactionContext | undefined {
    return this.transactions.get(transactionId);
  }

  /**
   * Commit a transaction
   *
   * Executes every queued operation that has not run yet and marks it
   * committed. If that phase fails and `autoRollback` is enabled, the
   * transaction is rolled back and the rollback result is returned instead.
   *
   * @throws Error when the transaction is unknown or not in the `active` /
   *   `prepared` state; the execution error when `autoRollback` is disabled;
   *   any error thrown by `onAfterCommit` (the commit itself stays in effect).
   */
  async commit(
    transactionId: string,
    callbacks?: TransactionCallbacks
  ): Promise<TransactionResult> {
    const context = this.transactions.get(transactionId);

    if (!context) {
      throw new Error(`Transaction ${transactionId} not found`);
    }

    if (context.state !== 'active' && context.state !== 'prepared') {
      throw new Error(`Cannot commit transaction in state: ${context.state}`);
    }

    const startTime = Date.now();
    let committed = 0;

    // Only the work that can still be undone lives inside this try block.
    try {
      // Call onBeforeCommit callback
      if (callbacks?.onBeforeCommit) {
        await callbacks.onBeforeCommit(context);
      }

      context.state = 'committing';

      // Execute and commit all operations
      for (const operation of context.operations) {
        if (operation.committed) {
          committed++;
          continue;
        }

        // Execute operation if not yet executed
        if (!operation.executed) {
          await this.executeOperation(context, operation, callbacks);
        }

        // Mark as committed
        operation.committed = true;
        committed++;
      }
    } catch (error: unknown) {
      const failure = toError(error);
      context.state = 'failed';
      context.error = failure;

      if (this.config.enableLogging) {
        this.logger.error('Transaction commit failed', {
          transactionId,
          error: failure.message,
        });
      }

      // Auto-rollback if enabled
      if (context.config.autoRollback) {
        return await this.rollback(transactionId, callbacks);
      }

      throw error;
    }

    // Point of no return: from here on a failure (for example a throwing
    // onAfterCommit hook) must never trigger a rollback of committed work.
    context.state = 'committed';
    context.endTime = new Date();

    const duration = Date.now() - startTime;

    this.stats.totalCommitted++;
    this.stats.activeTransactions--;
    this.updateAverages(duration, context.operations.length);

    // Cleanup transaction before running hooks so a throwing hook cannot
    // leave a finished transaction registered.
    this.transactions.delete(transactionId);

    const result: TransactionResult = {
      success: true,
      transactionId,
      state: 'committed',
      operationsExecuted: context.operations.filter((op) => op.executed).length,
      operationsCommitted: committed,
      operationsRolledBack: 0,
      duration,
      metadata: context.config.metadata,
    };

    if (this.config.enableLogging) {
      this.logger.info('Transaction committed', {
        transactionId,
        operations: committed,
        duration,
      });
    }

    if (this.config.enableMetrics) {
      this.metrics.recordCounter('transactions.committed', 1);
      this.metrics.recordHistogram('transactions.duration', duration);
      this.metrics.recordGauge('transactions.active', this.stats.activeTransactions);
    }

    // Call onAfterCommit callback
    if (callbacks?.onAfterCommit) {
      await callbacks.onAfterCommit(result);
    }

    return result;
  }

  /**
   * Rollback a transaction
   *
   * Runs the compensation of every executed operation in reverse order.
   * A failing compensation is logged and skipped; it does not abort the
   * rollback.
   *
   * @throws Error when the transaction is unknown, is already being rolled
   *   back, or has already finished; any error thrown by `onBeforeRollback`
   *   (the transaction stays registered in state `failed`) or by
   *   `onAfterRollback` (the rollback itself stays in effect).
   */
  async rollback(
    transactionId: string,
    callbacks?: TransactionCallbacks
  ): Promise<TransactionResult> {
    const context = this.transactions.get(transactionId);

    if (!context) {
      throw new Error(`Transaction ${transactionId} not found`);
    }

    // Guard against re-entry (e.g. the cleanup timer firing while a rollback
    // is already running), which would run compensations twice.
    if (
      context.state === 'rolling_back' ||
      context.state === 'rolled_back' ||
      context.state === 'committed'
    ) {
      throw new Error(`Cannot rollback transaction in state: ${context.state}`);
    }

    const startTime = Date.now();
    let rolledBack = 0;

    try {
      // Call onBeforeRollback callback
      if (callbacks?.onBeforeRollback) {
        await callbacks.onBeforeRollback(context);
      }

      context.state = 'rolling_back';

      // Execute compensation operations in reverse order
      for (let i = context.operations.length - 1; i >= 0; i--) {
        const operation = context.operations[i];

        if (!operation || !operation.executed || !operation.compensation) {
          continue;
        }

        try {
          await this.executeOperation(context, operation.compensation);
          rolledBack++;
        } catch (error: unknown) {
          this.logger.error('Compensation operation failed', {
            transactionId,
            operation: operation.type,
            index: operation.index,
            id: operation.id,
            error: toError(error).message,
          });
        }
      }
    } catch (error: unknown) {
      const failure = toError(error);
      context.state = 'failed';
      context.error = failure;
      this.stats.totalFailed++;
      this.updateSuccessRate();

      if (this.config.enableLogging) {
        this.logger.error('Transaction rollback failed', {
          transactionId,
          error: failure.message,
        });
      }

      throw error;
    }

    // The rollback is complete; nothing below may flip it back to 'failed'.
    context.state = 'rolled_back';
    context.endTime = new Date();

    const duration = Date.now() - startTime;

    this.stats.totalRolledBack++;
    this.stats.activeTransactions--;
    this.updateSuccessRate();

    // Cleanup transaction before running hooks so a throwing hook cannot
    // leave a finished transaction registered.
    this.transactions.delete(transactionId);

    const result: TransactionResult = {
      success: false,
      transactionId,
      state: 'rolled_back',
      operationsExecuted: context.operations.filter((op) => op.executed).length,
      operationsCommitted: 0,
      operationsRolledBack: rolledBack,
      duration,
      error: context.error
        ? {
            message: context.error.message,
            type: context.error.name,
          }
        : undefined,
      metadata: context.config.metadata,
    };

    if (this.config.enableLogging) {
      this.logger.info('Transaction rolled back', {
        transactionId,
        rolledBack,
        duration,
      });
    }

    if (this.config.enableMetrics) {
      this.metrics.recordCounter('transactions.rolled_back', 1);
      this.metrics.recordGauge('transactions.active', this.stats.activeTransactions);
    }

    // Call onAfterRollback callback
    if (callbacks?.onAfterRollback) {
      await callbacks.onAfterRollback(result);
    }

    return result;
  }

  /**
   * Get transaction statistics
   *
   * @returns A shallow copy, so callers cannot mutate the live counters.
   */
  getStats(): TransactionStats {
    return { ...this.stats };
  }

  /**
   * Destroy transaction manager
   *
   * Stops the cleanup timer, attempts to roll back every registered
   * transaction (best effort), and clears the registry.
   */
  async destroy(): Promise<void> {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = undefined;
    }

    // Rollback all active transactions
    const activeTransactions = Array.from(this.transactions.keys());

    for (const transactionId of activeTransactions) {
      try {
        await this.rollback(transactionId);
      } catch (error: unknown) {
        // Best effort: keep shutting down, but leave a trace of the failure.
        this.logger.warn('Failed to rollback transaction during destroy', {
          transactionId,
          error: toError(error).message,
        });
      }
    }

    this.transactions.clear();
  }

  // ============================================================================
  // Internal Methods
  // ============================================================================

  /**
   * Add operation to transaction
   *
   * Appends the operation to the context and records its `index:id` key in
   * the read set (reads that already carry a version) or the write set
   * (every other operation type).
   */
  addOperation(context: TransactionContext, operation: TransactionOperation): void {
    context.operations.push(operation);
    this.stats.totalOperations++;

    const key = `${operation.index}:${operation.id}`;

    if (operation.type === 'read') {
      // Add to read set for repeatable read
      if (operation.version) {
        context.readSet.set(key, operation.version);
      }
    } else {
      // Add to write set
      context.writeSet.add(key);
    }
  }

  /**
   * Execute an operation
   *
   * Runs the operation against Elasticsearch, records the resulting version,
   * and attaches a compensation operation for writes where one can be built.
   * HTTP 409 responses are routed to {@link handleConflict}; every other
   * error is rethrown.
   */
  private async executeOperation(
    context: TransactionContext,
    operation: TransactionOperation,
    callbacks?: TransactionCallbacks
  ): Promise<void> {
    // Call onBeforeOperation callback
    if (callbacks?.onBeforeOperation) {
      await callbacks.onBeforeOperation(operation);
    }

    const client = ElasticsearchConnectionManager.getInstance().getClient();

    try {
      switch (operation.type) {
        case 'read': {
          const result = await client.get({
            index: operation.index,
            id: operation.id,
          });

          operation.version = {
            seqNo: result._seq_no ?? 0,
            primaryTerm: result._primary_term ?? 0,
          };
          operation.originalDocument = result._source;
          break;
        }

        case 'create': {
          const result = await client.index({
            index: operation.index,
            id: operation.id,
            document: operation.document,
            op_type: 'create',
          });

          operation.version = {
            seqNo: result._seq_no ?? 0,
            primaryTerm: result._primary_term ?? 0,
          };

          // Create compensation (delete)
          operation.compensation = {
            type: 'delete',
            index: operation.index,
            id: operation.id,
            timestamp: new Date(),
            executed: false,
            committed: false,
          };
          break;
        }

        case 'update': {
          // Built incrementally because the version fields are optional.
          const updateRequest: any = {
            index: operation.index,
            id: operation.id,
            document: operation.document,
          };

          // Add version for optimistic locking
          if (operation.version) {
            updateRequest.if_seq_no = operation.version.seqNo;
            updateRequest.if_primary_term = operation.version.primaryTerm;
          }

          const result = await client.index(updateRequest);

          operation.version = {
            seqNo: result._seq_no ?? 0,
            primaryTerm: result._primary_term ?? 0,
          };

          // Create compensation (restore original)
          if (operation.originalDocument) {
            operation.compensation = {
              type: 'update',
              index: operation.index,
              id: operation.id,
              document: operation.originalDocument,
              timestamp: new Date(),
              executed: false,
              committed: false,
            };
          }
          break;
        }

        case 'delete': {
          // Built incrementally because the version fields are optional.
          const deleteRequest: any = {
            index: operation.index,
            id: operation.id,
          };

          // Add version for optimistic locking
          if (operation.version) {
            deleteRequest.if_seq_no = operation.version.seqNo;
            deleteRequest.if_primary_term = operation.version.primaryTerm;
          }

          await client.delete(deleteRequest);

          // Create compensation (restore document)
          if (operation.originalDocument) {
            operation.compensation = {
              type: 'create',
              index: operation.index,
              id: operation.id,
              document: operation.originalDocument,
              timestamp: new Date(),
              executed: false,
              committed: false,
            };
          }
          break;
        }
      }

      operation.executed = true;

      // Call onAfterOperation callback
      if (callbacks?.onAfterOperation) {
        await callbacks.onAfterOperation(operation);
      }
    } catch (error: unknown) {
      // Handle version conflict
      if (hasResponseStatus(error, 409)) {
        await this.handleConflict(context, operation, toError(error), callbacks);
      } else {
        throw error;
      }
    }
  }

  /**
   * Handle version conflict
   *
   * Resolves a 409 according to the configured strategy, or the strategy
   * returned by the `onConflict` callback when one is supplied.
   *
   * @throws DocumentConflictError for `abort`, and for `retry` once
   *   `maxRetries` is exhausted; Error for `merge` (not implemented).
   */
  private async handleConflict(
    context: TransactionContext,
    operation: TransactionOperation,
    _error: Error,
    callbacks?: TransactionCallbacks
  ): Promise<void> {
    this.stats.totalConflicts++;

    const conflict: ConflictInfo = {
      operation,
      expectedVersion: operation.version,
      detectedAt: new Date(),
    };

    if (this.config.enableMetrics) {
      this.metrics.recordCounter('transactions.conflicts', 1);
    }

    // Call onConflict callback
    let strategy: ConflictResolutionStrategy = this.config.conflictResolution;

    if (callbacks?.onConflict) {
      strategy = await callbacks.onConflict(conflict);
    }

    switch (strategy) {
      case 'abort':
        throw new DocumentConflictError(
          `Version conflict for ${operation.index}/${operation.id}`,
          `${operation.index}/${operation.id}`
        );

      case 'retry':
        // The retry budget is shared by all operations of the transaction.
        if (context.retryAttempts >= context.config.maxRetries) {
          throw new DocumentConflictError(
            `Max retries exceeded for ${operation.index}/${operation.id}`,
            `${operation.index}/${operation.id}`
          );
        }

        context.retryAttempts++;
        this.stats.totalRetries++;

        // Wait before retry
        await new Promise<void>((resolve) => setTimeout(resolve, context.config.retryDelay));

        // Retry operation
        await this.executeOperation(context, operation, callbacks);
        break;

      case 'skip':
        // Skip this operation
        operation.executed = false;
        break;

      case 'force':
        // Force update without version check
        delete operation.version;
        await this.executeOperation(context, operation, callbacks);
        break;

      case 'merge':
        // Not implemented - requires custom merge logic
        throw new Error('Merge conflict resolution not implemented');
    }
  }

  /**
   * Generate transaction ID
   *
   * Combines the current timestamp with a per-manager counter.
   */
  private generateTransactionId(): string {
    this.transactionCounter++;
    return `txn-${Date.now()}-${this.transactionCounter}`;
  }

  /**
   * Start cleanup timer for expired transactions
   */
  private startCleanupTimer(): void {
    this.cleanupTimer = setInterval(() => {
      this.cleanupExpiredTransactions();
    }, this.config.cleanupInterval);
  }

  /**
   * Cleanup expired transactions
   *
   * Rolls back every registered transaction older than its timeout, except
   * those currently committing or rolling back.
   */
  private cleanupExpiredTransactions(): void {
    const now = Date.now();

    for (const [transactionId, context] of this.transactions) {
      // Never interfere with a commit or rollback that is already in flight.
      if (context.state === 'committing' || context.state === 'rolling_back') {
        continue;
      }

      const elapsed = now - context.startTime.getTime();

      if (elapsed > context.config.timeout) {
        this.logger.warn('Transaction timeout, rolling back', { transactionId });

        this.rollback(transactionId).catch((error: unknown) => {
          this.logger.error('Failed to rollback expired transaction', {
            transactionId,
            error,
          });
        });
      }
    }
  }

  /**
   * Update average statistics
   *
   * Called once per successful commit, after `totalCommitted` has been
   * incremented, so the running means are weighted by the commit count.
   */
  private updateAverages(duration: number, operations: number): void {
    const samples = this.stats.totalCommitted;

    if (samples > 0) {
      this.stats.avgDuration = (this.stats.avgDuration * (samples - 1) + duration) / samples;
      this.stats.avgOperationsPerTransaction =
        (this.stats.avgOperationsPerTransaction * (samples - 1) + operations) / samples;
    }

    this.updateSuccessRate();
  }

  /**
   * Recompute the share of committed transactions among all finished ones
   * (committed + rolled back + failed).
   */
  private updateSuccessRate(): void {
    const finished =
      this.stats.totalCommitted + this.stats.totalRolledBack + this.stats.totalFailed;

    this.stats.successRate = finished === 0 ? 0 : this.stats.totalCommitted / finished;
  }
}

/**
 * Transaction class for fluent API
 *
 * Reads hit Elasticsearch immediately; create/update/delete are queued on the
 * transaction context and executed by the manager on commit.
 */
export class Transaction {
  private savepoints: Map<string, Savepoint> = new Map();

  constructor(
    private manager: TransactionManager,
    private context: TransactionContext,
    private callbacks?: TransactionCallbacks
  ) {}

  /**
   * Get transaction ID
   */
  getId(): string {
    return this.context.id;
  }

  /**
   * Get transaction state
   */
  getState(): TransactionState {
    return this.context.state;
  }

  /**
   * Read a document
   *
   * @returns The document source, or `null` when Elasticsearch answers 404.
   * @throws Any non-404 error from the client.
   */
  // `any` default keeps untyped call sites compiling.
  async read<T = any>(index: string, id: string): Promise<T | null> {
    const operation: TransactionOperation = {
      type: 'read',
      index,
      id,
      timestamp: new Date(),
      executed: false,
      committed: false,
    };

    this.manager.addOperation(this.context, operation);

    const client = ElasticsearchConnectionManager.getInstance().getClient();

    try {
      const result = await client.get({ index, id });

      operation.version = {
        seqNo: result._seq_no ?? 0,
        primaryTerm: result._primary_term ?? 0,
      };
      operation.originalDocument = result._source as T;
      operation.executed = true;

      // addOperation ran before the version was known, so the read set has
      // to be filled in here.
      this.context.readSet.set(`${index}:${id}`, operation.version);

      return result._source as T;
    } catch (error: unknown) {
      if (hasResponseStatus(error, 404)) {
        // The read did run; the document simply does not exist. Leaving it
        // unexecuted would make commit() repeat the GET and fail on the 404.
        operation.executed = true;
        return null;
      }
      throw error;
    }
  }

  /**
   * Create a document
   *
   * Queues the create; nothing is written until commit.
   */
  async create<T = any>(index: string, id: string, document: T): Promise<void> {
    const operation: TransactionOperation = {
      type: 'create',
      index,
      id,
      document,
      timestamp: new Date(),
      executed: false,
      committed: false,
    };

    this.manager.addOperation(this.context, operation);
  }

  /**
   * Update a document
   *
   * Reads the current document, merges the partial update over it, and queues
   * the merged document; nothing is written until commit.
   */
  async update<T = any>(index: string, id: string, document: Partial<T>): Promise<void> {
    // First read the current version
    const current = await this.read<T>(index, id);

    // NOTE(review): the version captured by read() is not copied onto this
    // operation, so if_seq_no / if_primary_term are never sent for it —
    // confirm whether optimistic locking is intended here.
    const operation: TransactionOperation = {
      type: 'update',
      index,
      id,
      document: { ...current, ...document } as T,
      originalDocument: current ?? undefined,
      timestamp: new Date(),
      executed: false,
      committed: false,
    };

    this.manager.addOperation(this.context, operation);
  }

  /**
   * Delete a document
   *
   * Reads the current document so it can be restored on rollback, then queues
   * the delete; nothing is removed until commit.
   */
  async delete(index: string, id: string): Promise<void> {
    // First read the current version
    const current = await this.read(index, id);

    const operation: TransactionOperation = {
      type: 'delete',
      index,
      id,
      originalDocument: current ?? undefined,
      timestamp: new Date(),
      executed: false,
      committed: false,
    };

    this.manager.addOperation(this.context, operation);
  }

  /**
   * Create a savepoint
   *
   * Records how many operations are queued right now under the given name;
   * an existing savepoint with the same name is replaced.
   */
  savepoint(name: string): void {
    this.savepoints.set(name, {
      name,
      operationsCount: this.context.operations.length,
      createdAt: new Date(),
    });
  }

  /**
   * Rollback to savepoint
   *
   * @throws Error when no savepoint with that name exists.
   */
  rollbackTo(name: string): void {
    const savepoint = this.savepoints.get(name);

    if (!savepoint) {
      throw new Error(`Savepoint '${name}' not found`);
    }

    // Remove operations after savepoint
    this.context.operations.splice(savepoint.operationsCount);
  }

  /**
   * Commit the transaction
   */
  async commit(): Promise<TransactionResult> {
    return await this.manager.commit(this.context.id, this.callbacks);
  }

  /**
   * Rollback the transaction
   */
  async rollback(): Promise<TransactionResult> {
    return await this.manager.rollback(this.context.id, this.callbacks);
  }
}

/**
 * Create a transaction manager
 */
export function createTransactionManager(
  config?: TransactionManagerConfig
): TransactionManager {
  return new TransactionManager(config);
}