637 lines
18 KiB
TypeScript
637 lines
18 KiB
TypeScript
import type {
|
|
BulkOperation,
|
|
BulkOperationType,
|
|
BulkBatchResult,
|
|
BulkOperationResult,
|
|
BulkIndexerConfig,
|
|
BulkIndexerStats,
|
|
BulkProgress,
|
|
BackpressureState,
|
|
BatchingStrategy,
|
|
} from './types.js';
|
|
import { ElasticsearchConnectionManager } from '../../core/connection/connection-manager.js';
|
|
import { defaultLogger } from '../../core/observability/logger.js';
|
|
import { defaultMetrics } from '../../core/observability/metrics.js';
|
|
import { defaultTracing } from '../../core/observability/tracing.js';
|
|
|
|
/**
 * Bulk indexer with adaptive batching, bounded parallelism and retries.
 *
 * Features:
 * - Adaptive batching: the batch size shrinks when bulk requests are slow and grows
 *   when they are fast, staying within `[minBatchSize, maxBatchSize]`
 * - Up to `workers` bulk requests in flight at once
 * - Automatic retries with exponential backoff (`retryDelayMs * 2^(attempt - 1)`)
 * - Optional dead-letter index for operations that exhaust their retries
 * - Backpressure: `submit()` pauses when the queue is more than 80% full
 * - Progress callbacks and statistics
 * - Logging, metrics and tracing integration
 *
 * While running, queued operations are sent as soon as a full batch is available and
 * on every `flushIntervalMs` tick (partial batches included). `flush()` sends
 * everything queued and waits for in-flight requests. `stop()` additionally waits for
 * scheduled retries, so no queued operation or pending retry is abandoned.
 *
 * @example
 * ```typescript
 * const indexer = new BulkIndexer({
 *   batchingStrategy: 'adaptive',
 *   maxBatchSize: 1000,
 *   workers: 4,
 *   enableDeadLetterQueue: true,
 *   onProgress: (progress) => {
 *     console.log(`Processed: ${progress.totalProcessed}/${progress.totalSubmitted}`);
 *   },
 * });
 *
 * await indexer.start();
 *
 * // Submit operations
 * for (const doc of documents) {
 *   await indexer.index('my-index', doc.id, doc);
 * }
 *
 * await indexer.flush();
 * await indexer.stop();
 * ```
 */
export class BulkIndexer {
  /** Bulk request duration the adaptive strategy steers towards. */
  private static readonly ADAPTIVE_TARGET_DURATION_MS = 1000;

  /** Number of recent batches averaged for adaptive sizing. */
  private static readonly TIMING_WINDOW = 100;

  /** Number of recent submission timestamps kept for the ops/sec estimate. */
  private static readonly TIMESTAMP_WINDOW = 1000;

  private config: Required<BulkIndexerConfig>;
  private queue: BulkOperation[] = [];
  private workers: Worker[] = [];
  private stats: BulkIndexerStats = {
    totalSubmitted: 0,
    totalProcessed: 0,
    totalSuccessful: 0,
    totalFailed: 0,
    totalDeadLettered: 0,
    totalBatches: 0,
    totalBatchesFailed: 0,
    queueSize: 0,
    currentOpsPerSecond: 0,
    avgOpsPerSecond: 0,
    avgBatchSize: 0,
    avgBatchDurationMs: 0,
    activeWorkers: 0,
  };
  private running = false;
  private flushTimer?: ReturnType<typeof setInterval>;
  private lastProgressReport = Date.now();
  private operationTimestamps: number[] = [];
  private batchSizes: number[] = [];
  private batchDurations: number[] = [];
  /** Failed attempts per operation, keyed by object identity so id-less operations never collide. */
  private retryAttempts = new WeakMap<BulkOperation, number>();
  /** Retries waiting out their backoff delay; each resolves once its operation is re-queued. */
  private pendingRetries = new Set<Promise<void>>();
  /** Bulk requests started by background drain loops. */
  private inFlightBatches = new Set<Promise<BulkBatchResult>>();
  /** Background drain loops currently running; capped at `config.workers`. */
  private activeDrains = 0;

  constructor(config: BulkIndexerConfig = {}) {
    this.config = {
      batchingStrategy: config.batchingStrategy ?? 'adaptive',
      batchSize: config.batchSize ?? 500,
      maxBatchSize: config.maxBatchSize ?? 1000,
      minBatchSize: config.minBatchSize ?? 100,
      targetBatchBytes: config.targetBatchBytes ?? 5 * 1024 * 1024, // 5MB
      flushIntervalMs: config.flushIntervalMs ?? 5000,
      workers: config.workers ?? 2,
      maxQueueSize: config.maxQueueSize ?? 10000,
      maxRetries: config.maxRetries ?? 3,
      retryDelayMs: config.retryDelayMs ?? 1000,
      enableDeadLetterQueue: config.enableDeadLetterQueue ?? false,
      deadLetterIndex: config.deadLetterIndex ?? 'failed-operations-{now/d}',
      onProgress: config.onProgress ?? (() => {}),
      onBatchSuccess: config.onBatchSuccess ?? (() => {}),
      onBatchError: config.onBatchError ?? (() => {}),
      refresh: config.refresh ?? false,
      pipeline: config.pipeline ?? '',
      routing: config.routing ?? '',
    };
  }

  /**
   * Create a new bulk indexer
   */
  static create(config?: BulkIndexerConfig): BulkIndexer {
    return new BulkIndexer(config);
  }

  /**
   * Start the bulk indexer: create the worker handles, start the periodic flush
   * timer and immediately send any full batch that was queued before starting.
   * Calling `start()` on a running indexer does nothing.
   */
  async start(): Promise<void> {
    if (this.running) {
      return;
    }

    this.running = true;
    this.stats.startedAt = new Date();

    for (let i = 0; i < this.config.workers; i++) {
      const worker = new Worker(this, i);
      this.workers.push(worker);
      worker.start();
    }

    // Periodically send whatever is queued, including partial batches and landed retries.
    this.flushTimer = setInterval(() => this.triggerFlush(true), this.config.flushIntervalMs);
    // Operations submitted before start() are sent right away if they fill a batch.
    this.triggerFlush();

    defaultLogger.info('Bulk indexer started', {
      workers: this.config.workers,
      batchingStrategy: this.config.batchingStrategy,
      maxBatchSize: this.config.maxBatchSize,
    });
  }

  /**
   * Stop the bulk indexer.
   *
   * Keeps flushing until the queue, all in-flight batches and all scheduled retries
   * are exhausted, so operations waiting out a retry delay are not abandoned. Batch
   * failures during this drain are not rethrown: they have already been logged,
   * reported through `onBatchError` and retried or dead-lettered.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }

    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = undefined;
    }

    try {
      await this.drainCompletely();
    } finally {
      for (const worker of this.workers) {
        worker.stop();
      }
      // Reset so a later start() does not accumulate stale handles.
      this.workers = [];
      this.running = false;
    }

    defaultLogger.info('Bulk indexer stopped', {
      stats: this.stats,
    });
  }

  // ============================================================================
  // Operation Methods
  // ============================================================================

  /**
   * Index a document
   */
  async index<T>(index: string, id: string | undefined, document: T): Promise<void> {
    await this.submit({
      type: 'index',
      index,
      id,
      document,
    });
  }

  /**
   * Create a document (fails if exists)
   */
  async create<T>(index: string, id: string, document: T): Promise<void> {
    await this.submit({
      type: 'create',
      index,
      id,
      document,
    });
  }

  /**
   * Update a document
   */
  async update<T>(index: string, id: string, partialDocument: Partial<T>, options?: { retryOnConflict?: number }): Promise<void> {
    await this.submit({
      type: 'update',
      index,
      id,
      partialDocument,
      retryOnConflict: options?.retryOnConflict,
    });
  }

  /**
   * Delete a document
   */
  async delete(index: string, id: string): Promise<void> {
    await this.submit({
      type: 'delete',
      index,
      id,
    });
  }

  /**
   * Queue a bulk operation.
   *
   * If the queue is above the backpressure threshold, waits once for the recommended
   * delay before enqueueing. Once a full batch has accumulated, it is sent in the
   * background while the indexer is running.
   */
  async submit(operation: BulkOperation): Promise<void> {
    const backpressure = this.getBackpressure();
    if (backpressure.active) {
      await this.waitForBackpressure(backpressure.recommendedWaitMs);
    }

    this.queue.push(operation);
    this.stats.totalSubmitted++;
    this.stats.queueSize = this.queue.length;

    // Track a bounded window of submission timestamps for the ops/sec estimate.
    this.operationTimestamps.push(Date.now());
    if (this.operationTimestamps.length > BulkIndexer.TIMESTAMP_WINDOW) {
      this.operationTimestamps.shift();
    }

    this.updateCurrentOpsPerSecond();
    this.reportProgress();

    // Sends a batch in the background only if a full one is available.
    this.triggerFlush();
  }

  /**
   * Send every queued operation now, one batch at a time, then wait for batches
   * that background drains already have in flight.
   *
   * @returns the results of the batches sent by this call (background batches are
   *   awaited but not included)
   * @throws the transport error of the first batch whose bulk request fails; its
   *   operations have already been scheduled for retry
   */
  async flush(): Promise<BulkBatchResult[]> {
    const results: BulkBatchResult[] = [];

    while (this.queue.length > 0) {
      results.push(await this.executeBatch(this.takeBatch()));
    }

    await Promise.allSettled(this.inFlightBatches);
    return results;
  }

  /**
   * Get current statistics
   */
  getStats(): BulkIndexerStats {
    return { ...this.stats };
  }

  /**
   * Get backpressure state: active above 80% queue utilization, with a recommended
   * wait of 50ms per percentage point over 80 (capped at one second).
   */
  getBackpressure(): BackpressureState {
    const utilization = (this.queue.length / this.config.maxQueueSize) * 100;
    const active = utilization > 80;

    return {
      active,
      queueUtilization: utilization,
      recommendedWaitMs: active ? Math.min(1000, (utilization - 80) * 50) : 0,
    };
  }

  // ============================================================================
  // Private Methods
  // ============================================================================

  /**
   * Send one batch and account for the per-item outcome.
   *
   * Failed items are retried or dead-lettered. If the bulk request itself fails,
   * `sendBulkRequest` reports it and schedules every operation for retry, and the
   * error is rethrown here.
   */
  private async executeBatch(operations: BulkOperation[]): Promise<BulkBatchResult> {
    const span = defaultTracing.createSpan('bulkIndexer.executeBatch', {
      'batch.size': operations.length,
    });
    const startTime = Date.now();
    this.stats.activeWorkers++;

    try {
      const response = await this.sendBulkRequest(operations);
      const durationMs = Date.now() - startTime;
      this.recordBatchTiming(operations.length, durationMs);

      const results: BulkOperationResult[] = [];
      let successful = 0;
      let failed = 0;
      const items = response.items ?? [];

      for (let i = 0; i < items.length; i++) {
        const item = items[i];
        // Items are matched to operations by position (Elasticsearch returns them in request order).
        const op = operations[i];
        const actionResult = item && (item.index || item.create || item.update || item.delete);
        if (!actionResult) {
          continue;
        }

        const success = !actionResult.error && (actionResult.status === 200 || actionResult.status === 201);

        results.push({
          success,
          type: op?.type as BulkOperationType,
          index: actionResult._index,
          id: actionResult._id,
          status: actionResult.status,
          error: actionResult.error
            ? {
                type: actionResult.error.type,
                reason: actionResult.error.reason,
                causedBy: actionResult.error.caused_by ? JSON.stringify(actionResult.error.caused_by) : undefined,
              }
            : undefined,
          seqNo: actionResult._seq_no,
          primaryTerm: actionResult._primary_term,
        });

        if (success) {
          successful++;
          if (op) {
            // A retried operation that finally succeeded no longer needs its counter.
            this.retryAttempts.delete(op);
          }
        } else {
          failed++;
          if (op) {
            await this.handleFailedOperation(op, actionResult.error?.reason || 'Unknown error');
          }
        }
      }

      this.stats.totalProcessed += operations.length;
      this.stats.totalSuccessful += successful;
      this.stats.totalFailed += failed;
      this.stats.totalBatches++;
      this.stats.lastBatchAt = new Date();

      if (this.stats.startedAt) {
        const elapsedSeconds = (Date.now() - this.stats.startedAt.getTime()) / 1000;
        if (elapsedSeconds > 0) {
          this.stats.avgOpsPerSecond = this.stats.totalProcessed / elapsedSeconds;
        }
      }

      defaultMetrics.requestsTotal.inc({ operation: 'bulk', result: 'success' });
      defaultMetrics.requestDuration.observe({ operation: 'bulk' }, durationMs);

      const result: BulkBatchResult = {
        successful,
        failed,
        total: operations.length,
        durationMs,
        results,
        hasErrors: failed > 0,
      };

      // A throwing user callback must not turn an already-indexed batch into a failure.
      this.safeCallback('onBatchSuccess', () => this.config.onBatchSuccess(result));

      if (failed > 0) {
        defaultLogger.warn('Bulk batch had errors', {
          successful,
          failed,
          total: operations.length,
        });
      }

      span.setAttributes({
        'batch.successful': successful,
        'batch.failed': failed,
        'batch.duration_ms': durationMs,
      });

      return result;
    } catch (error) {
      span.recordException(BulkIndexer.toError(error));
      throw error;
    } finally {
      // Runs exactly once per batch, whichever path was taken.
      this.stats.activeWorkers--;
      span.end();
    }
  }

  /**
   * Issue the `_bulk` request for `operations`. If the request itself fails, the
   * failure is reported and every operation is scheduled for retry before the error
   * is rethrown.
   */
  private async sendBulkRequest(operations: BulkOperation[]) {
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      return await client.bulk({
        refresh: this.config.refresh,
        operations: this.buildBulkBody(operations),
        // Request-level defaults from the config; per-operation routing/pipeline override them.
        ...(this.config.pipeline ? { pipeline: this.config.pipeline } : {}),
        ...(this.config.routing ? { routing: this.config.routing } : {}),
      });
    } catch (error) {
      await this.handleBatchFailure(operations, error);
      throw error;
    }
  }

  /** Build the alternating action / source lines of a bulk request body. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- mixed action/document payload
  private buildBulkBody(operations: BulkOperation[]): any[] {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const body: any[] = [];

    for (const op of operations) {
      body.push({
        [op.type]: {
          _index: op.index,
          ...(op.id && { _id: op.id }),
          ...(op.routing && { routing: op.routing }),
          ...(op.pipeline && { pipeline: op.pipeline }),
          ...(op.ifSeqNo !== undefined && { if_seq_no: op.ifSeqNo }),
          ...(op.ifPrimaryTerm !== undefined && { if_primary_term: op.ifPrimaryTerm }),
          ...(op.retryOnConflict !== undefined && { retry_on_conflict: op.retryOnConflict }),
        },
      });

      if (op.type === 'index' || op.type === 'create') {
        body.push(op.document);
      } else if (op.type === 'update') {
        body.push({ doc: op.partialDocument });
      }
      // 'delete' has no source line.
    }

    return body;
  }

  /** Record one batch's size and duration and refresh the rolling averages. */
  private recordBatchTiming(size: number, durationMs: number): void {
    this.batchSizes.push(size);
    this.batchDurations.push(durationMs);
    if (this.batchSizes.length > BulkIndexer.TIMING_WINDOW) {
      this.batchSizes.shift();
      this.batchDurations.shift();
    }
    this.stats.avgBatchSize = this.batchSizes.reduce((a, b) => a + b, 0) / this.batchSizes.length;
    this.stats.avgBatchDurationMs = this.batchDurations.reduce((a, b) => a + b, 0) / this.batchDurations.length;
  }

  /** Report a failed bulk request and hand every operation in it to the retry logic. */
  private async handleBatchFailure(operations: BulkOperation[], error: unknown): Promise<void> {
    const err = BulkIndexer.toError(error);

    this.stats.totalBatchesFailed++;
    defaultMetrics.requestErrors.inc({ operation: 'bulk' });
    defaultLogger.error('Bulk batch failed', {
      error: err.message,
      batchSize: operations.length,
    });

    this.safeCallback('onBatchError', () => this.config.onBatchError(err, operations));

    for (const op of operations) {
      await this.handleFailedOperation(op, err.message);
    }
  }

  /**
   * Retry a failed operation with exponential backoff, or dead-letter it once
   * `maxRetries` retries have been used up.
   */
  private async handleFailedOperation(operation: BulkOperation, error: string): Promise<void> {
    const attempts = (this.retryAttempts.get(operation) ?? 0) + 1;

    if (attempts <= this.config.maxRetries) {
      this.retryAttempts.set(operation, attempts);
      this.scheduleRetry(operation, this.config.retryDelayMs * 2 ** (attempts - 1));
      return;
    }

    // Max retries exceeded
    this.retryAttempts.delete(operation);
    if (this.config.enableDeadLetterQueue) {
      await this.sendToDeadLetterQueue(operation, error, attempts);
    }
    this.stats.totalDeadLettered++;
  }

  /** Put `operation` back at the front of the queue after `delayMs`, tracking the pending retry. */
  private scheduleRetry(operation: BulkOperation, delayMs: number): void {
    const pending: Promise<void> = new Promise<void>((resolve) => {
      setTimeout(resolve, delayMs);
    }).then(() => {
      this.pendingRetries.delete(pending);
      // Retries go to the front so newer submissions cannot starve them.
      this.queue.unshift(operation);
      this.stats.queueSize = this.queue.length;
      this.triggerFlush();
    });
    this.pendingRetries.add(pending);
  }

  /**
   * Index a record describing a permanently failed operation into the dead-letter
   * index. Failures to do so are logged and otherwise ignored.
   */
  private async sendToDeadLetterQueue(operation: BulkOperation, error: string, attempts: number): Promise<void> {
    try {
      const client = ElasticsearchConnectionManager.getInstance().getClient();
      const indexName = this.resolveDeadLetterIndexName();

      await client.index({
        index: indexName,
        document: {
          ...operation,
          failed_at: new Date().toISOString(),
          error,
          attempts,
        },
      });

      defaultLogger.warn('Operation sent to dead-letter queue', {
        index: indexName,
        operation: operation.type,
        error,
        attempts,
      });
    } catch (dlqError) {
      defaultLogger.error('Failed to send to dead-letter queue', {
        error: BulkIndexer.toError(dlqError).message,
      });
    }
  }

  /** Replace every `{now/d}` in the dead-letter index pattern with today's UTC date (YYYY-MM-DD). */
  private resolveDeadLetterIndexName(): string {
    const pattern = this.config.deadLetterIndex;
    const today = new Date().toISOString().slice(0, 10);
    return pattern.split('{now/d}').join(today);
  }

  /**
   * Number of operations to put in the next batch.
   *
   * - `fixed` / `size-based`: the configured `batchSize`. `size-based` has no document
   *   size information yet, so it behaves like `fixed`.
   * - `adaptive`: starts at `batchSize`. Afterwards it scales the recent average batch
   *   size by 0.8 when batches are slower than the target duration, and by 1.2 (at
   *   least +1) when they take less than half of it. The result is clamped to
   *   `[minBatchSize, maxBatchSize]`.
   *
   * Always returns at least 1 so that draining the queue makes progress.
   */
  private getCurrentBatchSize(): number {
    const configured = Math.max(1, this.config.batchSize);

    switch (this.config.batchingStrategy) {
      case 'adaptive': {
        if (this.batchDurations.length === 0) {
          return configured;
        }

        const lower = Math.max(1, this.config.minBatchSize);
        const upper = Math.max(lower, this.config.maxBatchSize);
        const avgSize = this.stats.avgBatchSize;
        const avgDuration = this.stats.avgBatchDurationMs;
        const target = BulkIndexer.ADAPTIVE_TARGET_DURATION_MS;

        let next = avgSize;
        if (avgDuration > target) {
          next = avgSize * 0.8;
        } else if (avgDuration < target * 0.5) {
          // Grow by at least one so small averages cannot get stuck (floor(1 * 1.2) === 1).
          next = Math.max(avgSize * 1.2, avgSize + 1);
        }
        return Math.min(upper, Math.max(lower, Math.floor(next)));
      }

      case 'fixed':
      case 'size-based':
      default:
        return configured;
    }
  }

  /** Remove the next batch from the front of the queue. */
  private takeBatch(): BulkOperation[] {
    const batch = this.queue.splice(0, this.getCurrentBatchSize());
    this.stats.queueSize = this.queue.length;
    return batch;
  }

  /** Whether a batch should be sent now: any queued work when forced, otherwise a full batch. */
  private hasPendingBatch(force: boolean): boolean {
    return force ? this.queue.length > 0 : this.queue.length >= this.getCurrentBatchSize();
  }

  /**
   * Start background drain loops, keeping at most `workers` of them alive.
   *
   * @param force - also send partial batches (used by the periodic flush timer);
   *   when false, only full batches are sent.
   */
  private triggerFlush(force = false): void {
    if (!this.running) {
      return;
    }

    const maxConcurrent = Math.max(1, this.config.workers);
    while (this.activeDrains < maxConcurrent && this.hasPendingBatch(force)) {
      this.activeDrains++;
      // drain() takes its first batch synchronously, so the loop condition sees the
      // shortened queue on the next iteration.
      void this.drain(force).finally(() => {
        this.activeDrains--;
      });
    }
  }

  /** Send batches until the queue no longer holds enough work or the indexer stops. */
  private async drain(force: boolean): Promise<void> {
    while (this.running && this.hasPendingBatch(force)) {
      const inFlight = this.executeBatch(this.takeBatch());
      this.inFlightBatches.add(inFlight);
      try {
        await inFlight;
      } catch {
        // Request failures were already logged, reported via onBatchError and scheduled for retry.
      } finally {
        this.inFlightBatches.delete(inFlight);
      }
    }
  }

  /** Flush until no queued operations, in-flight batches or scheduled retries remain. */
  private async drainCompletely(): Promise<void> {
    while (this.queue.length > 0 || this.inFlightBatches.size > 0 || this.pendingRetries.size > 0) {
      try {
        await this.flush();
      } catch {
        // The failed batch was logged, reported and scheduled for retry; keep draining.
      }
      await Promise.allSettled(this.pendingRetries);
    }
  }

  private async waitForBackpressure(ms: number): Promise<void> {
    await new Promise((resolve) => setTimeout(resolve, ms));
  }

  /** Count submissions within the last second (submission rate, not indexing rate). */
  private updateCurrentOpsPerSecond(): void {
    if (this.operationTimestamps.length > 1) {
      const now = Date.now();
      const oneSecondAgo = now - 1000;
      const recentOps = this.operationTimestamps.filter((ts) => ts > oneSecondAgo);
      this.stats.currentOpsPerSecond = recentOps.length;
    }
  }

  /** Invoke `onProgress` at most once per second. */
  private reportProgress(): void {
    const now = Date.now();
    if (now - this.lastProgressReport > 1000) {
      const progress: BulkProgress = {
        totalSubmitted: this.stats.totalSubmitted,
        totalProcessed: this.stats.totalProcessed,
        totalSuccessful: this.stats.totalSuccessful,
        totalFailed: this.stats.totalFailed,
        queueSize: this.stats.queueSize,
        operationsPerSecond: this.stats.currentOpsPerSecond,
        avgBatchDurationMs: this.stats.avgBatchDurationMs,
        estimatedTimeRemainingMs:
          this.stats.currentOpsPerSecond > 0
            ? (this.stats.queueSize / this.stats.currentOpsPerSecond) * 1000
            : undefined,
      };

      this.config.onProgress(progress);
      this.lastProgressReport = now;
    }
  }

  /** Run a user-supplied callback, logging (not propagating) anything it throws. */
  private safeCallback(name: string, callback: () => void): void {
    try {
      callback();
    } catch (error) {
      defaultLogger.error('Bulk indexer callback threw', {
        callback: name,
        error: BulkIndexer.toError(error).message,
      });
    }
  }

  /** Normalize an arbitrary thrown value into an `Error`. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}
|
|
|
|
/**
 * Lifecycle handle for one bulk-indexer worker slot.
 *
 * The handle only records whether it has been started; it does not send batches
 * on its own.
 */
class Worker {
  private running = false;

  /**
   * @param indexer - the bulk indexer that owns this handle
   * @param id - slot number given by the owning indexer
   */
  constructor(
    private readonly indexer: BulkIndexer,
    private readonly id: number,
  ) {}

  /** Mark this worker as running. */
  start(): void {
    this.running = true;
  }

  /** Mark this worker as stopped. */
  stop(): void {
    this.running = false;
  }
}
|
|
|
|
/**
 * Factory function for a {@link BulkIndexer}.
 *
 * @param config - optional indexer configuration; omitted fields use the indexer's defaults
 * @returns a new, not yet started, bulk indexer
 */
export function createBulkIndexer(config?: BulkIndexerConfig): BulkIndexer {
  return BulkIndexer.create(config);
}
|