- Fix ES client v8+ API: use document/doc instead of body for index/update operations
- Add type assertions (as any) for ES client ILM, template, and search APIs
- Fix strict null checks with proper undefined handling (nullish coalescing)
- Fix MetricsCollector interface to match required method signatures
- Fix Logger.error signature compatibility in plugins
- Resolve TermsQuery type index signature conflict
- Remove sourceMap from tsconfig (handled by tsbuild with inlineSourceMap)
359 lines
8.6 KiB
TypeScript
359 lines
8.6 KiB
TypeScript
import type { Client as ElasticClient } from '@elastic/elasticsearch';
|
|
import type {
|
|
BatchOperation,
|
|
BatchResult,
|
|
SessionConfig,
|
|
} from './types.js';
|
|
import { DocumentOperation } from './types.js';
|
|
import { Logger } from '../../core/observability/logger.js';
|
|
import { BulkOperationError } from '../../core/errors/elasticsearch-error.js';
|
|
|
|
/**
 * Document session for managing document lifecycle.
 *
 * A session queues create / update / upsert / delete operations in memory and
 * sends them to Elasticsearch through the bulk API when `commit()` is called.
 * Every document ID passed to `create`, `update` or `upsert` is remembered;
 * when `cleanupStale` is enabled, `commit()` afterwards deletes every document
 * in the index whose ID was not seen during the session.
 *
 * Typical usage: `start()` -> queue operations -> `commit()` (or `rollback()`).
 *
 * @typeParam T - Shape of the document source handled by the session.
 */
export class DocumentSession<T = unknown> {
  /** Bulk chunk size used when the caller supplies none (or an unusable one). */
  private static readonly DEFAULT_BATCH_SIZE = 1000;

  /** Operations queued since `start()`, in submission order. */
  private operations: BatchOperation<T>[] = [];
  /** IDs passed to `create` / `update` / `upsert`; these survive stale cleanup. */
  private seenDocuments = new Set<string>();
  private config: Required<SessionConfig>;
  private startTimestamp: Date;
  private isActive = false;

  /**
   * @param client - Elasticsearch client used for bulk and delete-by-query calls.
   * @param index - Name of the index every operation targets.
   * @param logger - Logger receiving session lifecycle messages.
   * @param config - Optional settings. `cleanupStale` defaults to enabled
   *   (anything other than an explicit `false`); `batchSize` defaults to 1000
   *   and is the maximum number of operations sent per bulk request.
   *   `onlyNew` and `fromTimestamp` are stored with defaults but are not
   *   consulted anywhere in this class.
   */
  constructor(
    private client: ElasticClient,
    private index: string,
    private logger: Logger,
    config: SessionConfig = {}
  ) {
    const { batchSize } = config;

    this.config = {
      onlyNew: config.onlyNew ?? false,
      fromTimestamp: config.fromTimestamp ?? new Date(),
      cleanupStale: config.cleanupStale !== false,
      // A zero, negative or non-finite size would stall or break chunking,
      // so anything unusable falls back to the default.
      batchSize:
        typeof batchSize === 'number' && Number.isFinite(batchSize) && batchSize >= 1
          ? Math.floor(batchSize)
          : DocumentSession.DEFAULT_BATCH_SIZE,
    };
    this.startTimestamp = new Date();
  }

  /**
   * Start the session, discarding any previously queued state.
   *
   * @returns This session, for chaining.
   * @throws Error if the session is already active.
   */
  start(): this {
    if (this.isActive) {
      throw new Error('Session already active');
    }

    this.isActive = true;
    this.operations = [];
    this.seenDocuments.clear();
    this.startTimestamp = new Date();

    this.logger.debug('Document session started', {
      index: this.index,
      config: this.config,
    });

    return this;
  }

  /**
   * Queue an upsert (sent as a bulk `index` action: create or replace).
   *
   * @param documentId - ID of the document.
   * @param document - Document source to store.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  upsert(documentId: string, document: T): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.UPSERT,
      documentId,
      document,
    });

    this.seenDocuments.add(documentId);
    return this;
  }

  /**
   * Queue a create (sent as a bulk `create` action).
   *
   * @param documentId - ID of the document.
   * @param document - Document source to store.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  create(documentId: string, document: T): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.CREATE,
      documentId,
      document,
    });

    this.seenDocuments.add(documentId);
    return this;
  }

  /**
   * Queue an update (sent as a bulk `update` action with the document as `doc`).
   *
   * @param documentId - ID of the document.
   * @param document - Fields to send as the update's `doc`.
   * @param version - Optional sequence number / primary term pair, forwarded
   *   as `if_seq_no` / `if_primary_term` on the bulk action.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  update(documentId: string, document: T, version?: { seqNo: number; primaryTerm: number }): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.UPDATE,
      documentId,
      document,
      ...(version && {
        seqNo: version.seqNo,
        primaryTerm: version.primaryTerm,
      }),
    });

    this.seenDocuments.add(documentId);
    return this;
  }

  /**
   * Queue a delete.
   *
   * The ID is not added to the "seen" set.
   *
   * @param documentId - ID of the document to delete.
   * @returns This session, for chaining.
   * @throws Error if the session is not active.
   */
  delete(documentId: string): this {
    this.ensureActive();

    this.operations.push({
      operation: DocumentOperation.DELETE,
      documentId,
    });

    return this;
  }

  /**
   * Commit the session: execute all queued operations, then (if configured)
   * remove stale documents, and mark the session inactive.
   *
   * If execution throws, the error is logged and rethrown; the session stays
   * active and the queued operations are retained.
   *
   * @returns Aggregated counts and per-document errors for the bulk execution.
   * @throws Error if the session is not active.
   * @throws BulkOperationError if every queued operation failed.
   */
  async commit(): Promise<BatchResult> {
    this.ensureActive();

    try {
      // Execute batched operations
      const result = await this.executeBatch();

      // Clean up stale documents if configured
      if (this.config.cleanupStale) {
        await this.cleanupStaleDocuments();
      }

      this.isActive = false;

      this.logger.info('Session committed', {
        index: this.index,
        successful: result.successful,
        failed: result.failed,
        duration: Date.now() - this.startTimestamp.getTime(),
      });

      return result;
    } catch (error) {
      this.logger.error('Session commit failed', error as Error, {
        index: this.index,
        operationCount: this.operations.length,
      });
      throw error;
    }
  }

  /**
   * Rollback the session: discard all queued operations and seen IDs and mark
   * the session inactive. Nothing is sent to Elasticsearch.
   */
  rollback(): void {
    this.operations = [];
    this.seenDocuments.clear();
    this.isActive = false;

    this.logger.debug('Session rolled back', { index: this.index });
  }

  /**
   * Execute the queued operations through the bulk API.
   *
   * Operations are sent in submission order, at most `config.batchSize` per
   * bulk request, and the per-request outcomes are merged into one result.
   * If a request itself rejects, the rejection propagates; chunks already
   * sent are not undone.
   *
   * @returns Aggregated result; `took` is the wall-clock time in milliseconds
   *   measured locally across all requests.
   * @throws BulkOperationError if every queued operation failed.
   */
  private async executeBatch(): Promise<BatchResult> {
    if (this.operations.length === 0) {
      return {
        successful: 0,
        failed: 0,
        errors: [],
        took: 0,
      };
    }

    const startTime = Date.now();
    const chunkSize = this.config.batchSize;

    let successful = 0;
    let failed = 0;
    const errors: Array<{
      documentId: string;
      operation: DocumentOperation;
      error: string;
      statusCode: number;
    }> = [];

    for (let offset = 0; offset < this.operations.length; offset += chunkSize) {
      const chunk = this.operations.slice(offset, offset + chunkSize);

      const response = await this.client.bulk({
        operations: this.buildBulkBody(chunk),
        refresh: true, // Make changes immediately visible
      });

      if (!response.errors) {
        successful += response.items.length;
        continue;
      }

      // Each queued operation yields exactly one response item, in order, so
      // items[i] describes chunk[i].
      for (let i = 0; i < response.items.length; i++) {
        const item = response.items[i];
        const operation = chunk[i];

        if (!item || !operation) continue;

        const action = Object.keys(item)[0];
        if (!action) continue;
        const result = item[action as keyof typeof item] as Record<string, unknown> | undefined;

        if (result?.error) {
          failed++;
          const errorInfo = result.error as { reason?: string } | string;
          errors.push({
            documentId: operation.documentId,
            operation: operation.operation,
            error: typeof errorInfo === 'string' ? errorInfo : (errorInfo.reason ?? 'Unknown error'),
            statusCode: result.status as number,
          });
        } else {
          successful++;
        }
      }
    }

    const took = Date.now() - startTime;

    const result: BatchResult = {
      successful,
      failed,
      errors,
      took,
    };

    if (failed > 0) {
      this.logger.warn('Batch operation had failures', {
        successful,
        failed,
        errors: errors.slice(0, 5), // Log first 5 errors
      });

      if (failed === this.operations.length) {
        // Complete failure
        throw new BulkOperationError(
          'All bulk operations failed',
          successful,
          failed,
          errors.map(e => ({ documentId: e.documentId, error: e.error, status: e.statusCode }))
        );
      }
    }

    return result;
  }

  /**
   * Translate queued operations into the flat action/source line list expected
   * by the bulk API. Delete contributes one line; every other operation
   * contributes an action line followed by a source (or `doc`) line.
   */
  // The payload mixes action metadata with caller-typed documents, so it is
  // left loosely typed exactly as the original request body was.
  private buildBulkBody(operations: BatchOperation<T>[]): any[] {
    const lines: any[] = [];

    for (const op of operations) {
      switch (op.operation) {
        case DocumentOperation.CREATE:
          lines.push({ create: { _index: this.index, _id: op.documentId } });
          lines.push(op.document);
          break;

        case DocumentOperation.UPDATE:
          lines.push({
            update: {
              _index: this.index,
              _id: op.documentId,
              ...(op.seqNo !== undefined && { if_seq_no: op.seqNo }),
              ...(op.primaryTerm !== undefined && { if_primary_term: op.primaryTerm }),
            },
          });
          lines.push({ doc: op.document });
          break;

        case DocumentOperation.UPSERT:
          lines.push({ index: { _index: this.index, _id: op.documentId } });
          lines.push(op.document);
          break;

        case DocumentOperation.DELETE:
          lines.push({ delete: { _index: this.index, _id: op.documentId } });
          break;
      }
    }

    return lines;
  }

  /**
   * Delete every document in the index whose ID was not seen in this session.
   *
   * Skipped entirely when no IDs were seen. Failures are logged as warnings
   * and swallowed: cleanup is best-effort and never fails the commit.
   */
  private async cleanupStaleDocuments(): Promise<void> {
    if (this.seenDocuments.size === 0) {
      return; // No documents to keep, skip cleanup
    }

    this.logger.debug('Cleaning up stale documents', {
      index: this.index,
      seenCount: this.seenDocuments.size,
    });

    try {
      // Use deleteByQuery to remove documents not in seen set
      // This is more efficient than the old scroll-and-compare approach
      const seenIds = Array.from(this.seenDocuments);

      await this.client.deleteByQuery({
        index: this.index,
        query: {
          bool: {
            must_not: {
              ids: {
                values: seenIds,
              },
            },
          },
        },
        refresh: true,
      });

      this.logger.debug('Stale documents cleaned up', { index: this.index });
    } catch (error) {
      this.logger.warn('Failed to cleanup stale documents', {
        index: this.index,
        error: (error as Error).message,
      });
      // Don't throw - cleanup is best-effort
    }
  }

  /**
   * Ensure session is active.
   *
   * @throws Error if `start()` has not been called (or the session has ended).
   */
  private ensureActive(): void {
    if (!this.isActive) {
      throw new Error('Session not active. Call start() first.');
    }
  }

  /**
   * Get session statistics.
   *
   * @returns Whether the session is active, how many operations are queued,
   *   how many distinct IDs have been seen, and when the session was started
   *   (or constructed, if `start()` has not been called yet).
   */
  getStats(): {
    isActive: boolean;
    operationCount: number;
    seenDocumentCount: number;
    startTime: Date;
  } {
    return {
      isActive: this.isActive,
      operationCount: this.operations.length,
      seenDocumentCount: this.seenDocuments.size,
      startTime: this.startTimestamp,
    };
  }
}
|