import type {
LogEntry,
LogDestinationConfig,
LogBatchResult,
LogDestinationStats,
ILMPolicyConfig,
} from './types.js';
import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
import { defaultLogger } from '../../core/observability/logger.js';
import { defaultMetricsCollector } from '../../core/observability/metrics.js';
import { defaultTracer } from '../../core/observability/tracing.js';
/**
* Enterprise-grade log destination for Elasticsearch
*
* Features:
* - Batched bulk indexing with configurable batch size
* - Automatic flushing at intervals
* - Log enrichment pipeline
* - Sampling strategies (all, errors-only, percentage, rate-limit)
* - ILM (Index Lifecycle Management) integration
* - Metric extraction from logs
* - Auto index template creation
* - Queue overflow protection
* - Full observability integration
*
* @example
* ```typescript
* const logDest = new LogDestination({
* indexPattern: 'logs-myapp-{now/d}',
* batchSize: 100,
* flushIntervalMs: 5000,
* sampling: {
* strategy: 'percentage',
* percentage: 10,
* alwaysSampleErrors: true
* },
* enrichers: [addHostInfo, addEnvironment],
* ilm: {
* name: 'logs-policy',
* hotDuration: '7d',
* deleteDuration: '30d'
* }
* });
*
* await logDest.initialize();
* await logDest.send({
* timestamp: new Date().toISOString(),
* level: 'INFO',
* message: 'User logged in',
* metadata: { userId: '123' }
* });
* ```
*/
export class LogDestination {
private config: Required<LogDestinationConfig>;
private queue: LogEntry[] = [];
private flushTimer?: NodeJS.Timeout;
private stats: LogDestinationStats = {
totalLogs: 0,
totalSuccessful: 0,
totalFailed: 0,
totalSampled: 0,
totalDropped: 0,
queueSize: 0,
avgBatchDurationMs: 0,
};
private batchDurations: number[] = [];
private lastRateLimitReset = Date.now();
private rateLimitCounter = 0;
private initialized = false;
constructor(config: LogDestinationConfig) {
this.config = {
indexPattern: config.indexPattern,
batchSize: config.batchSize ?? 100,
flushIntervalMs: config.flushIntervalMs ?? 5000,
maxQueueSize: config.maxQueueSize ?? 10000,
enrichers: config.enrichers ?? [],
sampling: config.sampling ?? { strategy: 'all', alwaysSampleErrors: true },
ilm: config.ilm ?? { name: 'logs-default', hotDuration: '7d', deleteDuration: '30d' },
metrics: config.metrics ?? [],
autoCreateTemplate: config.autoCreateTemplate ?? true,
templateSettings: config.templateSettings ?? {
numberOfShards: 1,
numberOfReplicas: 1,
refreshInterval: '5s',
codec: 'best_compression',
},
templateMappings: config.templateMappings ?? {},
};
}
/**
* Create a new log destination
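*
* Equivalent to calling `new LogDestination(config)`.
*
* @example
* A sketch with errors-only sampling (values are illustrative):
* ```typescript
* const errorDest = LogDestination.create({
*   indexPattern: 'logs-errors-{now/d}',
*   sampling: { strategy: 'errors-only', alwaysSampleErrors: true },
* });
* ```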
*/
static create(config: LogDestinationConfig): LogDestination {
return new LogDestination(config);
}
/**
* Initialize the log destination (create template, ILM policy)
*/
async initialize(): Promise<void> {
if (this.initialized) {
return;
}
const span = defaultTracer.startSpan('logDestination.initialize');
try {
// Create ILM policy if configured
if (this.config.ilm) {
await this.createILMPolicy(this.config.ilm);
}
// Create index template if enabled
if (this.config.autoCreateTemplate) {
await this.createIndexTemplate();
}
// Start flush timer
this.startFlushTimer();
this.initialized = true;
defaultLogger.info('Log destination initialized', {
indexPattern: this.config.indexPattern,
batchSize: this.config.batchSize,
flushIntervalMs: this.config.flushIntervalMs,
});
span.end();
} catch (error) {
defaultLogger.error('Failed to initialize log destination', {
error: error instanceof Error ? error.message : String(error),
});
span.recordException(error as Error);
span.end();
throw error;
}
}
/**
* Send a log entry
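*
* Sampling is applied first (errors can bypass it via `alwaysSampleErrors`),
* then enrichers run, then the entry is queued; a full batch triggers a flush.
*
* @example
* A sketch with illustrative values:
* ```typescript
* await logDest.send({
*   timestamp: new Date().toISOString(),
*   level: 'ERROR',
*   message: 'Payment failed',
*   metadata: { orderId: 'abc-123' },
* });
* ```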
*/
async send(entry: LogEntry): Promise<void> {
this.stats.totalLogs++;
// Apply sampling
if (!this.shouldSample(entry)) {
this.stats.totalSampled++;
return;
}
// Apply enrichers
let enrichedEntry = entry;
for (const enricher of this.config.enrichers) {
enrichedEntry = await enricher(enrichedEntry);
}
// Extract metrics if configured
if (this.config.metrics.length > 0) {
this.extractMetrics(enrichedEntry);
}
// Check queue size
if (this.queue.length >= this.config.maxQueueSize) {
this.stats.totalDropped++;
defaultLogger.warn('Log queue overflow, dropping log', {
queueSize: this.queue.length,
maxQueueSize: this.config.maxQueueSize,
});
return;
}
// Add to queue
this.queue.push(enrichedEntry);
this.stats.queueSize = this.queue.length;
// Flush if batch size reached
if (this.queue.length >= this.config.batchSize) {
await this.flush();
}
}
/**
* Send multiple log entries
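*
* Entries are forwarded to `send()` one at a time, so sampling, enrichment
* and queue-overflow protection apply per entry.
*
* @example
* A sketch with illustrative entries:
* ```typescript
* await logDest.sendBatch([
*   { timestamp: new Date().toISOString(), level: 'INFO', message: 'job started' },
*   { timestamp: new Date().toISOString(), level: 'INFO', message: 'job finished' },
* ]);
* ```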
*/
async sendBatch(entries: LogEntry[]): Promise<void> {
for (const entry of entries) {
await this.send(entry);
}
}
/**
* Flush pending logs immediately
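*
* @example
* Inspecting the returned batch result (assumes entries have been queued):
* ```typescript
* const result = await logDest.flush();
* if (result && result.failed > 0) {
*   console.warn(`Failed to index ${result.failed} of ${result.total} logs`);
* }
* ```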
*/
async flush(): Promise<LogBatchResult | null> {
if (this.queue.length === 0) {
return null;
}
const span = defaultTracer.startSpan('logDestination.flush', {
'batch.size': this.queue.length,
});
const startTime = Date.now();
const batch = this.queue.splice(0, this.config.batchSize);
this.stats.queueSize = this.queue.length;
try {
const client = ElasticsearchConnectionManager.getInstance().getClient();
// Build bulk operations
const operations = batch.flatMap((entry) => [
{ index: { _index: this.resolveIndexName() } },
entry,
]);
// Execute bulk request
const result = await client.bulk({ operations });
const durationMs = Date.now() - startTime;
this.batchDurations.push(durationMs);
if (this.batchDurations.length > 100) {
this.batchDurations.shift();
}
this.stats.avgBatchDurationMs =
this.batchDurations.reduce((a, b) => a + b, 0) / this.batchDurations.length;
this.stats.lastFlushAt = new Date();
// Process results
const errors: Array<{ log: LogEntry; error: string }> = [];
let successful = 0;
let failed = 0;
if (result.items) {
result.items.forEach((item, index) => {
const operation = item.index || item.create || item.update;
if (operation && operation.error) {
failed++;
errors.push({
log: batch[index] as LogEntry,
error: JSON.stringify(operation.error),
});
} else {
successful++;
}
});
}
this.stats.totalSuccessful += successful;
this.stats.totalFailed += failed;
// Record metrics
defaultMetricsCollector.requestsTotal.inc({ operation: 'log_flush', index: 'logs' });
defaultMetricsCollector.requestDuration.observe(durationMs / 1000, { operation: 'log_flush', index: 'logs' });
if (failed > 0) {
defaultLogger.warn('Some logs failed to index', {
successful,
failed,
errors: errors.slice(0, 5), // Log first 5 errors
});
}
span.setAttributes({
'batch.successful': successful,
'batch.failed': failed,
'batch.duration_ms': durationMs,
});
span.end();
return {
successful,
failed,
total: batch.length,
errors: errors.length > 0 ? errors : undefined,
durationMs,
};
} catch (error) {
this.stats.totalFailed += batch.length;
defaultMetricsCollector.requestErrors.inc({ operation: 'log_flush' });
defaultLogger.error('Failed to flush logs', {
error: error instanceof Error ? error.message : String(error),
batchSize: batch.length,
});
span.recordException(error as Error);
span.end();
throw error;
}
}
/**
* Get destination statistics
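*
* @example
* A periodic monitoring sketch (the interval is arbitrary):
* ```typescript
* setInterval(() => {
*   const stats = logDest.getStats();
*   console.log(`queue=${stats.queueSize} dropped=${stats.totalDropped} failed=${stats.totalFailed}`);
* }, 60_000);
* ```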
*/
getStats(): LogDestinationStats {
return { ...this.stats };
}
/**
* Destroy the destination (flush pending logs and stop timer)
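*
* @example
* A graceful-shutdown sketch (signal handling is up to the host application):
* ```typescript
* process.on('SIGTERM', async () => {
*   await logDest.destroy();
*   process.exit(0);
* });
* ```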
*/
async destroy(): Promise<void> {
if (this.flushTimer) {
clearInterval(this.flushTimer);
this.flushTimer = undefined;
}
// Flush everything still queued, not just a single final batch
while (this.queue.length > 0) {
await this.flush();
}
this.initialized = false;
defaultLogger.info('Log destination destroyed', {
stats: this.stats,
});
}
// ============================================================================
// Private Methods
// ============================================================================
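/**
* Start the interval timer that flushes the queue every `flushIntervalMs`.
* Flush errors are caught and logged so the timer keeps running.
*/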
private startFlushTimer(): void {
this.flushTimer = setInterval(async () => {
if (this.queue.length > 0) {
try {
await this.flush();
} catch (error) {
defaultLogger.error('Flush timer error', {
error: error instanceof Error ? error.message : String(error),
});
}
}
}, this.config.flushIntervalMs);
}
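/**
* Decide whether a log entry should be kept according to the configured
* sampling strategy ('all', 'errors-only', 'percentage', 'rate-limit').
* ERROR entries bypass sampling when `alwaysSampleErrors` is set.
*/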
private shouldSample(entry: LogEntry): boolean {
const sampling = this.config.sampling;
// Always sample errors if configured
if (sampling.alwaysSampleErrors && entry.level === 'ERROR') {
return true;
}
switch (sampling.strategy) {
case 'all':
return true;
case 'errors-only':
return entry.level === 'ERROR';
case 'percentage':
return Math.random() * 100 < (sampling.percentage ?? 100);
case 'rate-limit': {
const now = Date.now();
if (now - this.lastRateLimitReset >= 1000) {
this.lastRateLimitReset = now;
this.rateLimitCounter = 0;
}
this.rateLimitCounter++;
return this.rateLimitCounter <= (sampling.maxLogsPerSecond ?? 100);
}
default:
return true;
}
}
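/**
* Resolve the concrete index name from the configured pattern, with basic
* date-math support for `{now/d}` (daily) and `{now/M}` (monthly) indices.
*/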
private resolveIndexName(): string {
// Support date math in index pattern
const pattern = this.config.indexPattern;
// Simple date math support for {now/d}
if (pattern.includes('{now/d}')) {
const dateParts = new Date().toISOString().split('T');
const date = dateParts[0] ?? new Date().toISOString().substring(0, 10);
return pattern.replace('{now/d}', date);
}
// Support {now/M} for month
if (pattern.includes('{now/M}')) {
const date = new Date();
const month = `${date.getFullYear()}.${String(date.getMonth() + 1).padStart(2, '0')}`;
return pattern.replace('{now/M}', month);
}
return pattern;
}
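/**
* Extract configured metrics from a log entry and record them on the default
* metrics collector (counters and histograms; gauges are not yet wired up).
*/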
private extractMetrics(entry: LogEntry): void {
for (const metric of this.config.metrics) {
const value = this.getNestedValue(entry, metric.field);
if (value === undefined) continue;
const labels: Record<string, string> = {};
if (metric.labels) {
for (const labelField of metric.labels) {
const labelValue = this.getNestedValue(entry, labelField);
if (labelValue !== undefined) {
labels[labelField] = String(labelValue);
}
}
}
switch (metric.type) {
case 'counter':
defaultMetricsCollector.requestsTotal.inc({ ...labels, metric: metric.name });
break;
case 'gauge':
// Note: Would need custom gauge metric for this
break;
case 'histogram':
if (typeof value === 'number') {
defaultMetricsCollector.requestDuration.observe(value, { ...labels, metric: metric.name });
}
break;
}
}
}
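/**
* Read a dot-separated path (e.g. 'metrics.duration') from an arbitrary object,
* returning undefined if any segment is missing.
*/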
private getNestedValue(obj: unknown, path: string): unknown {
const parts = path.split('.');
let current = obj;
for (const part of parts) {
if (current === null || current === undefined || typeof current !== 'object') {
return undefined;
}
current = (current as Record<string, unknown>)[part];
}
return current;
}
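/**
* Create the ILM policy described by the config. Failures are logged as
* warnings rather than thrown, since the policy may already exist.
*/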
private async createILMPolicy(ilm: ILMPolicyConfig): Promise<void> {
const client = ElasticsearchConnectionManager.getInstance().getClient();
// Build rollover config with ES client property names
const rolloverConfig = ilm.rollover ? {
...(ilm.rollover.maxSize && { max_size: ilm.rollover.maxSize }),
...(ilm.rollover.maxAge && { max_age: ilm.rollover.maxAge }),
...(ilm.rollover.maxDocs && { max_docs: ilm.rollover.maxDocs }),
} : undefined;
const policy = {
policy: {
phases: {
...(ilm.hotDuration && {
hot: {
actions: {
...(rolloverConfig && { rollover: rolloverConfig }),
},
},
}),
...(ilm.warmDuration && {
warm: {
min_age: ilm.warmDuration,
actions: {
shrink: { number_of_shards: 1 },
forcemerge: { max_num_segments: 1 },
},
},
}),
...(ilm.coldDuration && {
cold: {
min_age: ilm.coldDuration,
actions: {
freeze: {},
},
},
}),
...(ilm.deleteDuration && {
delete: {
min_age: ilm.deleteDuration,
actions: {
delete: {},
},
},
}),
},
},
};
try {
await client.ilm.putLifecycle({
name: ilm.name,
...policy,
} as any);
defaultLogger.info('ILM policy created', { policy: ilm.name });
} catch (error) {
defaultLogger.warn('Failed to create ILM policy (may already exist)', {
policy: ilm.name,
error: error instanceof Error ? error.message : String(error),
});
}
}
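/**
* Create the index template (settings, mappings, optional ILM wiring) for the
* resolved index pattern. Failures are logged as warnings rather than thrown.
*/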
private async createIndexTemplate(): Promise<void> {
const client = ElasticsearchConnectionManager.getInstance().getClient();
const templateName = `logs-${this.config.indexPattern.split('-')[1] || 'default'}-template`;
const indexPattern = this.config.indexPattern.replace(/\{.*?\}/g, '*');
const template = {
index_patterns: [indexPattern],
template: {
settings: {
number_of_shards: this.config.templateSettings.numberOfShards,
number_of_replicas: this.config.templateSettings.numberOfReplicas,
refresh_interval: this.config.templateSettings.refreshInterval,
codec: this.config.templateSettings.codec,
...(this.config.ilm && {
'index.lifecycle.name': this.config.ilm.name,
'index.lifecycle.rollover_alias': indexPattern,
}),
},
mappings: {
properties: {
timestamp: { type: 'date' },
level: { type: 'keyword' },
message: { type: 'text' },
correlationId: { type: 'keyword' },
service: { type: 'keyword' },
version: { type: 'keyword' },
host: { type: 'keyword' },
environment: { type: 'keyword' },
tags: { type: 'keyword' },
metadata: { type: 'object', enabled: false },
error: {
properties: {
name: { type: 'keyword' },
message: { type: 'text' },
stack: { type: 'text' },
code: { type: 'keyword' },
},
},
metrics: {
properties: {
duration: { type: 'long' },
memory: { type: 'long' },
cpu: { type: 'float' },
},
},
...this.config.templateMappings,
},
},
},
};
try {
await client.indices.putIndexTemplate({
name: templateName,
...template,
} as any);
defaultLogger.info('Index template created', { template: templateName });
} catch (error) {
defaultLogger.warn('Failed to create index template (may already exist)', {
template: templateName,
error: error instanceof Error ? error.message : String(error),
});
}
}
}
/**
* Create a new log destination
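*
* @example
* A minimal sketch, assuming `indexPattern` is the only required field:
* ```typescript
* const logDest = createLogDestination({ indexPattern: 'logs-myapp-{now/d}' });
* await logDest.initialize();
* ```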
*/
export function createLogDestination(config: LogDestinationConfig): LogDestination {
return new LogDestination(config);
}