import type { Client as ElasticClient } from '@elastic/elasticsearch'; import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js'; import { Logger, defaultLogger } from '../../core/observability/logger.js'; import { MetricsCollector, defaultMetricsCollector } from '../../core/observability/metrics.js'; import { TracingProvider, defaultTracingProvider } from '../../core/observability/tracing.js'; import { DocumentSession } from './document-session.js'; import type { DocumentWithMeta, SessionConfig, SnapshotProcessor, SnapshotMeta, IteratorOptions, } from './types.js'; import { IndexNotFoundError } from '../../core/errors/elasticsearch-error.js'; /** * Document manager configuration */ export interface DocumentManagerConfig { /** Index name */ index: string; /** Connection manager (optional, will use singleton if not provided) */ connectionManager?: ElasticsearchConnectionManager; /** Logger (optional, will use default if not provided) */ logger?: Logger; /** Metrics collector (optional) */ metrics?: MetricsCollector; /** Tracing provider (optional) */ tracing?: TracingProvider; /** Auto-create index if it doesn't exist */ autoCreateIndex?: boolean; /** Default batch size for operations */ defaultBatchSize?: number; } /** * Fluent document manager for Elasticsearch * * @example * ```typescript * const docs = new DocumentManager('products'); * await docs.initialize(); * * // Session-based operations * await docs * .session() * .start() * .upsert('prod-1', { name: 'Widget', price: 99.99 }) * .upsert('prod-2', { name: 'Gadget', price: 149.99 }) * .commit(); * * // Get a document * const product = await docs.get('prod-1'); * * // Create snapshot * const snapshot = await docs.snapshot(async (iterator) => { * const products = []; * for await (const doc of iterator) { * products.push(doc._source); * } * return { totalCount: products.length, products }; * }); * ``` */ export class DocumentManager { private client: ElasticClient; private connectionManager: ElasticsearchConnectionManager; private logger: Logger; private metrics: MetricsCollector; private tracing: TracingProvider; private index: string; private config: DocumentManagerConfig; private isInitialized = false; constructor(config: DocumentManagerConfig) { this.config = config; this.index = config.index; // Get or create connection manager this.connectionManager = config.connectionManager || ElasticsearchConnectionManager.getInstance(); // Set up observability this.logger = config.logger || defaultLogger.child(`documents:${this.index}`); this.metrics = config.metrics || defaultMetricsCollector; this.tracing = config.tracing || defaultTracingProvider; // Get client (will throw if connection manager not initialized) this.client = this.connectionManager.getClient(); } /** * Static factory method for fluent creation */ static create(index: string, config: Omit = {}): DocumentManager { return new DocumentManager({ ...config, index }); } /** * Initialize the document manager */ async initialize(): Promise { if (this.isInitialized) { return; } return this.tracing.withSpan('DocumentManager.initialize', async (span) => { span.setAttribute('index', this.index); try { // Check if index exists const exists = await this.client.indices.exists({ index: this.index }); if (!exists && this.config.autoCreateIndex) { this.logger.info('Creating index', { index: this.index }); await this.client.indices.create({ index: this.index }); this.logger.info('Index created', { index: this.index }); } else if (!exists) { throw new IndexNotFoundError(this.index); } this.isInitialized = true; this.logger.info('Document manager initialized', { index: this.index }); } catch (error) { this.logger.error('Failed to initialize document manager', error as Error, { index: this.index, }); span.recordException(error as Error); throw error; } }); } /** * Create a new session for batch operations */ session(config?: SessionConfig): DocumentSession { this.ensureInitialized(); return new DocumentSession(this.client, this.index, this.logger, config); } /** * Get a single document by ID */ async get(documentId: string): Promise | null> { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.get', async (span) => { span.setAttributes({ 'document.id': documentId, 'document.index': this.index, }); const startTime = Date.now(); try { const result = await this.client.get({ index: this.index, id: documentId, }); const duration = (Date.now() - startTime) / 1000; this.metrics.requestDuration.observe(duration, { operation: 'get', index: this.index, }); return { _id: result._id, _source: result._source as T, _version: result._version, _seq_no: result._seq_no, _primary_term: result._primary_term, _index: result._index, }; } catch (error: any) { if (error.statusCode === 404) { this.logger.debug('Document not found', { documentId, index: this.index }); return null; } this.logger.error('Failed to get document', error, { documentId, index: this.index }); span.recordException(error); throw error; } }); } /** * Create a document */ async create(documentId: string, document: T): Promise { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.create', async (span) => { span.setAttributes({ 'document.id': documentId, 'document.index': this.index, }); const startTime = Date.now(); try { await this.client.create({ index: this.index, id: documentId, document: document as Record, refresh: true, }); const duration = (Date.now() - startTime) / 1000; this.metrics.requestDuration.observe(duration, { operation: 'create', index: this.index, }); this.logger.debug('Document created', { documentId, index: this.index }); } catch (error) { this.logger.error('Failed to create document', error as Error, { documentId, index: this.index, }); span.recordException(error as Error); throw error; } }); } /** * Update a document */ async update( documentId: string, document: Partial, options?: { seqNo?: number; primaryTerm?: number } ): Promise { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.update', async (span) => { span.setAttributes({ 'document.id': documentId, 'document.index': this.index, }); const startTime = Date.now(); try { await this.client.update({ index: this.index, id: documentId, doc: document as Record, refresh: true, ...(options?.seqNo !== undefined && { if_seq_no: options.seqNo }), ...(options?.primaryTerm !== undefined && { if_primary_term: options.primaryTerm }), }); const duration = (Date.now() - startTime) / 1000; this.metrics.requestDuration.observe(duration, { operation: 'update', index: this.index, }); this.logger.debug('Document updated', { documentId, index: this.index }); } catch (error) { this.logger.error('Failed to update document', error as Error, { documentId, index: this.index, }); span.recordException(error as Error); throw error; } }); } /** * Upsert a document (create or update) */ async upsert(documentId: string, document: T): Promise { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.upsert', async (span) => { span.setAttributes({ 'document.id': documentId, 'document.index': this.index, }); const startTime = Date.now(); try { await this.client.index({ index: this.index, id: documentId, document: document as Record, refresh: true, }); const duration = (Date.now() - startTime) / 1000; this.metrics.requestDuration.observe(duration, { operation: 'upsert', index: this.index, }); this.logger.debug('Document upserted', { documentId, index: this.index }); } catch (error) { this.logger.error('Failed to upsert document', error as Error, { documentId, index: this.index, }); span.recordException(error as Error); throw error; } }); } /** * Delete a document */ async delete(documentId: string): Promise { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.delete', async (span) => { span.setAttributes({ 'document.id': documentId, 'document.index': this.index, }); const startTime = Date.now(); try { await this.client.delete({ index: this.index, id: documentId, refresh: true, }); const duration = (Date.now() - startTime) / 1000; this.metrics.requestDuration.observe(duration, { operation: 'delete', index: this.index, }); this.logger.debug('Document deleted', { documentId, index: this.index }); } catch (error: any) { if (error.statusCode === 404) { this.logger.debug('Document not found for deletion', { documentId, index: this.index }); return; // Idempotent delete } this.logger.error('Failed to delete document', error, { documentId, index: this.index }); span.recordException(error); throw error; } }); } /** * Check if index exists */ async exists(): Promise { try { return await this.client.indices.exists({ index: this.index }); } catch (error) { this.logger.error('Failed to check if index exists', error as Error, { index: this.index, }); return false; } } /** * Delete the index */ async deleteIndex(): Promise { return this.tracing.withSpan('DocumentManager.deleteIndex', async (span) => { span.setAttribute('index', this.index); try { await this.client.indices.delete({ index: this.index }); this.isInitialized = false; this.logger.info('Index deleted', { index: this.index }); } catch (error) { this.logger.error('Failed to delete index', error as Error, { index: this.index }); span.recordException(error as Error); throw error; } }); } /** * Get document count */ async count(query?: unknown): Promise { this.ensureInitialized(); try { const queryObj = query as Record | undefined; const result = await this.client.count({ index: this.index, ...(queryObj ? { query: queryObj } : {}), }); return result.count; } catch (error) { this.logger.error('Failed to count documents', error as Error, { index: this.index }); throw error; } } /** * Create a snapshot with custom processor */ async snapshot(processor: SnapshotProcessor): Promise> { this.ensureInitialized(); return this.tracing.withSpan('DocumentManager.snapshot', async (span) => { span.setAttribute('index', this.index); const startTime = Date.now(); const snapshotIndex = `${this.index}-snapshots`; try { // Get previous snapshot const previousSnapshot = await this.getLatestSnapshot(snapshotIndex); // Create iterator for all documents const iterator = this.iterate(); // Process snapshot const snapshotData = await processor(iterator, previousSnapshot); // Count documents const documentCount = await this.count(); // Store snapshot const snapshot: SnapshotMeta = { date: new Date(), data: snapshotData, documentCount, processingTime: Date.now() - startTime, }; await this.storeSnapshot(snapshotIndex, snapshot); this.logger.info('Snapshot created', { index: this.index, documentCount, processingTime: snapshot.processingTime, }); return snapshot; } catch (error) { this.logger.error('Failed to create snapshot', error as Error, { index: this.index }); span.recordException(error as Error); throw error; } }); } /** * Iterate over all documents */ async *iterate(options: IteratorOptions = {}): AsyncIterableIterator> { this.ensureInitialized(); const batchSize = options.batchSize || this.config.defaultBatchSize || 1000; // TODO: Use Point-in-Time API for better performance // For now, use basic search with search_after let searchAfter: any[] | undefined; let hasMore = true; while (hasMore) { const searchQuery = options.query as Record | undefined; const result = await this.client.search({ index: this.index, size: batchSize, ...(searchAfter ? { search_after: searchAfter } : {}), sort: options.sort || [{ _id: 'asc' }], ...(searchQuery ? { query: searchQuery } : {}), }); const hits = result.hits.hits; if (hits.length === 0) { hasMore = false; break; } for (const hit of hits) { yield { _id: hit._id ?? '', _source: hit._source as T, _version: hit._version, _seq_no: hit._seq_no, _primary_term: hit._primary_term, _index: hit._index, _score: hit._score ?? undefined, }; } // Get last sort value for pagination const lastHit = hits[hits.length - 1]; if (lastHit) { searchAfter = lastHit.sort; } if (hits.length < batchSize) { hasMore = false; } } } /** * Get latest snapshot */ private async getLatestSnapshot(snapshotIndex: string): Promise { try { const result = await this.client.search({ index: snapshotIndex, size: 1, sort: [{ 'date': 'desc' }] as unknown as Array, }); const firstHit = result.hits.hits[0]; if (!firstHit) { return null; } const snapshot = firstHit._source as SnapshotMeta; return snapshot.data; } catch (error: any) { if (error.statusCode === 404) { return null; // Index doesn't exist yet } throw error; } } /** * Store snapshot */ private async storeSnapshot(snapshotIndex: string, snapshot: SnapshotMeta): Promise { await this.client.index({ index: snapshotIndex, document: snapshot as unknown as Record, refresh: true, }); } /** * Ensure manager is initialized */ private ensureInitialized(): void { if (!this.isInitialized) { throw new Error('DocumentManager not initialized. Call initialize() first.'); } } /** * Get index name */ getIndex(): string { return this.index; } }