|
|
|
import * as plugins from '../../tsmdb.plugins.js';
|
|
|
|
import type { ICommandHandler, IHandlerContext, ICursorState } from '../CommandRouter.js';
|
|
|
|
|
import { AggregationEngine } from '../../engine/AggregationEngine.js';
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* AggregateHandler - Handles aggregate command
|
|
|
|
|
*/
|
|
|
|
|
export class AggregateHandler implements ICommandHandler {
|
|
|
|
|
private cursors: Map<bigint, ICursorState>;
|
|
|
|
|
private nextCursorId: () => bigint;
|
|
|
|
|
|
|
|
|
|
constructor(
|
|
|
|
|
cursors: Map<bigint, ICursorState>,
|
|
|
|
|
nextCursorId: () => bigint
|
|
|
|
|
) {
|
|
|
|
|
this.cursors = cursors;
|
|
|
|
|
this.nextCursorId = nextCursorId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async handle(context: IHandlerContext): Promise<plugins.bson.Document> {
|
|
|
|
|
const { storage, database, command } = context;
|
|
|
|
|
|
|
|
|
|
const collection = command.aggregate;
|
|
|
|
|
const pipeline = command.pipeline || [];
|
|
|
|
|
const cursor = command.cursor || {};
|
|
|
|
|
const batchSize = cursor.batchSize || 101;
|
|
|
|
|
|
|
|
|
|
// Validate
|
|
|
|
|
if (typeof collection !== 'string' && collection !== 1) {
|
|
|
|
|
return {
|
|
|
|
|
ok: 0,
|
|
|
|
|
errmsg: 'aggregate command requires a collection name or 1',
|
|
|
|
|
code: 2,
|
|
|
|
|
codeName: 'BadValue',
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!Array.isArray(pipeline)) {
|
|
|
|
|
return {
|
|
|
|
|
ok: 0,
|
|
|
|
|
errmsg: 'pipeline must be an array',
|
|
|
|
|
code: 2,
|
|
|
|
|
codeName: 'BadValue',
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// Get source documents
|
|
|
|
|
let documents: plugins.bson.Document[] = [];
|
|
|
|
|
|
|
|
|
|
if (collection === 1 || collection === '1') {
|
|
|
|
|
// Database-level aggregation (e.g., $listLocalSessions)
|
|
|
|
|
documents = [];
|
|
|
|
|
} else {
|
|
|
|
|
// Collection-level aggregation
|
|
|
|
|
const exists = await storage.collectionExists(database, collection);
|
|
|
|
|
if (exists) {
|
|
|
|
|
documents = await storage.findAll(database, collection);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle $lookup and $graphLookup stages that reference other collections
|
|
|
|
|
const processedPipeline = await this.preprocessPipeline(
|
|
|
|
|
storage,
|
|
|
|
|
database,
|
|
|
|
|
pipeline,
|
|
|
|
|
documents
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Run aggregation
|
|
|
|
|
let results: plugins.bson.Document[];
|
|
|
|
|
|
|
|
|
|
// Check for special stages that we handle manually
|
|
|
|
|
if (this.hasSpecialStages(pipeline)) {
|
|
|
|
|
results = await this.executeWithSpecialStages(
|
|
|
|
|
storage,
|
|
|
|
|
database,
|
|
|
|
|
documents,
|
|
|
|
|
pipeline
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
results = AggregationEngine.aggregate(documents as any, processedPipeline);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle $out and $merge stages
|
|
|
|
|
const lastStage = pipeline[pipeline.length - 1];
|
|
|
|
|
if (lastStage && lastStage.$out) {
|
|
|
|
|
await this.handleOut(storage, database, results, lastStage.$out);
|
|
|
|
|
return { ok: 1, cursor: { id: plugins.bson.Long.fromNumber(0), ns: `${database}.${collection}`, firstBatch: [] } };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (lastStage && lastStage.$merge) {
|
|
|
|
|
await this.handleMerge(storage, database, results, lastStage.$merge);
|
|
|
|
|
return { ok: 1, cursor: { id: plugins.bson.Long.fromNumber(0), ns: `${database}.${collection}`, firstBatch: [] } };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build cursor response
|
|
|
|
|
const effectiveBatchSize = Math.min(batchSize, results.length);
|
|
|
|
|
const firstBatch = results.slice(0, effectiveBatchSize);
|
|
|
|
|
const remaining = results.slice(effectiveBatchSize);
|
|
|
|
|
|
|
|
|
|
let cursorId = BigInt(0);
|
|
|
|
|
if (remaining.length > 0) {
|
|
|
|
|
cursorId = this.nextCursorId();
|
|
|
|
|
this.cursors.set(cursorId, {
|
|
|
|
|
id: cursorId,
|
|
|
|
|
database,
|
|
|
|
|
collection: typeof collection === 'string' ? collection : '$cmd.aggregate',
|
|
|
|
|
documents: remaining,
|
|
|
|
|
position: 0,
|
|
|
|
|
batchSize,
|
|
|
|
|
createdAt: new Date(),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
ok: 1,
|
|
|
|
|
cursor: {
|
|
|
|
|
id: plugins.bson.Long.fromBigInt(cursorId),
|
|
|
|
|
ns: `${database}.${typeof collection === 'string' ? collection : '$cmd.aggregate'}`,
|
|
|
|
|
firstBatch,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
} catch (error: any) {
|
|
|
|
|
return {
|
|
|
|
|
ok: 0,
|
|
|
|
|
errmsg: error.message || 'Aggregation failed',
|
|
|
|
|
code: 1,
|
|
|
|
|
codeName: 'InternalError',
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Preprocess pipeline to handle cross-collection lookups
|
|
|
|
|
*/
|
|
|
|
|
private async preprocessPipeline(
|
|
|
|
|
storage: any,
|
|
|
|
|
database: string,
|
|
|
|
|
pipeline: plugins.bson.Document[],
|
|
|
|
|
documents: plugins.bson.Document[]
|
|
|
|
|
): Promise<plugins.bson.Document[]> {
|
|
|
|
|
// For now, return the pipeline as-is
|
|
|
|
|
// Cross-collection lookups are handled in executeWithSpecialStages
|
|
|
|
|
return pipeline;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Check if pipeline has stages that need special handling
|
|
|
|
|
*/
|
|
|
|
|
private hasSpecialStages(pipeline: plugins.bson.Document[]): boolean {
|
|
|
|
|
return pipeline.some(stage =>
|
|
|
|
|
stage.$lookup ||
|
|
|
|
|
stage.$graphLookup ||
|
|
|
|
|
stage.$unionWith
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Execute pipeline with special stage handling
|
|
|
|
|
*/
|
|
|
|
|
private async executeWithSpecialStages(
|
|
|
|
|
storage: any,
|
|
|
|
|
database: string,
|
|
|
|
|
documents: plugins.bson.Document[],
|
|
|
|
|
pipeline: plugins.bson.Document[]
|
|
|
|
|
): Promise<plugins.bson.Document[]> {
|
|
|
|
|
let results: plugins.bson.Document[] = [...documents];
|
|
|
|
|
|
|
|
|
|
for (const stage of pipeline) {
|
|
|
|
|
if (stage.$lookup) {
|
|
|
|
|
const lookupSpec = stage.$lookup;
|
|
|
|
|
const fromCollection = lookupSpec.from;
|
|
|
|
|
|
|
|
|
|
// Get foreign collection documents
|
|
|
|
|
const foreignExists = await storage.collectionExists(database, fromCollection);
|
|
|
|
|
const foreignDocs = foreignExists
|
|
|
|
|
? await storage.findAll(database, fromCollection)
|
|
|
|
|
: [];
|
|
|
|
|
|
|
|
|
|
results = AggregationEngine.executeLookup(results as any, lookupSpec, foreignDocs);
|
|
|
|
|
} else if (stage.$graphLookup) {
|
|
|
|
|
const graphLookupSpec = stage.$graphLookup;
|
|
|
|
|
const fromCollection = graphLookupSpec.from;
|
|
|
|
|
|
|
|
|
|
const foreignExists = await storage.collectionExists(database, fromCollection);
|
|
|
|
|
const foreignDocs = foreignExists
|
|
|
|
|
? await storage.findAll(database, fromCollection)
|
|
|
|
|
: [];
|
|
|
|
|
|
|
|
|
|
results = AggregationEngine.executeGraphLookup(results as any, graphLookupSpec, foreignDocs);
|
|
|
|
|
} else if (stage.$unionWith) {
|
|
|
|
|
let unionSpec = stage.$unionWith;
|
|
|
|
|
let unionColl: string;
|
|
|
|
|
let unionPipeline: plugins.bson.Document[] | undefined;
|
|
|
|
|
|
|
|
|
|
if (typeof unionSpec === 'string') {
|
|
|
|
|
unionColl = unionSpec;
|
|
|
|
|
} else {
|
|
|
|
|
unionColl = unionSpec.coll;
|
|
|
|
|
unionPipeline = unionSpec.pipeline;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const unionExists = await storage.collectionExists(database, unionColl);
|
|
|
|
|
const unionDocs = unionExists
|
|
|
|
|
? await storage.findAll(database, unionColl)
|
|
|
|
|
: [];
|
|
|
|
|
|
|
|
|
|
results = AggregationEngine.executeUnionWith(results as any, unionDocs, unionPipeline);
|
|
|
|
|
} else if (stage.$facet) {
|
|
|
|
|
// Execute each facet pipeline separately
|
|
|
|
|
const facetResults: plugins.bson.Document = {};
|
|
|
|
|
|
|
|
|
|
for (const [facetName, facetPipeline] of Object.entries(stage.$facet)) {
|
|
|
|
|
const facetDocs = await this.executeWithSpecialStages(
|
|
|
|
|
storage,
|
|
|
|
|
database,
|
|
|
|
|
results,
|
|
|
|
|
facetPipeline as plugins.bson.Document[]
|
|
|
|
|
);
|
|
|
|
|
facetResults[facetName] = facetDocs;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
results = [facetResults];
|
|
|
|
|
} else {
|
|
|
|
|
// Regular stage - pass to mingo
|
|
|
|
|
results = AggregationEngine.aggregate(results as any, [stage]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return results;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle $out stage - write results to a collection
|
|
|
|
|
*/
|
|
|
|
|
private async handleOut(
|
|
|
|
|
storage: any,
|
|
|
|
|
database: string,
|
|
|
|
|
results: plugins.bson.Document[],
|
|
|
|
|
outSpec: string | { db?: string; coll: string }
|
|
|
|
|
): Promise<void> {
|
|
|
|
|
let targetDb = database;
|
|
|
|
|
let targetColl: string;
|
|
|
|
|
|
|
|
|
|
if (typeof outSpec === 'string') {
|
|
|
|
|
targetColl = outSpec;
|
|
|
|
|
} else {
|
|
|
|
|
targetDb = outSpec.db || database;
|
|
|
|
|
targetColl = outSpec.coll;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Drop existing collection
|
|
|
|
|
await storage.dropCollection(targetDb, targetColl);
|
|
|
|
|
|
|
|
|
|
// Create new collection and insert results
|
|
|
|
|
await storage.createCollection(targetDb, targetColl);
|
|
|
|
|
|
|
|
|
|
for (const doc of results) {
|
|
|
|
|
if (!doc._id) {
|
|
|
|
|
doc._id = new plugins.bson.ObjectId();
|
|
|
|
|
}
|
|
|
|
|
await storage.insertOne(targetDb, targetColl, doc);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Handle $merge stage - merge results into a collection
|
|
|
|
|
*/
|
|
|
|
|
private async handleMerge(
|
|
|
|
|
storage: any,
|
|
|
|
|
database: string,
|
|
|
|
|
results: plugins.bson.Document[],
|
|
|
|
|
mergeSpec: any
|
|
|
|
|
): Promise<void> {
|
|
|
|
|
let targetDb = database;
|
|
|
|
|
let targetColl: string;
|
|
|
|
|
|
|
|
|
|
if (typeof mergeSpec === 'string') {
|
|
|
|
|
targetColl = mergeSpec;
|
|
|
|
|
} else if (typeof mergeSpec.into === 'string') {
|
|
|
|
|
targetColl = mergeSpec.into;
|
|
|
|
|
} else {
|
|
|
|
|
targetDb = mergeSpec.into.db || database;
|
|
|
|
|
targetColl = mergeSpec.into.coll;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const on = mergeSpec.on || '_id';
|
|
|
|
|
const whenMatched = mergeSpec.whenMatched || 'merge';
|
|
|
|
|
const whenNotMatched = mergeSpec.whenNotMatched || 'insert';
|
|
|
|
|
|
|
|
|
|
// Ensure target collection exists
|
|
|
|
|
await storage.createCollection(targetDb, targetColl);
|
|
|
|
|
|
|
|
|
|
for (const doc of results) {
|
|
|
|
|
// Find matching document
|
|
|
|
|
const existingDocs = await storage.findAll(targetDb, targetColl);
|
|
|
|
|
const onFields = Array.isArray(on) ? on : [on];
|
|
|
|
|
|
|
|
|
|
let matchingDoc = null;
|
|
|
|
|
for (const existing of existingDocs) {
|
|
|
|
|
let matches = true;
|
|
|
|
|
for (const field of onFields) {
|
|
|
|
|
if (JSON.stringify(existing[field]) !== JSON.stringify(doc[field])) {
|
|
|
|
|
matches = false;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (matches) {
|
|
|
|
|
matchingDoc = existing;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (matchingDoc) {
|
|
|
|
|
// Handle whenMatched
|
|
|
|
|
if (whenMatched === 'replace') {
|
|
|
|
|
await storage.updateById(targetDb, targetColl, matchingDoc._id, doc);
|
|
|
|
|
} else if (whenMatched === 'keepExisting') {
|
|
|
|
|
// Do nothing
|
|
|
|
|
} else if (whenMatched === 'merge') {
|
|
|
|
|
const merged = { ...matchingDoc, ...doc };
|
|
|
|
|
await storage.updateById(targetDb, targetColl, matchingDoc._id, merged);
|
|
|
|
|
} else if (whenMatched === 'fail') {
|
|
|
|
|
throw new Error('Document matched but whenMatched is fail');
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Handle whenNotMatched
|
|
|
|
|
if (whenNotMatched === 'insert') {
|
|
|
|
|
if (!doc._id) {
|
|
|
|
|
doc._id = new plugins.bson.ObjectId();
|
|
|
|
|
}
|
|
|
|
|
await storage.insertOne(targetDb, targetColl, doc);
|
|
|
|
|
} else if (whenNotMatched === 'discard') {
|
|
|
|
|
// Do nothing
|
|
|
|
|
} else if (whenNotMatched === 'fail') {
|
|
|
|
|
throw new Error('Document not matched but whenNotMatched is fail');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|