import type {
BulkOperation,
BulkOperationType,
BulkBatchResult,
BulkOperationResult,
BulkIndexerConfig,
BulkIndexerStats,
BulkProgress,
BackpressureState,
} from './types.js';
import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
import { defaultLogger } from '../../core/observability/logger.js';
import { defaultMetricsCollector } from '../../core/observability/metrics.js';
import { defaultTracer } from '../../core/observability/tracing.js';
/**
* Enterprise-grade bulk indexer with adaptive batching and parallel workers
*
* Features:
* - Adaptive batching based on document size and performance
* - Parallel workers for maximum throughput
* - Automatic retries with exponential backoff
* - Dead-letter queue for permanently failed operations
* - Backpressure handling to prevent memory issues
* - Progress callbacks and statistics
* - Stream support via async iteration
* - Full observability integration
*
* @example
* ```typescript
* const indexer = new BulkIndexer({
* batchingStrategy: 'adaptive',
* maxBatchSize: 1000,
* workers: 4,
* enableDeadLetterQueue: true,
* onProgress: (progress) => {
* console.log(`Processed: ${progress.totalProcessed}/${progress.totalSubmitted}`);
* }
* });
*
* await indexer.start();
*
* // Submit operations
* for (const doc of documents) {
* await indexer.index('my-index', doc.id, doc);
* }
*
* await indexer.flush();
* await indexer.stop();
* ```
*/
export class BulkIndexer {
private config: Required<BulkIndexerConfig>;
private queue: BulkOperation[] = [];
private workers: Worker[] = [];
private stats: BulkIndexerStats = {
totalSubmitted: 0,
totalProcessed: 0,
totalSuccessful: 0,
totalFailed: 0,
totalDeadLettered: 0,
totalBatches: 0,
totalBatchesFailed: 0,
queueSize: 0,
currentOpsPerSecond: 0,
avgOpsPerSecond: 0,
avgBatchSize: 0,
avgBatchDurationMs: 0,
activeWorkers: 0,
};
private running = false;
private flushTimer?: NodeJS.Timeout;
private lastProgressReport = Date.now();
private operationTimestamps: number[] = [];
private batchSizes: number[] = [];
private batchDurations: number[] = [];
private deadLetterQueue: Array<{ operation: BulkOperation; error: string; attempts: number }> = [];
constructor(config: BulkIndexerConfig = {}) {
this.config = {
batchingStrategy: config.batchingStrategy ?? 'adaptive',
batchSize: config.batchSize ?? 500,
maxBatchSize: config.maxBatchSize ?? 1000,
minBatchSize: config.minBatchSize ?? 100,
targetBatchBytes: config.targetBatchBytes ?? 5 * 1024 * 1024, // 5MB
flushIntervalMs: config.flushIntervalMs ?? 5000,
workers: config.workers ?? 2,
maxQueueSize: config.maxQueueSize ?? 10000,
maxRetries: config.maxRetries ?? 3,
retryDelayMs: config.retryDelayMs ?? 1000,
enableDeadLetterQueue: config.enableDeadLetterQueue ?? false,
deadLetterIndex: config.deadLetterIndex ?? 'failed-operations-{now/d}',
onProgress: config.onProgress ?? (() => {}),
onBatchSuccess: config.onBatchSuccess ?? (() => {}),
onBatchError: config.onBatchError ?? (() => {}),
refresh: config.refresh ?? false,
pipeline: config.pipeline ?? '',
routing: config.routing ?? '',
};
}
/**
* Create a new bulk indexer
*/
static create(config?: BulkIndexerConfig): BulkIndexer {
return new BulkIndexer(config);
}
/**
* Start the bulk indexer
*/
async start(): Promise<void> {
if (this.running) {
return;
}
this.running = true;
this.stats.startedAt = new Date();
// Start workers
for (let i = 0; i < this.config.workers; i++) {
const worker = new Worker(this, i);
this.workers.push(worker);
worker.start();
}
// Start flush timer
this.flushTimer = setInterval(() => {
this.triggerFlush();
}, this.config.flushIntervalMs);
defaultLogger.info('Bulk indexer started', {
workers: this.config.workers,
batchingStrategy: this.config.batchingStrategy,
maxBatchSize: this.config.maxBatchSize,
});
}
/**
* Stop the bulk indexer
*/
async stop(): Promise<void> {
if (!this.running) {
return;
}
    // Stop flush timer
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = undefined;
    }
// Flush remaining operations
await this.flush();
// Stop workers
for (const worker of this.workers) {
worker.stop();
}
this.running = false;
defaultLogger.info('Bulk indexer stopped', {
stats: this.stats,
});
}
// ============================================================================
// Operation Methods
// ============================================================================
/**
* Index a document
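   *
   * Pass `undefined` as the id to let Elasticsearch auto-generate one.
   *
   * @example
   * ```typescript
   * // Illustrative sketch; the 'logs' index name and fields are placeholders
   * await indexer.index('logs', undefined, { message: 'hello', ts: Date.now() });
   * ```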
*/
async index<T>(index: string, id: string | undefined, document: T): Promise<void> {
await this.submit({
type: 'index',
index,
id,
document,
});
}
/**
* Create a document (fails if exists)
*/
async create<T>(index: string, id: string, document: T): Promise<void> {
await this.submit({
type: 'create',
index,
id,
document,
});
}
/**
* Update a document
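   *
   * @example
   * ```typescript
   * // Illustrative sketch; index name and fields are placeholders
   * await indexer.update('users', 'user-1', { lastSeen: Date.now() }, { retryOnConflict: 3 });
   * ```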
*/
async update<T>(index: string, id: string, partialDocument: Partial<T>, options?: { retryOnConflict?: number }): Promise<void> {
await this.submit({
type: 'update',
index,
id,
partialDocument,
retryOnConflict: options?.retryOnConflict,
});
}
/**
* Delete a document
*/
async delete(index: string, id: string): Promise<void> {
await this.submit({
type: 'delete',
index,
id,
});
}
/**
* Submit a custom bulk operation
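   *
   * @example
   * ```typescript
   * // Illustrative sketch; builds the same operation delete() produces
   * await indexer.submit({ type: 'delete', index: 'logs', id: 'stale-doc' });
   * ```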
*/
async submit(operation: BulkOperation): Promise<void> {
// Check backpressure
const backpressure = this.getBackpressure();
if (backpressure.active) {
await this.waitForBackpressure(backpressure.recommendedWaitMs);
}
// Add to queue
this.queue.push(operation);
this.stats.totalSubmitted++;
this.stats.queueSize = this.queue.length;
// Track timestamp for ops/sec calculation
this.operationTimestamps.push(Date.now());
if (this.operationTimestamps.length > 1000) {
this.operationTimestamps.shift();
}
// Update current ops/sec
this.updateCurrentOpsPerSecond();
// Report progress if needed
this.reportProgress();
// Trigger flush if batch size reached
const batchSize = this.getCurrentBatchSize();
if (this.queue.length >= batchSize) {
this.triggerFlush();
}
}
/**
* Flush pending operations immediately
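   *
   * @example
   * ```typescript
   * // Drain the queue and inspect per-batch outcomes
   * const batches = await indexer.flush();
   * for (const batch of batches) {
   *   if (batch.hasErrors) {
   *     console.warn(`${batch.failed}/${batch.total} operations failed`);
   *   }
   * }
   * ```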
*/
async flush(): Promise<BulkBatchResult[]> {
const results: BulkBatchResult[] = [];
while (this.queue.length > 0) {
const batchSize = this.getCurrentBatchSize();
const batch = this.queue.splice(0, Math.min(batchSize, this.queue.length));
this.stats.queueSize = this.queue.length;
if (batch.length > 0) {
const result = await this.executeBatch(batch);
results.push(result);
}
}
return results;
}
/**
* Get current statistics
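   *
   * @example
   * ```typescript
   * const stats = indexer.getStats();
   * console.log(`${stats.totalSuccessful} ok, ${stats.totalFailed} failed, queue: ${stats.queueSize}`);
   * ```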
*/
getStats(): BulkIndexerStats {
return { ...this.stats };
}
/**
* Get backpressure state
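   *
   * @example
   * ```typescript
   * // Check queue utilization before submitting a large burst of documents
   * const bp = indexer.getBackpressure();
   * if (bp.active) {
   *   await new Promise((resolve) => setTimeout(resolve, bp.recommendedWaitMs));
   * }
   * ```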
*/
getBackpressure(): BackpressureState {
const utilization = (this.queue.length / this.config.maxQueueSize) * 100;
const active = utilization > 80;
return {
active,
queueUtilization: utilization,
recommendedWaitMs: active ? Math.min(1000, (utilization - 80) * 50) : 0,
};
}
// ============================================================================
// Private Methods
// ============================================================================
private async executeBatch(operations: BulkOperation[]): Promise<BulkBatchResult> {
const span = defaultTracer.startSpan('bulkIndexer.executeBatch', {
'batch.size': operations.length,
});
const startTime = Date.now();
this.stats.activeWorkers++;
try {
const client = ElasticsearchConnectionManager.getInstance().getClient();
// Build bulk body
const body: any[] = [];
      for (const op of operations) {
        // Fall back to indexer-level defaults when an operation doesn't set them;
        // ingest pipelines only apply to index/create actions
        const routing = op.routing || this.config.routing || undefined;
        const pipeline =
          op.pipeline || (op.type === 'index' || op.type === 'create' ? this.config.pipeline : '') || undefined;
        const action: any = {};
        action[op.type] = {
          _index: op.index,
          ...(op.id && { _id: op.id }),
          ...(routing && { routing }),
          ...(pipeline && { pipeline }),
          ...(op.ifSeqNo !== undefined && { if_seq_no: op.ifSeqNo }),
          ...(op.ifPrimaryTerm !== undefined && { if_primary_term: op.ifPrimaryTerm }),
          ...(op.retryOnConflict !== undefined && { retry_on_conflict: op.retryOnConflict }),
        };
body.push(action);
// Add document for index/create
if (op.type === 'index' || op.type === 'create') {
body.push(op.document);
}
// Add partial document for update
if (op.type === 'update') {
body.push({ doc: op.partialDocument });
}
}
// Execute bulk request
const response = await client.bulk({
refresh: this.config.refresh,
operations: body,
});
const durationMs = Date.now() - startTime;
// Track batch metrics
this.batchSizes.push(operations.length);
this.batchDurations.push(durationMs);
if (this.batchSizes.length > 100) {
this.batchSizes.shift();
this.batchDurations.shift();
}
this.stats.avgBatchSize = this.batchSizes.reduce((a, b) => a + b, 0) / this.batchSizes.length;
this.stats.avgBatchDurationMs = this.batchDurations.reduce((a, b) => a + b, 0) / this.batchDurations.length;
// Process results
const results: BulkOperationResult[] = [];
let successful = 0;
let failed = 0;
if (response.items) {
for (let i = 0; i < response.items.length; i++) {
const item = response.items[i];
const op = operations[i];
const actionResult = item && (item.index || item.create || item.update || item.delete);
if (actionResult) {
const success = !actionResult.error && (actionResult.status === 200 || actionResult.status === 201);
results.push({
success,
type: op?.type as BulkOperationType,
index: actionResult._index,
id: actionResult._id ?? undefined,
status: actionResult.status,
error: actionResult.error ? {
type: actionResult.error.type,
reason: actionResult.error.reason ?? 'Unknown error',
causedBy: actionResult.error.caused_by ? JSON.stringify(actionResult.error.caused_by) : undefined,
} : undefined,
seqNo: actionResult._seq_no,
primaryTerm: actionResult._primary_term,
});
if (success) {
successful++;
} else {
failed++;
// Handle failed operation
if (op) {
await this.handleFailedOperation(op, actionResult.error?.reason || 'Unknown error');
}
}
}
}
}
// Update stats
this.stats.totalProcessed += operations.length;
this.stats.totalSuccessful += successful;
this.stats.totalFailed += failed;
this.stats.totalBatches++;
this.stats.lastBatchAt = new Date();
this.stats.activeWorkers--;
// Calculate avg ops/sec
if (this.stats.startedAt) {
const elapsedSeconds = (Date.now() - this.stats.startedAt.getTime()) / 1000;
this.stats.avgOpsPerSecond = this.stats.totalProcessed / elapsedSeconds;
}
// Record metrics
defaultMetricsCollector.requestsTotal.inc({ operation: 'bulk', index: 'bulk' });
defaultMetricsCollector.requestDuration.observe(durationMs / 1000, { operation: 'bulk', index: 'bulk' });
const result: BulkBatchResult = {
successful,
failed,
total: operations.length,
durationMs,
results,
hasErrors: failed > 0,
};
// Callbacks
this.config.onBatchSuccess(result);
if (failed > 0) {
defaultLogger.warn('Bulk batch had errors', {
successful,
failed,
total: operations.length,
});
}
span.setAttributes({
'batch.successful': successful,
'batch.failed': failed,
'batch.duration_ms': durationMs,
});
span.end();
return result;
} catch (error) {
this.stats.totalBatchesFailed++;
this.stats.activeWorkers--;
defaultMetricsCollector.requestErrors.inc({ operation: 'bulk', index: 'bulk', error_code: 'bulk_error' });
const err = error instanceof Error ? error : new Error(String(error));
defaultLogger.error('Bulk batch failed', err, {
batchSize: operations.length,
});
this.config.onBatchError(error as Error, operations);
// Retry all operations
for (const op of operations) {
await this.handleFailedOperation(op, (error as Error).message);
}
span.recordException(error as Error);
span.end();
throw error;
}
}
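  // Note: deadLetterQueue doubles as the retry-attempt tracker; an entry is
  // written to the dead-letter index only after maxRetries is exceeded.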
private async handleFailedOperation(operation: BulkOperation, error: string): Promise<void> {
// Find existing entry in dead-letter queue
const existingIndex = this.deadLetterQueue.findIndex(
(item) => item.operation.type === operation.type && item.operation.index === operation.index && item.operation.id === operation.id
);
const attempts = existingIndex >= 0 ? this.deadLetterQueue[existingIndex]!.attempts + 1 : 1;
if (attempts <= this.config.maxRetries) {
// Retry with delay
if (existingIndex >= 0) {
this.deadLetterQueue[existingIndex]!.attempts = attempts;
} else {
this.deadLetterQueue.push({ operation, error, attempts });
}
      setTimeout(() => {
        this.queue.unshift(operation); // Re-queue at the front for prompt retry
        this.stats.queueSize = this.queue.length;
      }, this.config.retryDelayMs * attempts);
} else {
// Max retries exceeded
if (this.config.enableDeadLetterQueue) {
await this.sendToDeadLetterQueue(operation, error, attempts);
}
this.stats.totalDeadLettered++;
// Remove from retry queue
if (existingIndex >= 0) {
this.deadLetterQueue.splice(existingIndex, 1);
}
}
}
private async sendToDeadLetterQueue(operation: BulkOperation, error: string, attempts: number): Promise<void> {
try {
const client = ElasticsearchConnectionManager.getInstance().getClient();
const indexName = this.resolveDeadLetterIndexName();
await client.index({
index: indexName,
document: {
...operation,
failed_at: new Date().toISOString(),
error,
attempts,
},
});
defaultLogger.warn('Operation sent to dead-letter queue', {
index: indexName,
operation: operation.type,
error,
attempts,
});
} catch (dlqError) {
const err = dlqError instanceof Error ? dlqError : new Error(String(dlqError));
defaultLogger.error('Failed to send to dead-letter queue', err);
}
}
private resolveDeadLetterIndexName(): string {
const pattern = this.config.deadLetterIndex;
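    // '{now/d}' mirrors Elasticsearch date-math index naming, resolved locally
    // to a daily index such as 'failed-operations-2025-11-29'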
if (pattern.includes('{now/d}')) {
const date = new Date().toISOString().split('T')[0] ?? 'unknown';
return pattern.replace('{now/d}', date);
}
return pattern;
}
private getCurrentBatchSize(): number {
switch (this.config.batchingStrategy) {
case 'fixed':
return this.config.batchSize;
case 'adaptive':
// Adjust batch size based on performance
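        // Heuristic: target ~1s per bulk request; shrink batches by 20% when
        // requests run slower than that, grow by 20% when they finish in under 500ms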
if (this.batchDurations.length > 0) {
const avgDuration = this.stats.avgBatchDurationMs;
const targetDuration = 1000; // 1 second target
if (avgDuration > targetDuration && this.stats.avgBatchSize > this.config.minBatchSize) {
return Math.max(this.config.minBatchSize, Math.floor(this.stats.avgBatchSize * 0.8));
} else if (avgDuration < targetDuration * 0.5 && this.stats.avgBatchSize < this.config.maxBatchSize) {
return Math.min(this.config.maxBatchSize, Math.floor(this.stats.avgBatchSize * 1.2));
}
return Math.floor(this.stats.avgBatchSize);
}
return this.config.batchSize;
case 'size-based':
// Estimate based on target bytes
// For now, use fixed size as we don't have document size info
return this.config.batchSize;
default:
return this.config.batchSize;
}
}
  private triggerFlush(): void {
    // Drain one batch asynchronously so the flush timer and the batch-size
    // threshold in submit() actually move operations out of the queue
    if (this.queue.length === 0) {
      return;
    }
    const batchSize = this.getCurrentBatchSize();
    const batch = this.queue.splice(0, Math.min(batchSize, this.queue.length));
    this.stats.queueSize = this.queue.length;
    void this.executeBatch(batch).catch(() => {
      // executeBatch already logs failures and routes them to retry/DLQ handling
    });
  }
private async waitForBackpressure(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}
private updateCurrentOpsPerSecond(): void {
if (this.operationTimestamps.length > 1) {
const now = Date.now();
const oneSecondAgo = now - 1000;
const recentOps = this.operationTimestamps.filter((ts) => ts > oneSecondAgo);
this.stats.currentOpsPerSecond = recentOps.length;
}
}
private reportProgress(): void {
const now = Date.now();
if (now - this.lastProgressReport > 1000) {
// Report every second
const progress: BulkProgress = {
totalSubmitted: this.stats.totalSubmitted,
totalProcessed: this.stats.totalProcessed,
totalSuccessful: this.stats.totalSuccessful,
totalFailed: this.stats.totalFailed,
queueSize: this.stats.queueSize,
operationsPerSecond: this.stats.currentOpsPerSecond,
avgBatchDurationMs: this.stats.avgBatchDurationMs,
estimatedTimeRemainingMs:
this.stats.currentOpsPerSecond > 0
? (this.stats.queueSize / this.stats.currentOpsPerSecond) * 1000
: undefined,
};
this.config.onProgress(progress);
this.lastProgressReport = now;
}
}
}
/**
 * Worker slot for parallel batch processing. Workers are currently passive
 * placeholders; batch execution happens in the indexer's flush path.
 */
class Worker {
private _indexer: BulkIndexer;
private _id: number;
private _running = false;
constructor(indexer: BulkIndexer, id: number) {
this._indexer = indexer;
this._id = id;
}
get id(): number {
return this._id;
}
get indexer(): BulkIndexer {
return this._indexer;
}
get isRunning(): boolean {
return this._running;
}
start(): void {
this._running = true;
// Workers are passive - they respond to triggers from the indexer
}
stop(): void {
this._running = false;
}
}
/**
* Create a new bulk indexer
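 *
 * @example
 * ```typescript
 * const indexer = createBulkIndexer({ workers: 4, batchingStrategy: 'adaptive' });
 * await indexer.start();
 * ```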
*/
export function createBulkIndexer(config?: BulkIndexerConfig): BulkIndexer {
return new BulkIndexer(config);
}