777 lines
24 KiB
TypeScript
777 lines
24 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { TsView } from '../tsview.classes.tsview.js';
|
|
import type * as interfaces from './interfaces.streaming.js';
|
|
import type { IStorageChangeEvent } from './interfaces.streaming.js';
|
|
|
|
/**
|
|
* Subscription entry tracking a client's subscription to a resource
|
|
*/
|
|
interface ISubscriptionEntry {
|
|
connectionId: string;
|
|
subscriptionId: string;
|
|
createdAt: Date;
|
|
}
|
|
|
|
/**
|
|
* MongoDB watcher entry
|
|
*/
|
|
interface IMongoWatcherEntry {
|
|
watcher: plugins.mongodb.ChangeStream;
|
|
subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
|
|
}
|
|
|
|
/**
|
|
* Storage watcher entry
|
|
*/
|
|
interface IStorageWatcherEntry {
|
|
watcher: plugins.smartbucket.BucketWatcher;
|
|
subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
|
|
}
|
|
|
|
/**
|
|
* ChangeStreamManager manages real-time change streaming for both MongoDB and storage.
|
|
*
|
|
* Features:
|
|
* - MongoDB Change Streams for real-time database updates
|
|
* - S3 BucketWatcher for polling-based storage change detection
|
|
* - Subscription management per WebSocket client
|
|
* - Activity stream with ring buffer for recent events
|
|
* - Automatic cleanup on client disconnect
|
|
*/
|
|
export class ChangeStreamManager {
|
|
private tsview: TsView;
|
|
private typedSocket: plugins.typedserver.TypedServer['typedsocket'] | null = null;
|
|
|
|
// MongoDB watchers: "db/collection" -> watcher entry
|
|
private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();
|
|
|
|
// Storage watchers: "bucket/prefix" -> watcher entry
|
|
private storageWatchers: Map<string, IStorageWatcherEntry> = new Map();
|
|
|
|
// Activity subscribers: connectionId -> subscription entry
|
|
private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();
|
|
|
|
// Activity ring buffer (max 1000 events)
|
|
private activityBuffer: interfaces.IActivityEvent[] = [];
|
|
private readonly ACTIVITY_BUFFER_SIZE = 1000;
|
|
|
|
// Global watchers for the activity stream (started lazily on first subscriber)
|
|
private globalMongoWatcher: plugins.mongodb.ChangeStream | null = null;
|
|
private globalStorageWatchers: Map<string, plugins.smartbucket.BucketWatcher> = new Map();
|
|
private globalWatchersActive: boolean = false;
|
|
|
|
// Counter for generating unique subscription IDs
|
|
private subscriptionCounter = 0;
|
|
|
|
constructor(tsview: TsView) {
|
|
this.tsview = tsview;
|
|
}
|
|
|
|
/**
|
|
* Initialize the manager with a TypedSocket instance
|
|
*/
|
|
public setTypedSocket(typedSocket: plugins.typedserver.TypedServer['typedsocket']): void {
|
|
this.typedSocket = typedSocket;
|
|
}
|
|
|
|
/**
|
|
* Generate a unique subscription ID
|
|
*/
|
|
private generateSubscriptionId(): string {
|
|
return `sub_${Date.now()}_${++this.subscriptionCounter}`;
|
|
}
|
|
|
|
/**
|
|
* Get the MongoDB key for a database/collection pair
|
|
*/
|
|
private getMongoKey(database: string, collection: string): string {
|
|
return `${database}/${collection}`;
|
|
}
|
|
|
|
/**
|
|
* Get the storage key for a bucket/prefix pair
|
|
*/
|
|
private getStorageKey(bucket: string, prefix?: string): string {
|
|
return prefix ? `${bucket}/${prefix}` : bucket;
|
|
}
|
|
|
|
// ===========================================
|
|
// MongoDB Change Streams
|
|
// ===========================================
|
|
|
|
/**
|
|
* Subscribe a client to MongoDB collection changes
|
|
*/
|
|
public async subscribeToMongo(
|
|
connectionId: string,
|
|
database: string,
|
|
collection: string
|
|
): Promise<{ success: boolean; subscriptionId: string }> {
|
|
const key = this.getMongoKey(database, collection);
|
|
|
|
let entry = this.mongoWatchers.get(key);
|
|
|
|
// Create watcher if it doesn't exist
|
|
if (!entry) {
|
|
const watcher = await this.createMongoWatcher(database, collection);
|
|
if (!watcher) {
|
|
return { success: false, subscriptionId: '' };
|
|
}
|
|
|
|
entry = {
|
|
watcher,
|
|
subscriptions: new Map(),
|
|
};
|
|
this.mongoWatchers.set(key, entry);
|
|
}
|
|
|
|
// Add subscription
|
|
const subscriptionId = this.generateSubscriptionId();
|
|
entry.subscriptions.set(connectionId, {
|
|
connectionId,
|
|
subscriptionId,
|
|
createdAt: new Date(),
|
|
});
|
|
|
|
console.log(`[ChangeStream] MongoDB subscription added: ${key} for connection ${connectionId}`);
|
|
return { success: true, subscriptionId };
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe a client from MongoDB collection changes
|
|
*/
|
|
public async unsubscribeFromMongo(
|
|
connectionId: string,
|
|
database: string,
|
|
collection: string
|
|
): Promise<boolean> {
|
|
const key = this.getMongoKey(database, collection);
|
|
const entry = this.mongoWatchers.get(key);
|
|
|
|
if (!entry) {
|
|
return false;
|
|
}
|
|
|
|
entry.subscriptions.delete(connectionId);
|
|
console.log(`[ChangeStream] MongoDB subscription removed: ${key} for connection ${connectionId}`);
|
|
|
|
// Close watcher if no more subscribers
|
|
if (entry.subscriptions.size === 0) {
|
|
await this.closeMongoWatcher(key);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Create a MongoDB change stream for a collection
|
|
*/
|
|
private async createMongoWatcher(
|
|
database: string,
|
|
collection: string
|
|
): Promise<plugins.mongodb.ChangeStream | null> {
|
|
try {
|
|
const db = await this.tsview.getMongoDb();
|
|
if (!db) {
|
|
console.error('[ChangeStream] MongoDB not configured');
|
|
return null;
|
|
}
|
|
|
|
const client = (db as any).mongoDbClient;
|
|
const mongoDb = client.db(database);
|
|
const mongoCollection = mongoDb.collection(collection);
|
|
|
|
// Create change stream
|
|
const changeStream = mongoCollection.watch([], {
|
|
fullDocument: 'updateLookup',
|
|
});
|
|
|
|
// Handle change events
|
|
changeStream.on('change', (change: any) => {
|
|
this.handleMongoChange(database, collection, change);
|
|
});
|
|
|
|
changeStream.on('error', (error: Error) => {
|
|
console.error(`[ChangeStream] MongoDB error for ${database}/${collection}:`, error);
|
|
});
|
|
|
|
console.log(`[ChangeStream] MongoDB watcher created for ${database}/${collection}`);
|
|
return changeStream;
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Failed to create MongoDB watcher for ${database}/${collection}:`, error);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle a MongoDB change event
|
|
*/
|
|
private handleMongoChange(database: string, collection: string, change: any): void {
|
|
const key = this.getMongoKey(database, collection);
|
|
const entry = this.mongoWatchers.get(key);
|
|
|
|
if (!entry) return;
|
|
|
|
// Convert MongoDB change event to our interface
|
|
const event: interfaces.IMongoChangeEvent = {
|
|
type: change.operationType as interfaces.IMongoChangeEvent['type'],
|
|
database,
|
|
collection,
|
|
documentId: change.documentKey?._id?.toString(),
|
|
document: change.fullDocument,
|
|
updateDescription: change.updateDescription,
|
|
timestamp: new Date().toISOString(),
|
|
};
|
|
|
|
// Only add to activity buffer if global watchers are NOT active.
|
|
// When active, the global MongoDB watcher already feeds the activity stream.
|
|
if (!this.globalWatchersActive) {
|
|
this.addToActivityBuffer('mongodb', event);
|
|
}
|
|
|
|
// Push to all subscribed clients
|
|
this.pushMongoChangeToClients(key, event);
|
|
}
|
|
|
|
/**
|
|
* Push MongoDB change to subscribed clients
|
|
*/
|
|
private async pushMongoChangeToClients(
|
|
key: string,
|
|
event: interfaces.IMongoChangeEvent
|
|
): Promise<void> {
|
|
const entry = this.mongoWatchers.get(key);
|
|
if (!entry || !this.typedSocket) return;
|
|
|
|
for (const [connectionId, _sub] of entry.subscriptions) {
|
|
try {
|
|
// Find the connection and push the event
|
|
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
|
return conn.peer?.id === connectionId;
|
|
});
|
|
|
|
if (connection) {
|
|
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushMongoChange>(
|
|
'pushMongoChange',
|
|
connection
|
|
);
|
|
await request.fire({ event });
|
|
}
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Failed to push MongoDB change to ${connectionId}:`, error);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Close a MongoDB change stream
|
|
*/
|
|
private async closeMongoWatcher(key: string): Promise<void> {
|
|
const entry = this.mongoWatchers.get(key);
|
|
if (!entry) return;
|
|
|
|
try {
|
|
await entry.watcher.close();
|
|
this.mongoWatchers.delete(key);
|
|
console.log(`[ChangeStream] MongoDB watcher closed for ${key}`);
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
|
|
}
|
|
}
|
|
|
|
// ===========================================
|
|
// Storage Change Watching
|
|
// ===========================================
|
|
|
|
/**
|
|
* Subscribe a client to storage bucket/prefix changes
|
|
*/
|
|
public async subscribeToS3(
|
|
connectionId: string,
|
|
bucket: string,
|
|
prefix?: string
|
|
): Promise<{ success: boolean; subscriptionId: string }> {
|
|
const key = this.getStorageKey(bucket, prefix);
|
|
|
|
let entry = this.storageWatchers.get(key);
|
|
|
|
// Create watcher if it doesn't exist
|
|
if (!entry) {
|
|
const watcher = await this.createStorageWatcher(bucket, prefix);
|
|
if (!watcher) {
|
|
return { success: false, subscriptionId: '' };
|
|
}
|
|
|
|
entry = {
|
|
watcher,
|
|
subscriptions: new Map(),
|
|
};
|
|
this.storageWatchers.set(key, entry);
|
|
}
|
|
|
|
// Add subscription
|
|
const subscriptionId = this.generateSubscriptionId();
|
|
entry.subscriptions.set(connectionId, {
|
|
connectionId,
|
|
subscriptionId,
|
|
createdAt: new Date(),
|
|
});
|
|
|
|
console.log(`[ChangeStream] Storage subscription added: ${key} for connection ${connectionId}`);
|
|
return { success: true, subscriptionId };
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe a client from storage bucket/prefix changes
|
|
*/
|
|
public async unsubscribeFromS3(
|
|
connectionId: string,
|
|
bucket: string,
|
|
prefix?: string
|
|
): Promise<boolean> {
|
|
const key = this.getStorageKey(bucket, prefix);
|
|
const entry = this.storageWatchers.get(key);
|
|
|
|
if (!entry) {
|
|
return false;
|
|
}
|
|
|
|
entry.subscriptions.delete(connectionId);
|
|
console.log(`[ChangeStream] Storage subscription removed: ${key} for connection ${connectionId}`);
|
|
|
|
// Close watcher if no more subscribers
|
|
if (entry.subscriptions.size === 0) {
|
|
await this.closeStorageWatcher(key);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Create a storage bucket watcher
|
|
*/
|
|
private async createStorageWatcher(
|
|
bucket: string,
|
|
prefix?: string
|
|
): Promise<plugins.smartbucket.BucketWatcher | null> {
|
|
try {
|
|
const smartbucket = await this.tsview.getSmartBucket();
|
|
if (!smartbucket) {
|
|
console.error('[ChangeStream] Storage not configured');
|
|
return null;
|
|
}
|
|
|
|
const bucketInstance = await smartbucket.getBucketByName(bucket);
|
|
|
|
// Create watcher using smartbucket's BucketWatcher
|
|
const watcher = bucketInstance.createWatcher({
|
|
prefix: prefix || '',
|
|
pollIntervalMs: 5000,
|
|
bufferTimeMs: 500,
|
|
});
|
|
|
|
// Subscribe to change events
|
|
watcher.changeSubject.subscribe((eventOrEvents: IStorageChangeEvent | IStorageChangeEvent[]) => {
|
|
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
|
|
for (const event of events) {
|
|
this.handleStorageChange(bucket, prefix, event);
|
|
}
|
|
});
|
|
|
|
// Start the watcher
|
|
await watcher.start();
|
|
await watcher.readyDeferred.promise;
|
|
|
|
console.log(`[ChangeStream] Storage watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
|
|
return watcher;
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Failed to create storage watcher for ${bucket}:`, error);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle a storage change event
|
|
*/
|
|
private handleStorageChange(bucket: string, prefix: string | undefined, event: IStorageChangeEvent): void {
|
|
const key = this.getStorageKey(bucket, prefix);
|
|
const entry = this.storageWatchers.get(key);
|
|
|
|
if (!entry) return;
|
|
|
|
// Only add to activity buffer if global watchers are NOT active.
|
|
// When active, the global storage watchers already feed the activity stream.
|
|
if (!this.globalWatchersActive) {
|
|
this.addToActivityBuffer('storage', event);
|
|
}
|
|
|
|
// Push to all subscribed clients
|
|
this.pushStorageChangeToClients(key, event);
|
|
}
|
|
|
|
/**
|
|
* Push storage change to subscribed clients
|
|
*/
|
|
private async pushStorageChangeToClients(
|
|
key: string,
|
|
event: IStorageChangeEvent
|
|
): Promise<void> {
|
|
const entry = this.storageWatchers.get(key);
|
|
if (!entry || !this.typedSocket) return;
|
|
|
|
for (const [connectionId, _sub] of entry.subscriptions) {
|
|
try {
|
|
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
|
return conn.peer?.id === connectionId;
|
|
});
|
|
|
|
if (connection) {
|
|
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushStorageChange>(
|
|
'pushS3Change',
|
|
connection
|
|
);
|
|
await request.fire({ event });
|
|
}
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Failed to push storage change to ${connectionId}:`, error);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Close a storage bucket watcher
|
|
*/
|
|
private async closeStorageWatcher(key: string): Promise<void> {
|
|
const entry = this.storageWatchers.get(key);
|
|
if (!entry) return;
|
|
|
|
try {
|
|
await entry.watcher.stop();
|
|
this.storageWatchers.delete(key);
|
|
console.log(`[ChangeStream] Storage watcher closed for ${key}`);
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Error closing storage watcher for ${key}:`, error);
|
|
}
|
|
}
|
|
|
|
// ===========================================
|
|
// Activity Stream
|
|
// ===========================================
|
|
|
|
/**
|
|
* Subscribe a client to the activity stream
|
|
*/
|
|
public async subscribeToActivity(connectionId: string): Promise<{ success: boolean; subscriptionId: string }> {
|
|
const subscriptionId = this.generateSubscriptionId();
|
|
|
|
this.activitySubscribers.set(connectionId, {
|
|
connectionId,
|
|
subscriptionId,
|
|
createdAt: new Date(),
|
|
});
|
|
|
|
console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);
|
|
|
|
// Start global watchers when the first activity subscriber connects
|
|
if (this.activitySubscribers.size === 1) {
|
|
await this.startGlobalWatchers();
|
|
}
|
|
|
|
return { success: true, subscriptionId };
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe a client from the activity stream
|
|
*/
|
|
public async unsubscribeFromActivity(connectionId: string): Promise<boolean> {
|
|
const result = this.activitySubscribers.delete(connectionId);
|
|
if (result) {
|
|
console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);
|
|
|
|
// Stop global watchers when no activity subscribers remain
|
|
if (this.activitySubscribers.size === 0) {
|
|
await this.stopGlobalWatchers();
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Get recent activity events
|
|
*/
|
|
public getRecentActivity(limit: number = 100): interfaces.IActivityEvent[] {
|
|
const count = Math.min(limit, this.activityBuffer.length);
|
|
return this.activityBuffer.slice(-count);
|
|
}
|
|
|
|
/**
|
|
* Emit a MongoDB activity event from an API handler (no change stream required).
|
|
*/
|
|
public emitMongoActivityEvent(event: interfaces.IMongoChangeEvent): void {
|
|
this.addToActivityBuffer('mongodb', event);
|
|
}
|
|
|
|
/**
|
|
* Add an event to the activity buffer
|
|
*/
|
|
private addToActivityBuffer(
|
|
source: 'mongodb' | 'storage',
|
|
event: interfaces.IMongoChangeEvent | IStorageChangeEvent
|
|
): void {
|
|
const activityEvent: interfaces.IActivityEvent = {
|
|
id: `evt_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`,
|
|
source,
|
|
event,
|
|
timestamp: new Date().toISOString(),
|
|
};
|
|
|
|
this.activityBuffer.push(activityEvent);
|
|
|
|
// Trim buffer if it exceeds max size
|
|
if (this.activityBuffer.length > this.ACTIVITY_BUFFER_SIZE) {
|
|
this.activityBuffer = this.activityBuffer.slice(-this.ACTIVITY_BUFFER_SIZE);
|
|
}
|
|
|
|
// Push to activity subscribers
|
|
this.pushActivityToClients(activityEvent);
|
|
}
|
|
|
|
/**
|
|
* Push activity event to subscribed clients
|
|
*/
|
|
private async pushActivityToClients(event: interfaces.IActivityEvent): Promise<void> {
|
|
if (!this.typedSocket || this.activitySubscribers.size === 0) return;
|
|
|
|
for (const [connectionId, _sub] of this.activitySubscribers) {
|
|
try {
|
|
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
|
return conn.peer?.id === connectionId;
|
|
});
|
|
|
|
if (connection) {
|
|
const request = this.typedSocket.createTypedRequest<interfaces.IReq_PushActivityEvent>(
|
|
'pushActivityEvent',
|
|
connection
|
|
);
|
|
await request.fire({ event });
|
|
}
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Failed to push activity to ${connectionId}:`, error);
|
|
}
|
|
}
|
|
}
|
|
|
|
// ===========================================
|
|
// Global Watchers for Activity Stream
|
|
// ===========================================
|
|
|
|
/**
|
|
* Start global watchers when the first activity subscriber connects.
|
|
* These watch all MongoDB and storage activity and feed into the activity buffer.
|
|
*/
|
|
private async startGlobalWatchers(): Promise<void> {
|
|
if (this.globalWatchersActive) return;
|
|
this.globalWatchersActive = true;
|
|
|
|
console.log('[ChangeStream] Starting global watchers for activity stream...');
|
|
|
|
await Promise.all([
|
|
this.startGlobalMongoWatcher(),
|
|
this.startGlobalStorageWatchers(),
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Start a deployment-level MongoDB change stream that watches ALL databases/collections.
|
|
*/
|
|
private async startGlobalMongoWatcher(): Promise<void> {
|
|
try {
|
|
const db = await this.tsview.getMongoDb();
|
|
if (!db) {
|
|
console.log('[ChangeStream] MongoDB not configured, skipping global MongoDB watcher');
|
|
return;
|
|
}
|
|
|
|
const client = (db as any).mongoDbClient as plugins.mongodb.MongoClient;
|
|
|
|
// Deployment-level watch — one stream for everything
|
|
const changeStream = client.watch([], {
|
|
fullDocument: 'updateLookup',
|
|
});
|
|
|
|
changeStream.on('change', (change: any) => {
|
|
const database = change.ns?.db || 'unknown';
|
|
const collection = change.ns?.coll || 'unknown';
|
|
|
|
const event: interfaces.IMongoChangeEvent = {
|
|
type: change.operationType as interfaces.IMongoChangeEvent['type'],
|
|
database,
|
|
collection,
|
|
documentId: change.documentKey?._id?.toString(),
|
|
document: change.fullDocument,
|
|
updateDescription: change.updateDescription,
|
|
timestamp: new Date().toISOString(),
|
|
};
|
|
|
|
this.addToActivityBuffer('mongodb', event);
|
|
});
|
|
|
|
changeStream.on('error', (error: Error) => {
|
|
console.error('[ChangeStream] Global MongoDB watcher error:', error);
|
|
});
|
|
|
|
this.globalMongoWatcher = changeStream;
|
|
console.log('[ChangeStream] Global MongoDB watcher started');
|
|
} catch (error) {
|
|
console.warn('[ChangeStream] MongoDB change streams unavailable (requires replica set). MongoDB activity events will come from API operations only.');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Start storage bucket watchers — one BucketWatcher per bucket.
|
|
*/
|
|
private async startGlobalStorageWatchers(): Promise<void> {
|
|
try {
|
|
const smartbucket = await this.tsview.getSmartBucket();
|
|
if (!smartbucket) {
|
|
console.log('[ChangeStream] Storage not configured, skipping global storage watchers');
|
|
return;
|
|
}
|
|
|
|
// List all buckets
|
|
const command = new plugins.s3.ListBucketsCommand({});
|
|
const response = await smartbucket.s3Client.send(command) as plugins.s3.ListBucketsCommandOutput;
|
|
const bucketNames = response.Buckets?.map(b => b.Name).filter((name): name is string => !!name) || [];
|
|
|
|
for (const bucketName of bucketNames) {
|
|
try {
|
|
const bucketInstance = await smartbucket.getBucketByName(bucketName);
|
|
const watcher = bucketInstance.createWatcher({
|
|
prefix: '',
|
|
pollIntervalMs: 5000,
|
|
bufferTimeMs: 500,
|
|
});
|
|
|
|
watcher.changeSubject.subscribe((eventOrEvents: IStorageChangeEvent | IStorageChangeEvent[]) => {
|
|
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
|
|
for (const event of events) {
|
|
this.addToActivityBuffer('storage', event);
|
|
}
|
|
});
|
|
|
|
await watcher.start();
|
|
await watcher.readyDeferred.promise;
|
|
|
|
this.globalStorageWatchers.set(bucketName, watcher);
|
|
console.log(`[ChangeStream] Global storage watcher started for bucket: ${bucketName}`);
|
|
} catch (bucketError) {
|
|
console.error(`[ChangeStream] Failed to start global storage watcher for bucket ${bucketName}:`, bucketError);
|
|
}
|
|
}
|
|
|
|
console.log(`[ChangeStream] Global storage watchers started (${this.globalStorageWatchers.size}/${bucketNames.length} buckets)`);
|
|
} catch (error) {
|
|
console.error('[ChangeStream] Failed to start global storage watchers:', error);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop all global watchers when no activity subscribers remain.
|
|
*/
|
|
private async stopGlobalWatchers(): Promise<void> {
|
|
if (!this.globalWatchersActive) return;
|
|
|
|
console.log('[ChangeStream] Stopping global watchers...');
|
|
|
|
// Close global MongoDB watcher
|
|
if (this.globalMongoWatcher) {
|
|
try {
|
|
await this.globalMongoWatcher.close();
|
|
console.log('[ChangeStream] Global MongoDB watcher stopped');
|
|
} catch (error) {
|
|
console.error('[ChangeStream] Error closing global MongoDB watcher:', error);
|
|
}
|
|
this.globalMongoWatcher = null;
|
|
}
|
|
|
|
// Close all global storage watchers
|
|
for (const [bucketName, watcher] of this.globalStorageWatchers) {
|
|
try {
|
|
await watcher.stop();
|
|
console.log(`[ChangeStream] Global storage watcher stopped for bucket: ${bucketName}`);
|
|
} catch (error) {
|
|
console.error(`[ChangeStream] Error closing global storage watcher for ${bucketName}:`, error);
|
|
}
|
|
}
|
|
this.globalStorageWatchers.clear();
|
|
|
|
this.globalWatchersActive = false;
|
|
console.log('[ChangeStream] Global watchers stopped');
|
|
}
|
|
|
|
// ===========================================
|
|
// Connection Management
|
|
// ===========================================
|
|
|
|
/**
|
|
* Handle client disconnect - clean up all subscriptions
|
|
*/
|
|
public async handleDisconnect(connectionId: string): Promise<void> {
|
|
console.log(`[ChangeStream] Cleaning up subscriptions for disconnected connection ${connectionId}`);
|
|
|
|
// Clean up MongoDB subscriptions
|
|
for (const [key, entry] of this.mongoWatchers) {
|
|
if (entry.subscriptions.has(connectionId)) {
|
|
entry.subscriptions.delete(connectionId);
|
|
if (entry.subscriptions.size === 0) {
|
|
await this.closeMongoWatcher(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Clean up storage subscriptions
|
|
for (const [key, entry] of this.storageWatchers) {
|
|
if (entry.subscriptions.has(connectionId)) {
|
|
entry.subscriptions.delete(connectionId);
|
|
if (entry.subscriptions.size === 0) {
|
|
await this.closeStorageWatcher(key);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Clean up activity subscription
|
|
this.activitySubscribers.delete(connectionId);
|
|
|
|
// Stop global watchers if no activity subscribers remain
|
|
if (this.activitySubscribers.size === 0) {
|
|
await this.stopGlobalWatchers();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop all watchers and clean up
|
|
*/
|
|
public async stop(): Promise<void> {
|
|
console.log('[ChangeStream] Stopping all watchers...');
|
|
|
|
// Stop global watchers first
|
|
await this.stopGlobalWatchers();
|
|
|
|
// Close all MongoDB watchers
|
|
for (const key of this.mongoWatchers.keys()) {
|
|
await this.closeMongoWatcher(key);
|
|
}
|
|
|
|
// Close all storage watchers
|
|
for (const key of this.storageWatchers.keys()) {
|
|
await this.closeStorageWatcher(key);
|
|
}
|
|
|
|
// Clear activity buffer and subscribers
|
|
this.activityBuffer = [];
|
|
this.activitySubscribers.clear();
|
|
|
|
console.log('[ChangeStream] All watchers stopped');
|
|
}
|
|
}
|