import type { Client as ElasticClient } from '@elastic/elasticsearch';
import {
  BatchOperation,
  BatchResult,
  DocumentOperation,
  SessionConfig,
} from './types.js';
import { Logger } from '../../core/observability/logger.js';
import { BulkOperationError } from '../../core/errors/elasticsearch-error.js';

/** One failed bulk item, in the shape reported on the batch result. */
type BulkItemFailure = {
  documentId: string;
  operation: DocumentOperation;
  error: string;
  statusCode: number;
};

/**
 * Document session for managing document lifecycle.
 *
 * Operations are queued in memory between `start()` and `commit()`. On commit
 * they are sent to Elasticsearch through the bulk API in chunks of at most
 * `batchSize` operations. When `cleanupStale` is enabled, every document in
 * the index whose id was NOT upserted/created/updated during the session is
 * then deleted.
 *
 * `onlyNew` and `fromTimestamp` are stored on the resolved config (and logged
 * on start) but are not read by any method of this class.
 *
 * @typeParam T - Shape of the documents written through this session.
 */
export class DocumentSession<T = unknown> {
  /** Bulk chunk size used when the caller supplies none (or an invalid one). */
  private static readonly DEFAULT_BATCH_SIZE = 1000;

  // NOTE(review): assumes BatchOperation is generic over the document type —
  // confirm against ./types.js.
  private operations: BatchOperation<T>[] = [];
  /** Ids touched by upsert/create/update; these survive stale cleanup. */
  private readonly seenDocuments = new Set<string>();
  private readonly config: Required<SessionConfig>;
  private startTimestamp: Date;
  private isActive = false;

  /**
   * @param client - Elasticsearch client used for bulk and delete-by-query calls.
   * @param index - Name of the index every operation targets.
   * @param logger - Logger receiving session lifecycle events.
   * @param config - Optional overrides. Defaults: `onlyNew: false`,
   *   `fromTimestamp: now`, `cleanupStale: true`, `batchSize: 1000`.
   */
  constructor(
    private readonly client: ElasticClient,
    private readonly index: string,
    private readonly logger: Logger,
    config: SessionConfig = {}
  ) {
    const requestedBatchSize = config.batchSize;

    this.config = {
      onlyNew: config.onlyNew ?? false,
      fromTimestamp: config.fromTimestamp ?? new Date(),
      // Cleanup is opt-out: anything other than an explicit `false` enables it.
      cleanupStale: config.cleanupStale !== false,
      // Zero, negative, NaN or missing sizes fall back to the default so the
      // chunking loop in executeBatch() always makes progress.
      batchSize:
        typeof requestedBatchSize === 'number' && requestedBatchSize >= 1
          ? Math.floor(requestedBatchSize)
          : DocumentSession.DEFAULT_BATCH_SIZE,
    };
    this.startTimestamp = new Date();
  }

  /**
   * Start the session, discarding anything queued by a previous one.
   *
   * @returns This session, for chaining.
   * @throws Error if the session is already active.
   */
  start(): this {
    if (this.isActive) {
      throw new Error('Session already active');
    }

    this.isActive = true;
    this.operations = [];
    this.seenDocuments.clear();
    this.startTimestamp = new Date();

    this.logger.debug('Document session started', {
      index: this.index,
      config: this.config,
    });

    return this;
  }

  /**
   * Queue a document upsert (create or replace).
   *
   * @param documentId - Id of the document.
   * @param document - Full document body.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  upsert(documentId: string, document: T): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.UPSERT,
      documentId,
      document,
    });
    this.seenDocuments.add(documentId);

    return this;
  }

  /**
   * Queue a document creation (the bulk item fails if the id already exists).
   *
   * @param documentId - Id of the document.
   * @param document - Full document body.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  create(documentId: string, document: T): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.CREATE,
      documentId,
      document,
    });
    this.seenDocuments.add(documentId);

    return this;
  }

  /**
   * Queue a partial update (the bulk item fails if the document doesn't exist).
   *
   * @param documentId - Id of the document.
   * @param document - Fields to merge into the existing document.
   * @param version - Optional optimistic-concurrency guard; when given, it is
   *   forwarded as `if_seq_no` / `if_primary_term` on the bulk action.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  update(
    documentId: string,
    document: T,
    version?: { seqNo: number; primaryTerm: number }
  ): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.UPDATE,
      documentId,
      document,
      ...(version && {
        seqNo: version.seqNo,
        primaryTerm: version.primaryTerm,
      }),
    });
    this.seenDocuments.add(documentId);

    return this;
  }

  /**
   * Queue a document deletion.
   *
   * Deleted ids are not added to the seen set.
   *
   * @param documentId - Id of the document to delete.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  delete(documentId: string): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.DELETE,
      documentId,
    });

    return this;
  }

  /**
   * Commit the session: execute all queued operations, then (if configured)
   * remove stale documents.
   *
   * On success the session becomes inactive. On failure the error is logged
   * and rethrown, and the session stays active with its queue intact so the
   * caller can retry or `rollback()`. Chunks that were already sent before
   * the failure are not undone.
   *
   * @returns Aggregated counts and per-item errors for the whole queue.
   * @throws Error if the session is not active.
   * @throws BulkOperationError if every queued operation failed.
   */
  async commit(): Promise<BatchResult> {
    this.ensureActive();

    try {
      // Execute batched operations
      const result = await this.executeBatch();

      // Clean up stale documents if configured
      if (this.config.cleanupStale) {
        await this.cleanupStaleDocuments();
      }

      this.isActive = false;

      this.logger.info('Session committed', {
        index: this.index,
        successful: result.successful,
        failed: result.failed,
        duration: Date.now() - this.startTimestamp.getTime(),
      });

      return result;
    } catch (error) {
      this.logger.error('Session commit failed', error as Error, {
        index: this.index,
        operationCount: this.operations.length,
      });
      throw error;
    }
  }

  /**
   * Rollback the session: discard all queued operations and deactivate.
   * Nothing is sent to Elasticsearch. Safe to call on an inactive session.
   */
  rollback(): void {
    this.operations = [];
    this.seenDocuments.clear();
    this.isActive = false;

    this.logger.debug('Session rolled back', { index: this.index });
  }

  /**
   * Send the queued operations through the bulk API, `batchSize` operations
   * per request, and tally the per-item outcomes.
   *
   * @returns Aggregated result; `took` is the wall-clock time in milliseconds
   *   across all requests.
   * @throws BulkOperationError if every queued operation failed.
   */
  private async executeBatch(): Promise<BatchResult> {
    if (this.operations.length === 0) {
      return {
        successful: 0,
        failed: 0,
        errors: [],
        took: 0,
      };
    }

    const startTime = Date.now();
    const { batchSize } = this.config;

    let successful = 0;
    let failed = 0;
    const errors: BulkItemFailure[] = [];

    for (let offset = 0; offset < this.operations.length; offset += batchSize) {
      const chunk = this.operations.slice(offset, offset + batchSize);

      const response = await this.client.bulk({
        body: this.buildBulkBody(chunk),
        refresh: true, // Make changes immediately visible
      });

      if (!response.errors) {
        successful += response.items.length;
        continue;
      }

      // Response items come back in request order, so items[i] describes
      // chunk[i] regardless of which action type it was.
      for (const [i, item] of response.items.entries()) {
        const operation = chunk[i];
        // Each item is a single-key object ({ index | create | update | delete }).
        const outcome = Object.values(item)[0] as any;

        if (outcome?.error) {
          failed++;
          errors.push({
            documentId: operation.documentId,
            operation: operation.operation,
            error: DocumentSession.describeItemError(outcome.error),
            statusCode: outcome.status,
          });
        } else {
          successful++;
        }
      }
    }

    const took = Date.now() - startTime;

    const result: BatchResult = {
      successful,
      failed,
      errors,
      took,
    };

    if (failed > 0) {
      this.logger.warn('Batch operation had failures', {
        successful,
        failed,
        errors: errors.slice(0, 5), // Log first 5 errors
      });

      if (failed === this.operations.length) {
        // Complete failure
        throw new BulkOperationError(
          'All bulk operations failed',
          successful,
          failed,
          errors
        );
      }
    }

    return result;
  }

  /**
   * Translate queued operations into the alternating action/source entries
   * expected by the bulk API. Delete actions contribute no source entry.
   */
  private buildBulkBody(operations: BatchOperation<T>[]): any[] {
    const bulkBody: any[] = [];

    for (const op of operations) {
      switch (op.operation) {
        case DocumentOperation.CREATE:
          bulkBody.push({ create: { _index: this.index, _id: op.documentId } });
          bulkBody.push(op.document);
          break;

        case DocumentOperation.UPDATE:
          bulkBody.push({
            update: {
              _index: this.index,
              _id: op.documentId,
              ...(op.seqNo !== undefined && { if_seq_no: op.seqNo }),
              ...(op.primaryTerm !== undefined && {
                if_primary_term: op.primaryTerm,
              }),
            },
          });
          bulkBody.push({ doc: op.document });
          break;

        case DocumentOperation.UPSERT:
          bulkBody.push({ index: { _index: this.index, _id: op.documentId } });
          bulkBody.push(op.document);
          break;

        case DocumentOperation.DELETE:
          bulkBody.push({ delete: { _index: this.index, _id: op.documentId } });
          break;
      }
    }

    return bulkBody;
  }

  /**
   * Reduce a bulk item's `error` value to a string: the value itself when it
   * already is one, otherwise its `reason`, then its `type`, then its JSON form.
   */
  private static describeItemError(error: unknown): string {
    if (typeof error === 'string') {
      return error;
    }

    if (error !== null && typeof error === 'object') {
      const { reason, type } = error as { reason?: unknown; type?: unknown };
      if (typeof reason === 'string' && reason.length > 0) {
        return reason;
      }
      if (typeof type === 'string' && type.length > 0) {
        return type;
      }
    }

    return JSON.stringify(error) ?? String(error);
  }

  /**
   * Delete every document in the index whose id was not seen in this session.
   *
   * Skipped entirely when no document was seen, so an empty session never
   * wipes the index. Failures are logged and swallowed: cleanup is
   * best-effort and must not fail the commit.
   */
  private async cleanupStaleDocuments(): Promise<void> {
    if (this.seenDocuments.size === 0) {
      return; // No documents to keep, skip cleanup
    }

    this.logger.debug('Cleaning up stale documents', {
      index: this.index,
      seenCount: this.seenDocuments.size,
    });

    try {
      // A single delete-by-query on "id NOT IN seen" removes everything that
      // was not written during this session.
      const seenIds = Array.from(this.seenDocuments);

      await this.client.deleteByQuery({
        index: this.index,
        body: {
          query: {
            bool: {
              must_not: {
                ids: {
                  values: seenIds,
                },
              },
            },
          },
        },
        refresh: true,
      });

      this.logger.debug('Stale documents cleaned up', { index: this.index });
    } catch (error) {
      // NOTE(review): this call passes (message, undefined, context) while
      // executeBatch() passes (message, context) — confirm against the
      // Logger.warn signature.
      this.logger.warn('Failed to cleanup stale documents', undefined, {
        index: this.index,
        error: error instanceof Error ? error.message : String(error),
      });
      // Don't throw - cleanup is best-effort
    }
  }

  /**
   * Ensure session is active.
   *
   * @throws Error if `start()` has not been called (or the session ended).
   */
  private ensureActive(): void {
    if (!this.isActive) {
      throw new Error('Session not active. Call start() first.');
    }
  }

  /**
   * Get session statistics.
   *
   * @returns Whether the session is active, how many operations are queued,
   *   how many distinct ids have been seen, and when the session last started.
   */
  getStats(): {
    isActive: boolean;
    operationCount: number;
    seenDocumentCount: number;
    startTime: Date;
  } {
    return {
      isActive: this.isActive,
      operationCount: this.operations.length,
      seenDocumentCount: this.seenDocuments.size,
      startTime: this.startTimestamp,
    };
  }
}