import * as plugins from '../../congodb.plugins.js'; import type { ICommandHandler, IHandlerContext, ICursorState } from '../CommandRouter.js'; import { AggregationEngine } from '../../engine/AggregationEngine.js'; /** * AggregateHandler - Handles aggregate command */ export class AggregateHandler implements ICommandHandler { private cursors: Map; private nextCursorId: () => bigint; constructor( cursors: Map, nextCursorId: () => bigint ) { this.cursors = cursors; this.nextCursorId = nextCursorId; } async handle(context: IHandlerContext): Promise { const { storage, database, command } = context; const collection = command.aggregate; const pipeline = command.pipeline || []; const cursor = command.cursor || {}; const batchSize = cursor.batchSize || 101; // Validate if (typeof collection !== 'string' && collection !== 1) { return { ok: 0, errmsg: 'aggregate command requires a collection name or 1', code: 2, codeName: 'BadValue', }; } if (!Array.isArray(pipeline)) { return { ok: 0, errmsg: 'pipeline must be an array', code: 2, codeName: 'BadValue', }; } try { // Get source documents let documents: plugins.bson.Document[] = []; if (collection === 1 || collection === '1') { // Database-level aggregation (e.g., $listLocalSessions) documents = []; } else { // Collection-level aggregation const exists = await storage.collectionExists(database, collection); if (exists) { documents = await storage.findAll(database, collection); } } // Handle $lookup and $graphLookup stages that reference other collections const processedPipeline = await this.preprocessPipeline( storage, database, pipeline, documents ); // Run aggregation let results: plugins.bson.Document[]; // Check for special stages that we handle manually if (this.hasSpecialStages(pipeline)) { results = await this.executeWithSpecialStages( storage, database, documents, pipeline ); } else { results = AggregationEngine.aggregate(documents as any, processedPipeline); } // Handle $out and $merge stages const lastStage = pipeline[pipeline.length - 1]; if (lastStage && lastStage.$out) { await this.handleOut(storage, database, results, lastStage.$out); return { ok: 1, cursor: { id: plugins.bson.Long.fromNumber(0), ns: `${database}.${collection}`, firstBatch: [] } }; } if (lastStage && lastStage.$merge) { await this.handleMerge(storage, database, results, lastStage.$merge); return { ok: 1, cursor: { id: plugins.bson.Long.fromNumber(0), ns: `${database}.${collection}`, firstBatch: [] } }; } // Build cursor response const effectiveBatchSize = Math.min(batchSize, results.length); const firstBatch = results.slice(0, effectiveBatchSize); const remaining = results.slice(effectiveBatchSize); let cursorId = BigInt(0); if (remaining.length > 0) { cursorId = this.nextCursorId(); this.cursors.set(cursorId, { id: cursorId, database, collection: typeof collection === 'string' ? collection : '$cmd.aggregate', documents: remaining, position: 0, batchSize, createdAt: new Date(), }); } return { ok: 1, cursor: { id: plugins.bson.Long.fromBigInt(cursorId), ns: `${database}.${typeof collection === 'string' ? collection : '$cmd.aggregate'}`, firstBatch, }, }; } catch (error: any) { return { ok: 0, errmsg: error.message || 'Aggregation failed', code: 1, codeName: 'InternalError', }; } } /** * Preprocess pipeline to handle cross-collection lookups */ private async preprocessPipeline( storage: any, database: string, pipeline: plugins.bson.Document[], documents: plugins.bson.Document[] ): Promise { // For now, return the pipeline as-is // Cross-collection lookups are handled in executeWithSpecialStages return pipeline; } /** * Check if pipeline has stages that need special handling */ private hasSpecialStages(pipeline: plugins.bson.Document[]): boolean { return pipeline.some(stage => stage.$lookup || stage.$graphLookup || stage.$unionWith ); } /** * Execute pipeline with special stage handling */ private async executeWithSpecialStages( storage: any, database: string, documents: plugins.bson.Document[], pipeline: plugins.bson.Document[] ): Promise { let results: plugins.bson.Document[] = [...documents]; for (const stage of pipeline) { if (stage.$lookup) { const lookupSpec = stage.$lookup; const fromCollection = lookupSpec.from; // Get foreign collection documents const foreignExists = await storage.collectionExists(database, fromCollection); const foreignDocs = foreignExists ? await storage.findAll(database, fromCollection) : []; results = AggregationEngine.executeLookup(results as any, lookupSpec, foreignDocs); } else if (stage.$graphLookup) { const graphLookupSpec = stage.$graphLookup; const fromCollection = graphLookupSpec.from; const foreignExists = await storage.collectionExists(database, fromCollection); const foreignDocs = foreignExists ? await storage.findAll(database, fromCollection) : []; results = AggregationEngine.executeGraphLookup(results as any, graphLookupSpec, foreignDocs); } else if (stage.$unionWith) { let unionSpec = stage.$unionWith; let unionColl: string; let unionPipeline: plugins.bson.Document[] | undefined; if (typeof unionSpec === 'string') { unionColl = unionSpec; } else { unionColl = unionSpec.coll; unionPipeline = unionSpec.pipeline; } const unionExists = await storage.collectionExists(database, unionColl); const unionDocs = unionExists ? await storage.findAll(database, unionColl) : []; results = AggregationEngine.executeUnionWith(results as any, unionDocs, unionPipeline); } else if (stage.$facet) { // Execute each facet pipeline separately const facetResults: plugins.bson.Document = {}; for (const [facetName, facetPipeline] of Object.entries(stage.$facet)) { const facetDocs = await this.executeWithSpecialStages( storage, database, results, facetPipeline as plugins.bson.Document[] ); facetResults[facetName] = facetDocs; } results = [facetResults]; } else { // Regular stage - pass to mingo results = AggregationEngine.aggregate(results as any, [stage]); } } return results; } /** * Handle $out stage - write results to a collection */ private async handleOut( storage: any, database: string, results: plugins.bson.Document[], outSpec: string | { db?: string; coll: string } ): Promise { let targetDb = database; let targetColl: string; if (typeof outSpec === 'string') { targetColl = outSpec; } else { targetDb = outSpec.db || database; targetColl = outSpec.coll; } // Drop existing collection await storage.dropCollection(targetDb, targetColl); // Create new collection and insert results await storage.createCollection(targetDb, targetColl); for (const doc of results) { if (!doc._id) { doc._id = new plugins.bson.ObjectId(); } await storage.insertOne(targetDb, targetColl, doc); } } /** * Handle $merge stage - merge results into a collection */ private async handleMerge( storage: any, database: string, results: plugins.bson.Document[], mergeSpec: any ): Promise { let targetDb = database; let targetColl: string; if (typeof mergeSpec === 'string') { targetColl = mergeSpec; } else if (typeof mergeSpec.into === 'string') { targetColl = mergeSpec.into; } else { targetDb = mergeSpec.into.db || database; targetColl = mergeSpec.into.coll; } const on = mergeSpec.on || '_id'; const whenMatched = mergeSpec.whenMatched || 'merge'; const whenNotMatched = mergeSpec.whenNotMatched || 'insert'; // Ensure target collection exists await storage.createCollection(targetDb, targetColl); for (const doc of results) { // Find matching document const existingDocs = await storage.findAll(targetDb, targetColl); const onFields = Array.isArray(on) ? on : [on]; let matchingDoc = null; for (const existing of existingDocs) { let matches = true; for (const field of onFields) { if (JSON.stringify(existing[field]) !== JSON.stringify(doc[field])) { matches = false; break; } } if (matches) { matchingDoc = existing; break; } } if (matchingDoc) { // Handle whenMatched if (whenMatched === 'replace') { await storage.updateById(targetDb, targetColl, matchingDoc._id, doc); } else if (whenMatched === 'keepExisting') { // Do nothing } else if (whenMatched === 'merge') { const merged = { ...matchingDoc, ...doc }; await storage.updateById(targetDb, targetColl, matchingDoc._id, merged); } else if (whenMatched === 'fail') { throw new Error('Document matched but whenMatched is fail'); } } else { // Handle whenNotMatched if (whenNotMatched === 'insert') { if (!doc._id) { doc._id = new plugins.bson.ObjectId(); } await storage.insertOne(targetDb, targetColl, doc); } else if (whenNotMatched === 'discard') { // Do nothing } else if (whenNotMatched === 'fail') { throw new Error('Document not matched but whenNotMatched is fail'); } } } } }