284 lines
7.7 KiB
TypeScript
284 lines
7.7 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { Document, IStoredDocument, IAggregateOptions } from '../types/interfaces.js';
|
|
|
|
// Import mingo Aggregator
|
|
import { Aggregator } from 'mingo';
|
|
|
|
/**
 * Aggregation engine using mingo for MongoDB-compatible aggregation pipeline execution.
 *
 * Pipelines are delegated to mingo's `Aggregator`. The `execute*` helpers implement
 * stages that need a second, caller-supplied collection (`$lookup`, `$graphLookup`,
 * `$unionWith`) or fan out into sub-pipelines (`$facet`). None of the helpers mutate
 * their inputs: joined documents are shallow copies with one extra field.
 */
export class AggregationEngine {
  /**
   * Execute an aggregation pipeline on a collection of documents.
   *
   * @param documents - Input documents handed to mingo.
   * @param pipeline - Aggregation stages. A missing or empty pipeline short-circuits
   *   and returns `documents` itself (the same array instance, not a copy).
   * @param options - Only `options.collation` is read; it is forwarded to mingo.
   * @returns The pipeline output, or an empty array if mingo returns a non-array.
   */
  static aggregate(
    documents: IStoredDocument[],
    pipeline: Document[],
    options?: IAggregateOptions
  ): Document[] {
    if (!pipeline || pipeline.length === 0) {
      return documents;
    }

    // Create mingo aggregator with the pipeline
    const aggregator = new Aggregator(pipeline, {
      collation: options?.collation as any,
    });

    // Run the aggregation
    const result = aggregator.run(documents);

    return Array.isArray(result) ? result : [];
  }

  /**
   * Execute aggregation and return an iterator for lazy evaluation.
   *
   * Unlike {@link AggregationEngine.aggregate}, there is no empty-pipeline shortcut
   * here: the pipeline is always handed to mingo.
   *
   * @param documents - Input documents handed to mingo.
   * @param pipeline - Aggregation stages.
   * @param options - Only `options.collation` is read; it is forwarded to mingo.
   * @returns A generator yielding each document produced by mingo's stream.
   */
  static *aggregateIterator(
    documents: IStoredDocument[],
    pipeline: Document[],
    options?: IAggregateOptions
  ): Generator<Document> {
    const aggregator = new Aggregator(pipeline, {
      collation: options?.collation as any,
    });

    // Get the cursor from mingo
    const cursor = aggregator.stream(documents);

    for (const doc of cursor) {
      yield doc;
    }
  }

  /**
   * Execute a $lookup stage manually (for cross-collection lookups).
   * This is used when the lookup references another collection in the same database.
   *
   * Matching rules:
   * - values are compared with `valuesMatch` (array-aware on both sides);
   * - a `null`/missing local value matches a `null`/missing foreign value, mirroring
   *   MongoDB's "missing is treated as null" rule for `$lookup`;
   * - dotted paths that cross an array of sub-documents (e.g. `items.sku`) match on
   *   any element's value.
   *
   * @param documents - Local documents to enrich.
   * @param lookupSpec - Join description. `from` is not read here; the caller passes
   *   the already-resolved collection as `foreignCollection`.
   * @param foreignCollection - Documents of the joined collection.
   * @returns One shallow copy per local document with the matches stored under `as`.
   */
  static executeLookup(
    documents: IStoredDocument[],
    lookupSpec: {
      from: string;
      localField: string;
      foreignField: string;
      as: string;
    },
    foreignCollection: IStoredDocument[]
  ): Document[] {
    const { localField, foreignField, as } = lookupSpec;

    // Resolve every foreign join key once instead of once per (local, foreign) pair.
    const foreignKeys = foreignCollection.map(foreignDoc =>
      this.getNestedValue(foreignDoc, foreignField)
    );

    return documents.map(doc => {
      const localValue = this.getNestedValue(doc, localField);
      const localIsNullish = localValue === null || localValue === undefined;

      const matches = foreignCollection.filter((_foreignDoc, index) => {
        const foreignValue = foreignKeys[index];
        // null and "field absent" are interchangeable for $lookup equality.
        if (localIsNullish && (foreignValue === null || foreignValue === undefined)) {
          return true;
        }
        return this.valuesMatch(localValue, foreignValue);
      });

      return {
        ...doc,
        [as]: matches,
      };
    });
  }

  /**
   * Execute a $graphLookup stage manually.
   *
   * Performs a breadth-first traversal per input document: starting from the
   * `startWith` value(s), it collects foreign documents whose `connectToField`
   * matches the current value, then continues with their `connectFromField` values.
   *
   * Notes on this implementation:
   * - `startWith` is resolved as a field path only when it is a string beginning
   *   with `$`; any other value (including expression objects) is used literally.
   * - `maxDepth` defaults to 10 here. Start-value matches are depth 0.
   * - Each value is expanded at most once and each foreign document is returned at
   *   most once, so cyclic graphs terminate.
   * - Foreign documents are de-duplicated by `_id`; any `_id` type is accepted, and
   *   documents without an `_id` are de-duplicated by their position instead.
   * - `from` is not read here; the caller supplies `foreignCollection`.
   *
   * @param documents - Documents to enrich.
   * @param graphLookupSpec - Traversal description (see notes above).
   * @param foreignCollection - Documents of the collection being traversed.
   * @returns One shallow copy per input document with the reached documents stored
   *   under `as` (each a shallow copy, plus `depthField` when requested).
   */
  static executeGraphLookup(
    documents: IStoredDocument[],
    graphLookupSpec: {
      from: string;
      startWith: string | Document;
      connectFromField: string;
      connectToField: string;
      as: string;
      maxDepth?: number;
      depthField?: string;
      restrictSearchWithMatch?: Document;
    },
    foreignCollection: IStoredDocument[]
  ): Document[] {
    const {
      startWith,
      connectFromField,
      connectToField,
      as,
      maxDepth = 10,
      depthField,
      restrictSearchWithMatch,
    } = graphLookupSpec;

    // Loop-invariant: compile the restriction once rather than once per matching pair.
    const restrictQuery = restrictSearchWithMatch
      ? new plugins.mingo.Query(restrictSearchWithMatch)
      : undefined;

    // Loop-invariant per foreign document: its connect-to value and its dedupe key.
    const candidates = foreignCollection.map((foreignDoc, index) => {
      const id: any = (foreignDoc as any)?._id;
      return {
        foreignDoc,
        connectTo: this.getNestedValue(foreignDoc, connectToField),
        docKey: id === null || id === undefined ? `index:${index}` : this.identityKey(id),
      };
    });

    return documents.map(doc => {
      const startValue = typeof startWith === 'string' && startWith.startsWith('$')
        ? this.getNestedValue(doc, startWith.slice(1))
        : startWith;

      const results: Document[] = [];
      const seenDocKeys = new Set<string>();
      const visited = new Set<string>();

      // Initialize with start value(s)
      const startValues: any[] = Array.isArray(startValue) ? startValue : [startValue];
      const queue: Array<{ value: any; depth: number }> = startValues.map(value => ({
        value,
        depth: 0,
      }));

      // Array iteration observes entries pushed during the loop, so this walks the
      // queue in FIFO order without the O(n) cost of shift().
      for (const { value, depth } of queue) {
        if (depth > maxDepth) continue;

        const valueKey = this.identityKey(value);
        if (visited.has(valueKey)) continue;
        visited.add(valueKey);

        // Find matching documents
        for (const { foreignDoc, connectTo, docKey } of candidates) {
          if (!this.valuesMatch(value, connectTo)) continue;

          // Check restrictSearchWithMatch
          if (restrictQuery && !restrictQuery.test(foreignDoc)) continue;

          // Avoid duplicates in results
          if (seenDocKeys.has(docKey)) continue;
          seenDocKeys.add(docKey);

          results.push(
            depthField ? { ...foreignDoc, [depthField]: depth } : { ...foreignDoc }
          );

          // Add connected values to queue
          const nextValue = this.getNestedValue(foreignDoc, connectFromField);
          if (nextValue !== undefined) {
            const nextValues: any[] = Array.isArray(nextValue) ? nextValue : [nextValue];
            for (const next of nextValues) {
              queue.push({ value: next, depth: depth + 1 });
            }
          }
        }
      }

      return {
        ...doc,
        [as]: results,
      };
    });
  }

  /**
   * Execute a $facet stage manually.
   *
   * @param documents - Input documents shared by every facet.
   * @param facetSpec - Map of facet name to sub-pipeline.
   * @returns A single document whose keys are the facet names and whose values are
   *   the results of running each sub-pipeline through `aggregate`.
   */
  static executeFacet(
    documents: IStoredDocument[],
    facetSpec: Record<string, Document[]>
  ): Document {
    const result: Document = {};

    for (const [facetName, pipeline] of Object.entries(facetSpec)) {
      result[facetName] = this.aggregate(documents, pipeline);
    }

    return result;
  }

  /**
   * Execute a $unionWith stage.
   *
   * @param documents - Documents of the current pipeline.
   * @param otherDocuments - Documents of the collection being unioned in.
   * @param pipeline - Optional pipeline applied to `otherDocuments` first; skipped
   *   when missing or empty.
   * @returns A new array: `documents` followed by the (optionally transformed)
   *   `otherDocuments`.
   */
  static executeUnionWith(
    documents: IStoredDocument[],
    otherDocuments: IStoredDocument[],
    pipeline?: Document[]
  ): Document[] {
    let unionDocs: Document[] = otherDocuments;
    if (pipeline && pipeline.length > 0) {
      unionDocs = this.aggregate(otherDocuments, pipeline);
    }
    return [...documents, ...unionDocs];
  }

  /**
   * Normalize a $merge specification (output to another collection).
   *
   * This only resolves defaults: `on` falls back to `'_id'`, `whenMatched` to
   * `'merge'` and `whenNotMatched` to `'insert'`. It does not inspect `documents`;
   * `toInsert` and `toUpdate` are always returned empty, so the caller is
   * responsible for populating them. `mergeSpec.into` is not read here.
   *
   * @param documents - Pipeline output destined for the target collection (unused).
   * @param mergeSpec - The `$merge` stage specification.
   * @returns The resolved merge settings plus empty `toInsert` / `toUpdate` lists.
   */
  static prepareMerge(
    documents: Document[],
    mergeSpec: {
      into: string;
      on?: string | string[];
      whenMatched?: 'replace' | 'keepExisting' | 'merge' | 'fail' | Document[];
      whenNotMatched?: 'insert' | 'discard' | 'fail';
    }
  ): {
    toInsert: Document[];
    toUpdate: Array<{ filter: Document; update: Document }>;
    onField: string | string[];
    whenMatched: string | Document[];
    whenNotMatched: string;
  } {
    // `||` (not `??`) is intentional: an empty-string `on` also falls back to '_id'.
    const onField = mergeSpec.on || '_id';
    const whenMatched = mergeSpec.whenMatched || 'merge';
    const whenNotMatched = mergeSpec.whenNotMatched || 'insert';

    return {
      toInsert: [],
      toUpdate: [],
      onField,
      whenMatched,
      whenNotMatched,
    };
  }

  // ============================================================================
  // Helper Methods
  // ============================================================================

  /**
   * Resolve a dotted field path against a document.
   *
   * When the path crosses an array and the next segment is neither a numeric index
   * nor an existing property of the array, the remaining path is resolved against
   * every element and the defined results are returned as an array (so `items.sku`
   * on `{ items: [{ sku: 'a' }, { sku: 'b' }] }` yields `['a', 'b']`).
   *
   * @returns The resolved value, or `undefined` when a segment is missing.
   */
  private static getNestedValue(obj: any, path: string): any {
    return this.resolvePath(obj, path.split('.'), 0);
  }

  /** Walk `parts[start..]` from `current`, fanning out across arrays of sub-documents. */
  private static resolvePath(current: any, parts: string[], start: number): any {
    for (let i = start; i < parts.length; i++) {
      if (current === null || current === undefined) {
        return undefined;
      }

      const part = parts[i] as string;

      // Only fan out where plain property access would have produced nothing:
      // numeric indexes ('0') and real array properties ('length') keep working.
      if (Array.isArray(current) && !(part in current) && !/^\d+$/.test(part)) {
        const collected: any[] = [];
        for (const element of current) {
          const resolved = this.resolvePath(element, parts, i);
          if (resolved !== undefined) {
            collected.push(resolved);
          }
        }
        return collected;
      }

      current = current[part];
    }

    return current;
  }

  /**
   * Build a string key identifying a value for "already seen" bookkeeping.
   * Keys are tagged by kind so an ObjectId never collides with its hex string and
   * `null` never collides with `NaN`.
   */
  private static identityKey(value: any): string {
    if (value !== null && typeof value === 'object') {
      // Duck-typed so ObjectId-like values from any bson copy are handled.
      if (typeof value.toHexString === 'function') {
        return `oid:${value.toHexString()}`;
      }
      if (value instanceof Date) {
        return `date:${value.getTime()}`;
      }
      return `json:${JSON.stringify(value)}`;
    }
    return `${typeof value}:${String(value)}`;
  }

  /**
   * Equality used for join keys.
   *
   * - strict equality first;
   * - if either side is an array, matches when any element matches (recursively);
   * - two ObjectIds compare via `equals`, two Dates via their timestamps;
   * - any other pair of non-null objects compares by JSON serialization (which is
   *   sensitive to key order);
   * - everything else does not match.
   */
  private static valuesMatch(a: any, b: any): boolean {
    if (a === b) return true;

    // Handle array contains check
    if (Array.isArray(a)) {
      return a.some(item => this.valuesMatch(item, b));
    }
    if (Array.isArray(b)) {
      return b.some(item => this.valuesMatch(a, item));
    }

    // From here on only object-vs-object pairs can match; unequal primitives,
    // null and undefined are settled without touching the ObjectId/Date checks.
    if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) {
      return false;
    }

    // Handle ObjectId comparison
    if (a instanceof plugins.bson.ObjectId && b instanceof plugins.bson.ObjectId) {
      return a.equals(b);
    }

    // Handle Date comparison
    if (a instanceof Date && b instanceof Date) {
      return a.getTime() === b.getTime();
    }

    // Handle object comparison
    return JSON.stringify(a) === JSON.stringify(b);
  }
}
|