import * as plugins from '../plugins.js';
import type { TsView } from '../tsview.classes.tsview.js';
import type * as interfaces from './interfaces.streaming.js';
import type { IS3ChangeEvent } from '@push.rocks/smartbucket';

/** Non-null TypedSocket handle used for pushing events to clients. */
type TTypedSocket = NonNullable<plugins.typedserver.TypedServer['typedsocket']>;

/**
 * Subscription entry tracking a client's subscription to a resource
 */
interface ISubscriptionEntry {
  connectionId: string;
  subscriptionId: string;
  createdAt: Date;
}

/**
 * MongoDB watcher entry
 */
interface IMongoWatcherEntry {
  watcher: plugins.mongodb.ChangeStream;
  /** connectionId -> subscription */
  subscriptions: Map<string, ISubscriptionEntry>;
}

/**
 * S3 watcher entry
 */
interface IS3WatcherEntry {
  watcher: plugins.smartbucket.BucketWatcher;
  /** connectionId -> subscription */
  subscriptions: Map<string, ISubscriptionEntry>;
}

/**
 * ChangeStreamManager manages real-time change streaming for both MongoDB and S3.
 *
 * Features:
 * - MongoDB Change Streams for real-time database updates
 * - S3 BucketWatcher for polling-based S3 change detection
 * - Subscription management per WebSocket client
 * - Activity stream with ring buffer for recent events
 * - Automatic cleanup on client disconnect
 *
 * Most methods are async and can interleave at every `await`. Shared maps are
 * therefore updated synchronously (before awaiting slow close/stop calls) so
 * that concurrent callers never observe a half-closed watcher.
 */
export class ChangeStreamManager {
  private tsview: TsView;
  private typedSocket: plugins.typedserver.TypedServer['typedsocket'] | null = null;

  // MongoDB watchers: "db/collection" -> watcher entry
  private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();

  // S3 watchers: "bucket/prefix" -> watcher entry
  private s3Watchers: Map<string, IS3WatcherEntry> = new Map();

  // Activity subscribers: connectionId -> subscription entry
  private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();

  // Activity ring buffer (max 1000 events)
  private activityBuffer: interfaces.IActivityEvent[] = [];
  private readonly ACTIVITY_BUFFER_SIZE = 1000;

  // Global watchers for the activity stream (started lazily on first subscriber)
  private globalMongoWatcher: plugins.mongodb.ChangeStream | null = null;
  private globalS3Watchers: Map<string, plugins.smartbucket.BucketWatcher> = new Map();
  private globalWatchersActive: boolean = false;

  // Bumped on every global start and stop. A startup that finishes after a
  // stop sees a different value and discards its watchers instead of
  // registering them.
  private globalWatcherGeneration = 0;

  // Counter for generating unique subscription IDs
  private subscriptionCounter = 0;

  constructor(tsview: TsView) {
    this.tsview = tsview;
  }

  /**
   * Initialize the manager with a TypedSocket instance
   */
  public setTypedSocket(typedSocket: plugins.typedserver.TypedServer['typedsocket']): void {
    this.typedSocket = typedSocket;
  }

  /**
   * Generate a unique subscription ID
   */
  private generateSubscriptionId(): string {
    return `sub_${Date.now()}_${++this.subscriptionCounter}`;
  }

  /**
   * Get the MongoDB key for a database/collection pair
   */
  private getMongoKey(database: string, collection: string): string {
    return `${database}/${collection}`;
  }

  /**
   * Get the S3 key for a bucket/prefix pair.
   * An empty or missing prefix maps to the bare bucket name.
   */
  private getS3Key(bucket: string, prefix?: string): string {
    return prefix ? `${bucket}/${prefix}` : bucket;
  }

  /**
   * Look up the live connection whose `peer.id` equals `connectionId`.
   */
  private findConnection(socket: TTypedSocket, connectionId: string) {
    return socket.findTargetConnection(async (conn: any) => {
      return conn.peer?.id === connectionId;
    });
  }

  // ===========================================
  // MongoDB Change Streams
  // ===========================================

  /**
   * Subscribe a client to MongoDB collection changes.
   *
   * @returns `success: false` with an empty `subscriptionId` when no watcher
   *          could be created; otherwise the new subscription ID.
   */
  public async subscribeToMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getMongoKey(database, collection);
    let entry = this.mongoWatchers.get(key);
    let redundantWatcher: plugins.mongodb.ChangeStream | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createMongoWatcher(database, collection);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Another subscribe for the same key may have registered an entry while
      // we were awaiting. Reuse it rather than overwriting (and leaking) it.
      const concurrentEntry = this.mongoWatchers.get(key);
      if (concurrentEntry) {
        entry = concurrentEntry;
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.mongoWatchers.set(key, entry);
      }
    }

    // Add subscription (before any further await, so the entry cannot be
    // closed for lack of subscribers in between)
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] MongoDB subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      await this.discardMongoWatcher(redundantWatcher, key);
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from MongoDB collection changes.
   *
   * @returns `false` when no watcher exists for the collection, `true` otherwise.
   */
  public async unsubscribeFromMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<boolean> {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] MongoDB subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeMongoWatcher(key);
    }

    return true;
  }

  /**
   * Create a MongoDB change stream for a collection.
   *
   * @returns the change stream, or `null` when MongoDB is not configured or
   *          the stream could not be created.
   */
  private async createMongoWatcher(
    database: string,
    collection: string
  ): Promise<plugins.mongodb.ChangeStream | null> {
    try {
      const db = await this.tsview.getMongoDb();
      if (!db) {
        console.error('[ChangeStream] MongoDB not configured');
        return null;
      }

      const client = (db as any).mongoDbClient;
      const mongoDb = client.db(database);
      const mongoCollection = mongoDb.collection(collection);
      const key = this.getMongoKey(database, collection);

      // Create change stream
      const changeStream = mongoCollection.watch([], {
        fullDocument: 'updateLookup',
      });

      // Handle change events
      changeStream.on('change', (change: any) => {
        // Ignore events from a stream that is not (or no longer) the one
        // registered for this key, e.g. one that is currently being closed.
        if (this.mongoWatchers.get(key)?.watcher !== changeStream) return;
        this.handleMongoChange(database, collection, change);
      });

      changeStream.on('error', (error: Error) => {
        console.error(`[ChangeStream] MongoDB error for ${database}/${collection}:`, error);
      });

      console.log(`[ChangeStream] MongoDB watcher created for ${database}/${collection}`);
      return changeStream;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create MongoDB watcher for ${database}/${collection}:`, error);
      return null;
    }
  }

  /**
   * Convert a raw MongoDB change document to our event interface.
   * The timestamp is the time of conversion, not a field of the change document.
   */
  private toMongoChangeEvent(
    database: string,
    collection: string,
    change: any
  ): interfaces.IMongoChangeEvent {
    return {
      type: change.operationType as interfaces.IMongoChangeEvent['type'],
      database,
      collection,
      documentId: change.documentKey?._id?.toString(),
      document: change.fullDocument,
      updateDescription: change.updateDescription,
      timestamp: new Date().toISOString(),
    };
  }

  /**
   * Handle a MongoDB change event
   */
  private handleMongoChange(database: string, collection: string, change: any): void {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);
    if (!entry) return;

    const event = this.toMongoChangeEvent(database, collection, change);

    // Only add to activity buffer if global watchers are NOT active.
    // When active, the global MongoDB watcher already feeds the activity stream.
    if (!this.globalWatchersActive) {
      this.addToActivityBuffer('mongodb', event);
    }

    // Push to all subscribed clients (fire-and-forget; failures are logged per client)
    void this.pushMongoChangeToClients(key, event);
  }

  /**
   * Push MongoDB change to subscribed clients
   */
  private async pushMongoChangeToClients(
    key: string,
    event: interfaces.IMongoChangeEvent
  ): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    const socket = this.typedSocket;
    if (!entry || !socket) return;

    for (const connectionId of entry.subscriptions.keys()) {
      try {
        // Find the connection and push the event
        const connection = await this.findConnection(socket, connectionId);
        if (!connection) continue;

        // NOTE(review): no explicit request-type argument is passed to
        // createTypedRequest here or in the other push methods — confirm
        // whether one from interfaces.streaming is expected.
        const request = socket.createTypedRequest('pushMongoChange', connection);
        await request.fire({ event });
      } catch (error) {
        console.error(`[ChangeStream] Failed to push MongoDB change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Close a MongoDB change stream.
   *
   * The entry is removed from the map before the stream is closed, so a failed
   * close cannot leave a dead watcher registered, and a subscribe arriving
   * during the close creates a fresh watcher instead of joining this one.
   */
  private async closeMongoWatcher(key: string): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    if (!entry) return;

    this.mongoWatchers.delete(key);

    try {
      await entry.watcher.close();
      console.log(`[ChangeStream] MongoDB watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
    }
  }

  /**
   * Close a change stream that was created but never registered.
   */
  private async discardMongoWatcher(
    watcher: plugins.mongodb.ChangeStream,
    label: string
  ): Promise<void> {
    try {
      await watcher.close();
    } catch (error) {
      console.error(`[ChangeStream] Error discarding unused MongoDB watcher for ${label}:`, error);
    }
  }

  // ===========================================
  // S3 Change Watching
  // ===========================================

  /**
   * Subscribe a client to S3 bucket/prefix changes.
   *
   * @returns `success: false` with an empty `subscriptionId` when no watcher
   *          could be created; otherwise the new subscription ID.
   */
  public async subscribeToS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getS3Key(bucket, prefix);
    let entry = this.s3Watchers.get(key);
    let redundantWatcher: plugins.smartbucket.BucketWatcher | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createS3Watcher(bucket, prefix);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Another subscribe for the same key may have registered an entry while
      // we were awaiting. Reuse it rather than overwriting (and leaking) it.
      const concurrentEntry = this.s3Watchers.get(key);
      if (concurrentEntry) {
        entry = concurrentEntry;
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.s3Watchers.set(key, entry);
      }
    }

    // Add subscription (before any further await)
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] S3 subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      await this.discardS3Watcher(redundantWatcher, key);
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from S3 bucket/prefix changes.
   *
   * @returns `false` when no watcher exists for the bucket/prefix, `true` otherwise.
   */
  public async unsubscribeFromS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<boolean> {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] S3 subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeS3Watcher(key);
    }

    return true;
  }

  /**
   * Create an S3 bucket watcher.
   *
   * @returns the started watcher, or `null` when S3 is not configured or the
   *          watcher could not be created/started.
   */
  private async createS3Watcher(
    bucket: string,
    prefix?: string
  ): Promise<plugins.smartbucket.BucketWatcher | null> {
    try {
      const smartbucket = await this.tsview.getSmartBucket();
      if (!smartbucket) {
        console.error('[ChangeStream] S3 not configured');
        return null;
      }

      const bucketInstance = await smartbucket.getBucketByName(bucket);
      const key = this.getS3Key(bucket, prefix);

      // Create watcher using smartbucket's BucketWatcher
      const watcher = bucketInstance.createWatcher({
        prefix: prefix ?? '',
        pollIntervalMs: 5000,
        bufferTimeMs: 500,
      });

      // Subscribe to change events (delivered singly or as a batch)
      watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
        // Ignore events from a watcher that is not (or no longer) the one
        // registered for this key.
        if (this.s3Watchers.get(key)?.watcher !== watcher) return;

        const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
        for (const event of events) {
          this.handleS3Change(bucket, prefix, event);
        }
      });

      // Start the watcher
      await watcher.start();
      await watcher.readyDeferred.promise;

      console.log(`[ChangeStream] S3 watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
      return watcher;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create S3 watcher for ${bucket}:`, error);
      return null;
    }
  }

  /**
   * Handle an S3 change event
   */
  private handleS3Change(bucket: string, prefix: string | undefined, event: IS3ChangeEvent): void {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);
    if (!entry) return;

    // Only add to activity buffer if global watchers are NOT active.
    // When active, the global S3 watchers already feed the activity stream.
    if (!this.globalWatchersActive) {
      this.addToActivityBuffer('s3', event);
    }

    // Push to all subscribed clients (fire-and-forget; failures are logged per client)
    void this.pushS3ChangeToClients(key, event);
  }

  /**
   * Push S3 change to subscribed clients
   */
  private async pushS3ChangeToClients(
    key: string,
    event: IS3ChangeEvent
  ): Promise<void> {
    const entry = this.s3Watchers.get(key);
    const socket = this.typedSocket;
    if (!entry || !socket) return;

    for (const connectionId of entry.subscriptions.keys()) {
      try {
        const connection = await this.findConnection(socket, connectionId);
        if (!connection) continue;

        const request = socket.createTypedRequest('pushS3Change', connection);
        await request.fire({ event });
      } catch (error) {
        console.error(`[ChangeStream] Failed to push S3 change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Close an S3 bucket watcher.
   *
   * The entry is removed from the map before the watcher is stopped, so a
   * failed stop cannot leave a dead watcher registered.
   */
  private async closeS3Watcher(key: string): Promise<void> {
    const entry = this.s3Watchers.get(key);
    if (!entry) return;

    this.s3Watchers.delete(key);

    try {
      await entry.watcher.stop();
      console.log(`[ChangeStream] S3 watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
    }
  }

  /**
   * Stop a bucket watcher that was created but never registered.
   */
  private async discardS3Watcher(
    watcher: plugins.smartbucket.BucketWatcher,
    label: string
  ): Promise<void> {
    try {
      await watcher.stop();
    } catch (error) {
      console.error(`[ChangeStream] Error discarding unused S3 watcher for ${label}:`, error);
    }
  }

  // ===========================================
  // Activity Stream
  // ===========================================

  /**
   * Subscribe a client to the activity stream.
   * The first subscriber triggers the start of the global watchers.
   */
  public async subscribeToActivity(connectionId: string): Promise<{ success: boolean; subscriptionId: string }> {
    const subscriptionId = this.generateSubscriptionId();

    this.activitySubscribers.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);

    // Start global watchers when the first activity subscriber connects
    if (this.activitySubscribers.size === 1) {
      await this.startGlobalWatchers();
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from the activity stream.
   *
   * @returns `true` when the connection had an activity subscription.
   */
  public async unsubscribeFromActivity(connectionId: string): Promise<boolean> {
    const result = this.activitySubscribers.delete(connectionId);
    if (result) {
      console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);

      // Stop global watchers when no activity subscribers remain
      if (this.activitySubscribers.size === 0) {
        await this.stopGlobalWatchers();
      }
    }
    return result;
  }

  /**
   * Get recent activity events, oldest first.
   *
   * @param limit maximum number of events to return; values below 1 (or NaN)
   *              yield an empty array.
   */
  public getRecentActivity(limit: number = 100): interfaces.IActivityEvent[] {
    const count = Math.min(Math.floor(limit), this.activityBuffer.length);

    // slice(-0) is slice(0) and would return the WHOLE buffer, so a zero,
    // negative or NaN count has to be handled explicitly.
    if (!(count > 0)) {
      return [];
    }
    return this.activityBuffer.slice(-count);
  }

  /**
   * Emit a MongoDB activity event from an API handler (no change stream required).
   */
  public emitMongoActivityEvent(event: interfaces.IMongoChangeEvent): void {
    this.addToActivityBuffer('mongodb', event);
  }

  /**
   * Add an event to the activity buffer and push it to activity subscribers.
   */
  private addToActivityBuffer(
    source: 'mongodb' | 's3',
    event: interfaces.IMongoChangeEvent | IS3ChangeEvent
  ): void {
    const activityEvent: interfaces.IActivityEvent = {
      // slice(2, 11) takes the same 9 characters the deprecated substr(2, 9) did
      id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      source,
      event,
      timestamp: new Date().toISOString(),
    };

    this.activityBuffer.push(activityEvent);

    // Trim buffer if it exceeds max size (drops the oldest events)
    const overflow = this.activityBuffer.length - this.ACTIVITY_BUFFER_SIZE;
    if (overflow > 0) {
      this.activityBuffer.splice(0, overflow);
    }

    // Push to activity subscribers (fire-and-forget; failures are logged per client)
    void this.pushActivityToClients(activityEvent);
  }

  /**
   * Push activity event to subscribed clients
   */
  private async pushActivityToClients(event: interfaces.IActivityEvent): Promise<void> {
    const socket = this.typedSocket;
    if (!socket || this.activitySubscribers.size === 0) return;

    for (const connectionId of this.activitySubscribers.keys()) {
      try {
        const connection = await this.findConnection(socket, connectionId);
        if (!connection) continue;

        const request = socket.createTypedRequest('pushActivityEvent', connection);
        await request.fire({ event });
      } catch (error) {
        console.error(`[ChangeStream] Failed to push activity to ${connectionId}:`, error);
      }
    }
  }

  // ===========================================
  // Global Watchers for Activity Stream
  // ===========================================

  /**
   * Start global watchers when the first activity subscriber connects.
   * These watch all MongoDB and S3 activity and feed into the activity buffer.
   */
  private async startGlobalWatchers(): Promise<void> {
    if (this.globalWatchersActive) return;
    this.globalWatchersActive = true;

    // Token identifying this start; stopGlobalWatchers() invalidates it.
    const generation = ++this.globalWatcherGeneration;

    console.log('[ChangeStream] Starting global watchers for activity stream...');

    await Promise.all([
      this.startGlobalMongoWatcher(generation),
      this.startGlobalS3Watchers(generation),
    ]);
  }

  /**
   * Start a deployment-level MongoDB change stream that watches ALL databases/collections.
   *
   * @param generation token of the start this call belongs to; if it no longer
   *                   matches after awaiting, nothing is registered.
   */
  private async startGlobalMongoWatcher(generation: number): Promise<void> {
    try {
      const db = await this.tsview.getMongoDb();
      if (!db) {
        console.log('[ChangeStream] MongoDB not configured, skipping global MongoDB watcher');
        return;
      }

      // Global watchers were stopped while we were waiting: do not open a stream.
      if (generation !== this.globalWatcherGeneration) return;

      const client = (db as any).mongoDbClient as plugins.mongodb.MongoClient;

      // Deployment-level watch — one stream for everything
      const changeStream = client.watch([], {
        fullDocument: 'updateLookup',
      });

      changeStream.on('change', (change: any) => {
        // Drop events from a stream belonging to a start that has been stopped.
        if (generation !== this.globalWatcherGeneration) return;

        const database = change.ns?.db || 'unknown';
        const collection = change.ns?.coll || 'unknown';
        this.addToActivityBuffer('mongodb', this.toMongoChangeEvent(database, collection, change));
      });

      changeStream.on('error', (error: Error) => {
        console.error('[ChangeStream] Global MongoDB watcher error:', error);
      });

      this.globalMongoWatcher = changeStream;
      console.log('[ChangeStream] Global MongoDB watcher started');
    } catch (error) {
      console.warn(
        '[ChangeStream] MongoDB change streams unavailable (requires replica set). MongoDB activity events will come from API operations only.',
        error
      );
    }
  }

  /**
   * Start S3 bucket watchers — one BucketWatcher per bucket.
   * A failure for one bucket is logged and does not prevent the others from starting.
   *
   * @param generation token of the start this call belongs to; if it no longer
   *                   matches after awaiting, remaining buckets are skipped and
   *                   a just-started watcher is stopped again.
   */
  private async startGlobalS3Watchers(generation: number): Promise<void> {
    try {
      const smartbucket = await this.tsview.getSmartBucket();
      if (!smartbucket) {
        console.log('[ChangeStream] S3 not configured, skipping global S3 watchers');
        return;
      }

      // List all buckets
      const command = new plugins.s3.ListBucketsCommand({});
      const response = (await smartbucket.s3Client.send(command)) as plugins.s3.ListBucketsCommandOutput;
      const bucketNames =
        response.Buckets?.map((b) => b.Name).filter((name): name is string => !!name) || [];

      for (const bucketName of bucketNames) {
        // Global watchers were stopped in the meantime: abandon startup.
        if (generation !== this.globalWatcherGeneration) return;

        try {
          const bucketInstance = await smartbucket.getBucketByName(bucketName);
          const watcher = bucketInstance.createWatcher({
            prefix: '',
            pollIntervalMs: 5000,
            bufferTimeMs: 500,
          });

          watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
            // Drop events from a watcher belonging to a start that has been stopped.
            if (generation !== this.globalWatcherGeneration) return;

            const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
            for (const event of events) {
              this.addToActivityBuffer('s3', event);
            }
          });

          await watcher.start();
          await watcher.readyDeferred.promise;

          if (generation !== this.globalWatcherGeneration) {
            // Stopped while this watcher was starting: it must not be registered.
            await this.discardS3Watcher(watcher, bucketName);
            return;
          }

          this.globalS3Watchers.set(bucketName, watcher);
          console.log(`[ChangeStream] Global S3 watcher started for bucket: ${bucketName}`);
        } catch (bucketError) {
          console.error(`[ChangeStream] Failed to start global S3 watcher for bucket ${bucketName}:`, bucketError);
        }
      }

      console.log(`[ChangeStream] Global S3 watchers started (${this.globalS3Watchers.size}/${bucketNames.length} buckets)`);
    } catch (error) {
      console.error('[ChangeStream] Failed to start global S3 watchers:', error);
    }
  }

  /**
   * Stop all global watchers when no activity subscribers remain.
   *
   * All shared state is detached synchronously before anything is awaited, so
   * a start that overlaps with this stop works on fresh state and a startup
   * still in flight notices the changed generation.
   */
  private async stopGlobalWatchers(): Promise<void> {
    if (!this.globalWatchersActive) return;

    console.log('[ChangeStream] Stopping global watchers...');

    const mongoWatcher = this.globalMongoWatcher;
    const s3Watchers = [...this.globalS3Watchers];
    this.globalMongoWatcher = null;
    this.globalS3Watchers.clear();
    this.globalWatchersActive = false;
    this.globalWatcherGeneration++;

    // Close global MongoDB watcher
    if (mongoWatcher) {
      try {
        await mongoWatcher.close();
        console.log('[ChangeStream] Global MongoDB watcher stopped');
      } catch (error) {
        console.error('[ChangeStream] Error closing global MongoDB watcher:', error);
      }
    }

    // Close all global S3 watchers
    for (const [bucketName, watcher] of s3Watchers) {
      try {
        await watcher.stop();
        console.log(`[ChangeStream] Global S3 watcher stopped for bucket: ${bucketName}`);
      } catch (error) {
        console.error(`[ChangeStream] Error closing global S3 watcher for ${bucketName}:`, error);
      }
    }

    console.log('[ChangeStream] Global watchers stopped');
  }

  // ===========================================
  // Connection Management
  // ===========================================

  /**
   * Handle client disconnect - clean up all subscriptions
   */
  public async handleDisconnect(connectionId: string): Promise<void> {
    console.log(`[ChangeStream] Cleaning up subscriptions for disconnected connection ${connectionId}`);

    // Clean up MongoDB subscriptions
    for (const [key, entry] of this.mongoWatchers) {
      if (entry.subscriptions.delete(connectionId) && entry.subscriptions.size === 0) {
        await this.closeMongoWatcher(key);
      }
    }

    // Clean up S3 subscriptions
    for (const [key, entry] of this.s3Watchers) {
      if (entry.subscriptions.delete(connectionId) && entry.subscriptions.size === 0) {
        await this.closeS3Watcher(key);
      }
    }

    // Clean up activity subscription
    this.activitySubscribers.delete(connectionId);

    // Stop global watchers if no activity subscribers remain
    if (this.activitySubscribers.size === 0) {
      await this.stopGlobalWatchers();
    }
  }

  /**
   * Stop all watchers and clean up
   */
  public async stop(): Promise<void> {
    console.log('[ChangeStream] Stopping all watchers...');

    // Stop global watchers first
    await this.stopGlobalWatchers();

    // Close all MongoDB watchers (keys are copied because closing mutates the map)
    for (const key of [...this.mongoWatchers.keys()]) {
      await this.closeMongoWatcher(key);
    }

    // Close all S3 watchers
    for (const key of [...this.s3Watchers.keys()]) {
      await this.closeS3Watcher(key);
    }

    // Clear activity buffer and subscribers
    this.activityBuffer = [];
    this.activitySubscribers.clear();

    console.log('[ChangeStream] All watchers stopped');
  }
}