import type {
  LogEntry,
  LogDestinationConfig,
  LogBatchResult,
  LogDestinationStats,
  ILMPolicyConfig,
} from './types.js';
import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
import { defaultLogger } from '../../core/observability/logger.js';
import { defaultMetricsCollector } from '../../core/observability/metrics.js';
import { defaultTracer } from '../../core/observability/tracing.js';

/**
 * Enterprise-grade log destination for Elasticsearch
 *
 * Features:
 * - Batched bulk indexing with configurable batch size
 * - Automatic flushing at intervals
 * - Log enrichment pipeline
 * - Sampling strategies (all, errors-only, percentage, rate-limit)
 * - ILM (Index Lifecycle Management) integration
 * - Metric extraction from logs
 * - Auto index template creation
 * - Queue overflow protection
 * - Full observability integration
 *
 * @example
 * ```typescript
 * const logDest = new LogDestination({
 *   indexPattern: 'logs-myapp-{now/d}',
 *   batchSize: 100,
 *   flushIntervalMs: 5000,
 *   sampling: {
 *     strategy: 'percentage',
 *     percentage: 10,
 *     alwaysSampleErrors: true
 *   },
 *   enrichers: [addHostInfo, addEnvironment],
 *   ilm: {
 *     name: 'logs-policy',
 *     hotDuration: '7d',
 *     deleteDuration: '30d'
 *   }
 * });
 *
 * await logDest.initialize();
 * await logDest.send({
 *   timestamp: new Date().toISOString(),
 *   level: 'INFO',
 *   message: 'User logged in',
 *   metadata: { userId: '123' }
 * });
 * ```
 */
export class LogDestination {
  /** Caller configuration with every optional field replaced by its default. */
  private config: Required<LogDestinationConfig>;
  /** Entries accepted by `send()` that have not been handed to a bulk request yet. */
  private queue: LogEntry[] = [];
  /** Handle of the periodic flush interval; set by `startFlushTimer()`. */
  private flushTimer?: NodeJS.Timeout;
  private stats: LogDestinationStats = {
    totalLogs: 0,
    totalSuccessful: 0,
    totalFailed: 0,
    totalSampled: 0,
    totalDropped: 0,
    queueSize: 0,
    avgBatchDurationMs: 0,
  };
  /** Durations (ms) of the most recent bulk requests, capped at 100 samples. */
  private batchDurations: number[] = [];
  /** Start (epoch ms) of the current one-second window of the `rate-limit` strategy. */
  private lastRateLimitReset = Date.now();
  /** Entries seen by the `rate-limit` strategy in the current window. */
  private rateLimitCounter = 0;
  private initialized = false;

  /**
   * Build a destination from `config`, filling in defaults for omitted options.
   * Nothing is sent to Elasticsearch and no timer is started until `initialize()`.
   *
   * @param config - Destination options; only `indexPattern` has no default here.
   */
  constructor(config: LogDestinationConfig) {
    this.config = {
      indexPattern: config.indexPattern,
      batchSize: config.batchSize ?? 100,
      flushIntervalMs: config.flushIntervalMs ?? 5000,
      maxQueueSize: config.maxQueueSize ?? 10000,
      enrichers: config.enrichers ?? [],
      sampling: config.sampling ?? { strategy: 'all', alwaysSampleErrors: true },
      ilm: config.ilm ?? { name: 'logs-default', hotDuration: '7d', deleteDuration: '30d' },
      metrics: config.metrics ?? [],
      autoCreateTemplate: config.autoCreateTemplate ?? true,
      templateSettings: config.templateSettings ?? {
        numberOfShards: 1,
        numberOfReplicas: 1,
        refreshInterval: '5s',
        codec: 'best_compression',
      },
      templateMappings: config.templateMappings ?? {},
    };
  }

  /**
   * Create a new log destination
   */
  static create(config: LogDestinationConfig): LogDestination {
    return new LogDestination(config);
  }

  /**
   * Initialize the log destination (create template, ILM policy) and start the
   * periodic flush timer. Calling it again once initialized is a no-op.
   *
   * @throws Rethrows any error raised during setup after logging it and
   *   recording it on the tracing span.
   */
  async initialize(): Promise<void> {
    if (this.initialized) {
      return;
    }

    const span = defaultTracer.startSpan('logDestination.initialize');

    try {
      // Create ILM policy if configured
      if (this.config.ilm) {
        await this.createILMPolicy(this.config.ilm);
      }

      // Create index template if enabled
      if (this.config.autoCreateTemplate) {
        await this.createIndexTemplate();
      }

      // Start flush timer
      this.startFlushTimer();
      this.initialized = true;

      defaultLogger.info('Log destination initialized', {
        indexPattern: this.config.indexPattern,
        batchSize: this.config.batchSize,
        flushIntervalMs: this.config.flushIntervalMs,
      });
      span.end();
    } catch (error) {
      defaultLogger.error('Failed to initialize log destination', {
        error: error instanceof Error ? error.message : String(error),
      });
      span.recordException(error as Error);
      span.end();
      throw error;
    }
  }

  /**
   * Send a log entry.
   *
   * The entry is sampled, passed through every enricher in order, optionally
   * mined for metrics and then queued. Reaching `batchSize` queued entries
   * triggers an immediate flush.
   *
   * Entries rejected by sampling increment `totalSampled`; entries arriving
   * while the queue is at `maxQueueSize` increment `totalDropped`. Both are
   * discarded without an error.
   *
   * @param entry - The log entry to enqueue.
   * @throws Whatever an enricher or a triggered `flush()` throws.
   */
  async send(entry: LogEntry): Promise<void> {
    this.stats.totalLogs++;

    // Apply sampling
    if (!this.shouldSample(entry)) {
      this.stats.totalSampled++;
      return;
    }

    // Apply enrichers
    let enrichedEntry = entry;
    for (const enricher of this.config.enrichers) {
      enrichedEntry = await enricher(enrichedEntry);
    }

    // Extract metrics if configured
    if (this.config.metrics.length > 0) {
      this.extractMetrics(enrichedEntry);
    }

    // Check queue size
    if (this.queue.length >= this.config.maxQueueSize) {
      this.stats.totalDropped++;
      defaultLogger.warn('Log queue overflow, dropping log', {
        queueSize: this.queue.length,
        maxQueueSize: this.config.maxQueueSize,
      });
      return;
    }

    // Add to queue
    this.queue.push(enrichedEntry);
    this.stats.queueSize = this.queue.length;

    // Flush if batch size reached
    if (this.queue.length >= this.config.batchSize) {
      await this.flush();
    }
  }

  /**
   * Send multiple log entries, one after another, through `send()`.
   *
   * @param entries - Entries to enqueue in order.
   */
  async sendBatch(entries: LogEntry[]): Promise<void> {
    for (const entry of entries) {
      await this.send(entry);
    }
  }

  /**
   * Flush pending logs immediately.
   *
   * At most `batchSize` entries are taken from the front of the queue and sent
   * in a single bulk request; anything beyond that stays queued for a later
   * flush.
   *
   * @returns Per-batch counts and errors, or `null` when the queue was empty.
   * @throws Rethrows a failed bulk request. The entries of that batch have
   *   already been removed from the queue and are counted in `totalFailed`.
   */
  async flush(): Promise<LogBatchResult | null> {
    if (this.queue.length === 0) {
      return null;
    }

    // Take the batch first so the span reports what is actually sent, not the
    // whole backlog.
    const batch = this.queue.splice(0, this.config.batchSize);
    this.stats.queueSize = this.queue.length;

    const span = defaultTracer.startSpan('logDestination.flush', {
      'batch.size': batch.length,
    });
    const startTime = Date.now();

    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();

      // Build bulk operations; the target index is resolved once per batch.
      const indexName = this.resolveIndexName();
      const operations = batch.flatMap((entry) => [{ index: { _index: indexName } }, entry]);

      // Execute bulk request
      const result = await client.bulk({ operations });

      const durationMs = Date.now() - startTime;
      this.batchDurations.push(durationMs);
      if (this.batchDurations.length > 100) {
        this.batchDurations.shift();
      }
      this.stats.avgBatchDurationMs =
        this.batchDurations.reduce((a, b) => a + b, 0) / this.batchDurations.length;
      this.stats.lastFlushAt = new Date();

      // Process results
      const errors: Array<{ log: LogEntry; error: string }> = [];
      let successful = 0;
      let failed = 0;

      if (result.items) {
        result.items.forEach((item, index) => {
          const operation = item.index || item.create || item.update;
          if (operation && operation.error) {
            failed++;
            errors.push({
              log: batch[index] as LogEntry,
              error: JSON.stringify(operation.error),
            });
          } else {
            successful++;
          }
        });
      }

      this.stats.totalSuccessful += successful;
      this.stats.totalFailed += failed;

      // Record metrics
      defaultMetricsCollector.requestsTotal.inc({ operation: 'log_flush', index: 'logs' });
      defaultMetricsCollector.requestDuration.observe(durationMs / 1000, {
        operation: 'log_flush',
        index: 'logs',
      });

      if (failed > 0) {
        defaultLogger.warn('Some logs failed to index', {
          successful,
          failed,
          errors: errors.slice(0, 5), // Log first 5 errors
        });
      }

      span.setAttributes({
        'batch.successful': successful,
        'batch.failed': failed,
        'batch.duration_ms': durationMs,
      });
      span.end();

      return {
        successful,
        failed,
        total: batch.length,
        errors: errors.length > 0 ? errors : undefined,
        durationMs,
      };
    } catch (error) {
      this.stats.totalFailed += batch.length;
      defaultMetricsCollector.requestErrors.inc({ operation: 'log_flush' });
      defaultLogger.error('Failed to flush logs', {
        error: error instanceof Error ? error.message : String(error),
        batchSize: batch.length,
      });
      span.recordException(error as Error);
      span.end();
      throw error;
    }
  }

  /**
   * Get destination statistics
   *
   * @returns A shallow copy of the current counters.
   */
  getStats(): LogDestinationStats {
    return { ...this.stats };
  }

  /**
   * Destroy the destination (flush pending logs and stop timer).
   *
   * The queue is drained batch by batch, because a single `flush()` sends at
   * most `batchSize` entries.
   *
   * @throws Whatever `flush()` throws; draining stops at the first failure.
   */
  async destroy(): Promise<void> {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
    }

    // Flush remaining logs
    while (this.queue.length > 0) {
      const pendingBefore = this.queue.length;
      await this.flush();
      // A non-positive batchSize makes flush() remove nothing; stop instead of
      // spinning forever.
      if (this.queue.length >= pendingBefore) {
        break;
      }
    }

    this.initialized = false;
    defaultLogger.info('Log destination destroyed', {
      stats: this.stats,
    });
  }

  // ============================================================================
  // Private Methods
  // ============================================================================

  /**
   * Start the interval that flushes the queue every `flushIntervalMs`.
   * Flush failures inside the timer are logged and not rethrown.
   */
  private startFlushTimer(): void {
    // Never leave a previous interval running: overlapping initialize() calls
    // would otherwise leak a timer that destroy() can no longer clear.
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
    }

    this.flushTimer = setInterval(async () => {
      if (this.queue.length > 0) {
        try {
          await this.flush();
        } catch (error) {
          defaultLogger.error('Flush timer error', {
            error: error instanceof Error ? error.message : String(error),
          });
        }
      }
    }, this.config.flushIntervalMs);
  }

  /**
   * Decide whether `entry` passes the configured sampling strategy.
   *
   * @param entry - Candidate log entry.
   * @returns `true` when the entry should be kept.
   */
  private shouldSample(entry: LogEntry): boolean {
    const sampling = this.config.sampling;

    // Always sample errors if configured
    if (sampling.alwaysSampleErrors && entry.level === 'ERROR') {
      return true;
    }

    switch (sampling.strategy) {
      case 'all':
        return true;
      case 'errors-only':
        return entry.level === 'ERROR';
      case 'percentage':
        return Math.random() * 100 < (sampling.percentage ?? 100);
      case 'rate-limit': {
        // Fixed one-second window: the counter restarts once a second has
        // elapsed since the last reset.
        const now = Date.now();
        if (now - this.lastRateLimitReset >= 1000) {
          this.lastRateLimitReset = now;
          this.rateLimitCounter = 0;
        }
        this.rateLimitCounter++;
        return this.rateLimitCounter <= (sampling.maxLogsPerSecond ?? 100);
      }
      default:
        return true;
    }
  }

  /**
   * Expand the date placeholder in `indexPattern` for the current moment.
   *
   * `{now/d}` becomes `YYYY-MM-DD` and `{now/M}` becomes `YYYY.MM`, both in UTC
   * so the daily and monthly forms agree on the same instant. Only the first
   * placeholder found is replaced; a pattern with neither is returned as is.
   */
  private resolveIndexName(): string {
    const pattern = this.config.indexPattern;
    const now = new Date();

    // Simple date math support for {now/d}
    if (pattern.includes('{now/d}')) {
      const day = now.toISOString().slice(0, 10);
      return pattern.replace('{now/d}', day);
    }

    // Support {now/M} for month
    if (pattern.includes('{now/M}')) {
      const month = `${now.getUTCFullYear()}.${String(now.getUTCMonth() + 1).padStart(2, '0')}`;
      return pattern.replace('{now/M}', month);
    }

    return pattern;
  }

  /**
   * Feed the configured metric definitions from a single entry.
   *
   * A metric is skipped when its field is absent from the entry. Counters
   * increment `requestsTotal`, histograms observe numeric values on
   * `requestDuration`, and gauges are currently a no-op.
   *
   * @param entry - Entry (already enriched) to read values and labels from.
   */
  private extractMetrics(entry: LogEntry): void {
    for (const metric of this.config.metrics) {
      const value = this.getNestedValue(entry, metric.field);
      if (value === undefined) continue;

      const labels: Record<string, string> = {};
      if (metric.labels) {
        for (const labelField of metric.labels) {
          const labelValue = this.getNestedValue(entry, labelField);
          if (labelValue !== undefined) {
            labels[labelField] = String(labelValue);
          }
        }
      }

      switch (metric.type) {
        case 'counter':
          defaultMetricsCollector.requestsTotal.inc({ ...labels, metric: metric.name });
          break;
        case 'gauge':
          // Note: Would need custom gauge metric for this
          break;
        case 'histogram':
          if (typeof value === 'number') {
            defaultMetricsCollector.requestDuration.observe(value, {
              ...labels,
              metric: metric.name,
            });
          }
          break;
      }
    }
  }

  /**
   * Read a dot-separated `path` (e.g. `a.b.c`) from `obj`.
   *
   * @returns The value at the path, or `undefined` as soon as a segment has to
   *   be read from something that is `null`, `undefined` or not an object.
   */
  private getNestedValue(obj: unknown, path: string): unknown {
    const parts = path.split('.');
    let current = obj;
    for (const part of parts) {
      if (current === null || current === undefined || typeof current !== 'object') {
        return undefined;
      }
      current = (current as Record<string, unknown>)[part];
    }
    return current;
  }

  /**
   * Put the ILM policy described by `ilm`.
   *
   * A phase is included only when its duration is set. Best effort: any
   * failure is logged as a warning and not rethrown.
   *
   * @param ilm - Policy name, phase durations and optional rollover limits.
   */
  private async createILMPolicy(ilm: ILMPolicyConfig): Promise<void> {
    const client = ElasticsearchConnectionManager.getInstance().getClient();

    // Build rollover config with ES client property names
    const rolloverConfig = ilm.rollover
      ? {
          ...(ilm.rollover.maxSize && { max_size: ilm.rollover.maxSize }),
          ...(ilm.rollover.maxAge && { max_age: ilm.rollover.maxAge }),
          ...(ilm.rollover.maxDocs && { max_docs: ilm.rollover.maxDocs }),
        }
      : undefined;

    const policy = {
      policy: {
        phases: {
          // hotDuration only switches the hot phase on; its value is not sent.
          ...(ilm.hotDuration && {
            hot: {
              actions: {
                ...(rolloverConfig && { rollover: rolloverConfig }),
              },
            },
          }),
          ...(ilm.warmDuration && {
            warm: {
              min_age: ilm.warmDuration,
              actions: {
                shrink: { number_of_shards: 1 },
                forcemerge: { max_num_segments: 1 },
              },
            },
          }),
          ...(ilm.coldDuration && {
            cold: {
              min_age: ilm.coldDuration,
              actions: {
                freeze: {},
              },
            },
          }),
          ...(ilm.deleteDuration && {
            delete: {
              min_age: ilm.deleteDuration,
              actions: {
                delete: {},
              },
            },
          }),
        },
      },
    };

    try {
      await client.ilm.putLifecycle({
        name: ilm.name,
        ...policy,
      } as any);
      defaultLogger.info('ILM policy created', { policy: ilm.name });
    } catch (error) {
      defaultLogger.warn('Failed to create ILM policy (may already exist)', {
        policy: ilm.name,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  /**
   * Put an index template matching `indexPattern`, with every `{...}`
   * placeholder turned into `*`.
   *
   * The template is named `logs-<second dash-separated segment>-template`,
   * falling back to `default`. Best effort: any failure is logged as a warning
   * and not rethrown.
   */
  private async createIndexTemplate(): Promise<void> {
    const client = ElasticsearchConnectionManager.getInstance().getClient();

    const templateName = `logs-${this.config.indexPattern.split('-')[1] || 'default'}-template`;
    const indexPattern = this.config.indexPattern.replace(/\{.*?\}/g, '*');

    const template = {
      index_patterns: [indexPattern],
      template: {
        settings: {
          number_of_shards: this.config.templateSettings.numberOfShards,
          number_of_replicas: this.config.templateSettings.numberOfReplicas,
          refresh_interval: this.config.templateSettings.refreshInterval,
          codec: this.config.templateSettings.codec,
          // NOTE(review): the rollover alias is set to the wildcard pattern;
          // confirm this is what the cluster expects.
          ...(this.config.ilm && {
            'index.lifecycle.name': this.config.ilm.name,
            'index.lifecycle.rollover_alias': indexPattern,
          }),
        },
        mappings: {
          properties: {
            timestamp: { type: 'date' },
            level: { type: 'keyword' },
            message: { type: 'text' },
            correlationId: { type: 'keyword' },
            service: { type: 'keyword' },
            version: { type: 'keyword' },
            host: { type: 'keyword' },
            environment: { type: 'keyword' },
            tags: { type: 'keyword' },
            metadata: { type: 'object', enabled: false },
            error: {
              properties: {
                name: { type: 'keyword' },
                message: { type: 'text' },
                stack: { type: 'text' },
                code: { type: 'keyword' },
              },
            },
            metrics: {
              properties: {
                duration: { type: 'long' },
                memory: { type: 'long' },
                cpu: { type: 'float' },
              },
            },
            // Caller-supplied mappings are merged last and override the defaults.
            ...this.config.templateMappings,
          },
        },
      },
    };

    try {
      await client.indices.putIndexTemplate({
        name: templateName,
        ...template,
      } as any);
      defaultLogger.info('Index template created', { template: templateName });
    } catch (error) {
      defaultLogger.warn('Failed to create index template (may already exist)', {
        template: templateName,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
}

/**
 * Create a new log destination
 */
export function createLogDestination(config: LogDestinationConfig): LogDestination {
  return new LogDestination(config);
}