import * as plugins from '../plugins.js';
import type { TsView } from '../tsview.classes.tsview.js';
import type * as interfaces from './interfaces.streaming.js';
import type { IS3ChangeEvent } from '@push.rocks/smartbucket';

/**
 * Subscription entry tracking a client's subscription to a resource
 */
interface ISubscriptionEntry {
  connectionId: string;
  subscriptionId: string;
  createdAt: Date;
}

/**
 * MongoDB watcher entry: one change stream shared by all subscribers of a
 * single "db/collection" key.
 */
interface IMongoWatcherEntry {
  watcher: plugins.mongodb.ChangeStream;
  subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
}

/**
 * S3 watcher entry: one bucket watcher shared by all subscribers of a single
 * "bucket/prefix" key.
 */
interface IS3WatcherEntry {
  watcher: plugins.smartbucket.BucketWatcher;
  subscriptions: Map<string, ISubscriptionEntry>; // connectionId -> subscription
}

/**
 * ChangeStreamManager manages real-time change streaming for both MongoDB and S3.
 *
 * Features:
 * - MongoDB Change Streams for real-time database updates
 * - S3 BucketWatcher for polling-based S3 change detection
 * - Subscription management per WebSocket client
 * - Activity stream with ring buffer for recent events
 * - Automatic cleanup on client disconnect
 *
 * One watcher is created per resource key and shared between all connections
 * subscribed to that key; it is closed when its last subscriber goes away.
 */
export class ChangeStreamManager {
  private tsview: TsView;
  private typedSocket: plugins.typedserver.TypedServer['typedsocket'] | null = null;

  // MongoDB watchers: "db/collection" -> watcher entry
  private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();

  // S3 watchers: "bucket/prefix" -> watcher entry
  private s3Watchers: Map<string, IS3WatcherEntry> = new Map();

  // Activity subscribers: connectionId -> subscription entry
  private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();

  // Activity ring buffer (max 1000 events)
  private activityBuffer: interfaces.IActivityEvent[] = [];
  private readonly ACTIVITY_BUFFER_SIZE = 1000;

  // Counter for generating unique subscription IDs
  private subscriptionCounter = 0;

  constructor(tsview: TsView) {
    this.tsview = tsview;
  }

  /**
   * Initialize the manager with a TypedSocket instance.
   * Until this is called, change events are still buffered as activity but
   * nothing is pushed to clients.
   */
  public setTypedSocket(typedSocket: plugins.typedserver.TypedServer['typedsocket']): void {
    this.typedSocket = typedSocket;
  }

  /**
   * Generate a unique subscription ID from the current time and a
   * per-instance counter.
   */
  private generateSubscriptionId(): string {
    return `sub_${Date.now()}_${++this.subscriptionCounter}`;
  }

  /**
   * Get the MongoDB key for a database/collection pair
   */
  private getMongoKey(database: string, collection: string): string {
    return `${database}/${collection}`;
  }

  /**
   * Get the S3 key for a bucket/prefix pair (just the bucket when no prefix is given)
   */
  private getS3Key(bucket: string, prefix?: string): string {
    return prefix ? `${bucket}/${prefix}` : bucket;
  }

  // ===========================================
  // MongoDB Change Streams
  // ===========================================

  /**
   * Subscribe a client to MongoDB collection changes.
   *
   * @param connectionId - identifier of the subscribing connection
   * @param database - database name
   * @param collection - collection name
   * @returns `success: false` with an empty id when the watcher could not be
   *   created, otherwise `success: true` and the new subscription id
   */
  public async subscribeToMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getMongoKey(database, collection);
    let entry = this.mongoWatchers.get(key);
    let redundantWatcher: plugins.mongodb.ChangeStream | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createMongoWatcher(database, collection);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Another subscribe call for the same key may have registered a watcher
      // while we were awaiting. Reuse it rather than overwriting it: an
      // overwritten watcher would stay open and deliver every change twice.
      entry = this.mongoWatchers.get(key);
      if (entry) {
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.mongoWatchers.set(key, entry);
      }
    }

    // Add subscription (before any further await, so the entry cannot be
    // closed as "empty" in between)
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] MongoDB subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      try {
        await redundantWatcher.close();
      } catch (error) {
        console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
      }
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from MongoDB collection changes.
   *
   * @returns `false` when no watcher exists for the pair, `true` otherwise
   */
  public async unsubscribeFromMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<boolean> {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] MongoDB subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeMongoWatcher(key);
    }

    return true;
  }

  /**
   * Create a MongoDB change stream for a collection.
   *
   * @returns the change stream, or `null` when MongoDB is not configured or
   *   the stream could not be created (the failure is logged)
   */
  private async createMongoWatcher(
    database: string,
    collection: string
  ): Promise<plugins.mongodb.ChangeStream | null> {
    try {
      const db = await this.tsview.getMongoDb();
      if (!db) {
        console.error('[ChangeStream] MongoDB not configured');
        return null;
      }

      // NOTE(review): reaches into an untyped `mongoDbClient` property of the
      // object returned by getMongoDb() — confirm this is a stable accessor.
      const client = (db as any).mongoDbClient;
      const mongoDb = client.db(database);
      const mongoCollection = mongoDb.collection(collection);

      // Create change stream
      const changeStream = mongoCollection.watch([], {
        fullDocument: 'updateLookup',
      });

      // Handle change events
      changeStream.on('change', (change: any) => {
        this.handleMongoChange(database, collection, change);
      });

      changeStream.on('error', (error: Error) => {
        console.error(`[ChangeStream] MongoDB error for ${database}/${collection}:`, error);
      });

      console.log(`[ChangeStream] MongoDB watcher created for ${database}/${collection}`);
      return changeStream;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create MongoDB watcher for ${database}/${collection}:`, error);
      return null;
    }
  }

  /**
   * Handle a MongoDB change event: convert it, record it in the activity
   * buffer and push it to the subscribers of the collection.
   * Events for a key that has no registered watcher are ignored.
   */
  private handleMongoChange(database: string, collection: string, change: any): void {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);
    if (!entry) return;

    // Convert MongoDB change event to our interface
    const event: interfaces.IMongoChangeEvent = {
      type: change.operationType as interfaces.IMongoChangeEvent['type'],
      database,
      collection,
      documentId: change.documentKey?._id?.toString(),
      document: change.fullDocument,
      updateDescription: change.updateDescription,
      timestamp: new Date().toISOString(),
    };

    // Add to activity buffer
    this.addToActivityBuffer('mongodb', event);

    // Push to all subscribed clients (fire-and-forget; failures are logged per client)
    void this.pushMongoChangeToClients(key, event);
  }

  /**
   * Push MongoDB change to subscribed clients, one connection after another.
   * A failure for one connection is logged and does not stop the others.
   */
  private async pushMongoChangeToClients(
    key: string,
    event: interfaces.IMongoChangeEvent
  ): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    if (!entry || !this.typedSocket) return;

    for (const [connectionId, _sub] of entry.subscriptions) {
      try {
        // Find the connection and push the event
        const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
          return conn.alias === connectionId || conn.socketId === connectionId;
        });

        if (connection) {
          // NOTE(review): a type argument naming the request interface may
          // belong on createTypedRequest — confirm against interfaces.streaming.
          const request = this.typedSocket.createTypedRequest(
            'pushMongoChange',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push MongoDB change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Close a MongoDB change stream and forget its entry.
   * The entry is removed even when closing fails, so a broken stream is never
   * handed to later subscribers.
   */
  private async closeMongoWatcher(key: string): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    if (!entry) return;

    // Unregister before awaiting: a subscribe arriving during the close then
    // creates a fresh watcher instead of attaching to the one being torn down.
    this.mongoWatchers.delete(key);

    try {
      await entry.watcher.close();
      console.log(`[ChangeStream] MongoDB watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
    }
  }

  // ===========================================
  // S3 Change Watching
  // ===========================================

  /**
   * Subscribe a client to S3 bucket/prefix changes.
   *
   * @param connectionId - identifier of the subscribing connection
   * @param bucket - bucket name
   * @param prefix - optional key prefix to watch
   * @returns `success: false` with an empty id when the watcher could not be
   *   created, otherwise `success: true` and the new subscription id
   */
  public async subscribeToS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getS3Key(bucket, prefix);
    let entry = this.s3Watchers.get(key);
    let redundantWatcher: plugins.smartbucket.BucketWatcher | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createS3Watcher(bucket, prefix);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Same guard as for MongoDB: never overwrite a watcher that a
      // concurrent subscribe registered while we were awaiting.
      entry = this.s3Watchers.get(key);
      if (entry) {
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.s3Watchers.set(key, entry);
      }
    }

    // Add subscription
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] S3 subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      try {
        await redundantWatcher.stop();
      } catch (error) {
        console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
      }
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from S3 bucket/prefix changes.
   *
   * @returns `false` when no watcher exists for the pair, `true` otherwise
   */
  public async unsubscribeFromS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<boolean> {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] S3 subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeS3Watcher(key);
    }

    return true;
  }

  /**
   * Create an S3 bucket watcher, wire its change events to this manager and
   * start it.
   *
   * @returns the started watcher, or `null` when S3 is not configured or the
   *   watcher could not be created/started (the failure is logged)
   */
  private async createS3Watcher(
    bucket: string,
    prefix?: string
  ): Promise<plugins.smartbucket.BucketWatcher | null> {
    try {
      const smartbucket = await this.tsview.getSmartBucket();
      if (!smartbucket) {
        console.error('[ChangeStream] S3 not configured');
        return null;
      }

      const bucketInstance = await smartbucket.getBucketByName(bucket);

      // Create watcher using smartbucket's BucketWatcher
      const watcher = bucketInstance.createWatcher({
        prefix: prefix || '',
        pollIntervalMs: 5000,
        bufferTimeMs: 500,
      });

      // Subscribe to change events; the subject may emit a single event or a batch
      watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
        const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
        for (const event of events) {
          this.handleS3Change(bucket, prefix, event);
        }
      });

      // Start the watcher and wait until it reports ready
      await watcher.start();
      await watcher.readyDeferred.promise;

      console.log(`[ChangeStream] S3 watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
      return watcher;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create S3 watcher for ${bucket}:`, error);
      return null;
    }
  }

  /**
   * Handle an S3 change event: record it in the activity buffer and push it
   * to the subscribers of the bucket/prefix.
   * Events for a key that has no registered watcher are ignored.
   */
  private handleS3Change(bucket: string, prefix: string | undefined, event: IS3ChangeEvent): void {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);
    if (!entry) return;

    // Add to activity buffer
    this.addToActivityBuffer('s3', event);

    // Push to all subscribed clients (fire-and-forget; failures are logged per client)
    void this.pushS3ChangeToClients(key, event);
  }

  /**
   * Push S3 change to subscribed clients, one connection after another.
   * A failure for one connection is logged and does not stop the others.
   */
  private async pushS3ChangeToClients(
    key: string,
    event: IS3ChangeEvent
  ): Promise<void> {
    const entry = this.s3Watchers.get(key);
    if (!entry || !this.typedSocket) return;

    for (const [connectionId, _sub] of entry.subscriptions) {
      try {
        const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
          return conn.alias === connectionId || conn.socketId === connectionId;
        });

        if (connection) {
          // NOTE(review): a type argument naming the request interface may
          // belong on createTypedRequest — confirm against interfaces.streaming.
          const request = this.typedSocket.createTypedRequest(
            'pushS3Change',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push S3 change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Close an S3 bucket watcher and forget its entry.
   * The entry is removed even when stopping fails, so a broken watcher is
   * never handed to later subscribers.
   */
  private async closeS3Watcher(key: string): Promise<void> {
    const entry = this.s3Watchers.get(key);
    if (!entry) return;

    // Unregister before awaiting (see closeMongoWatcher for the reasoning)
    this.s3Watchers.delete(key);

    try {
      await entry.watcher.stop();
      console.log(`[ChangeStream] S3 watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
    }
  }

  // ===========================================
  // Activity Stream
  // ===========================================

  /**
   * Subscribe a client to the activity stream.
   * A repeated call for the same connection replaces its previous subscription.
   */
  public async subscribeToActivity(connectionId: string): Promise<{ success: boolean; subscriptionId: string }> {
    const subscriptionId = this.generateSubscriptionId();

    this.activitySubscribers.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);
    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from the activity stream.
   *
   * @returns `true` when the connection had a subscription that was removed
   */
  public async unsubscribeFromActivity(connectionId: string): Promise<boolean> {
    const result = this.activitySubscribers.delete(connectionId);
    if (result) {
      console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);
    }
    return result;
  }

  /**
   * Get recent activity events, oldest first.
   *
   * @param limit - maximum number of events to return (default 100)
   * @returns a copy of the newest `limit` buffered events; an empty array
   *   when `limit` is zero, negative or not a number
   */
  public getRecentActivity(limit: number = 100): interfaces.IActivityEvent[] {
    const count = Math.min(Math.trunc(limit), this.activityBuffer.length);

    // slice(-0) is slice(0) and would return the whole buffer, and a negative
    // count would select an arbitrary tail — so anything not > 0 yields nothing.
    // Written as a negated comparison so that NaN is rejected too.
    if (!(count > 0)) {
      return [];
    }

    return this.activityBuffer.slice(-count);
  }

  /**
   * Add an event to the activity buffer (dropping the oldest events beyond
   * the buffer size) and push it to activity subscribers.
   */
  private addToActivityBuffer(
    source: 'mongodb' | 's3',
    event: interfaces.IMongoChangeEvent | IS3ChangeEvent
  ): void {
    const activityEvent: interfaces.IActivityEvent = {
      // Timestamp plus up to 9 random base-36 characters
      id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      source,
      event,
      timestamp: new Date().toISOString(),
    };

    this.activityBuffer.push(activityEvent);

    // Trim buffer if it exceeds max size
    if (this.activityBuffer.length > this.ACTIVITY_BUFFER_SIZE) {
      this.activityBuffer = this.activityBuffer.slice(-this.ACTIVITY_BUFFER_SIZE);
    }

    // Push to activity subscribers (fire-and-forget; failures are logged per client)
    void this.pushActivityToClients(activityEvent);
  }

  /**
   * Push activity event to subscribed clients, one connection after another.
   * A failure for one connection is logged and does not stop the others.
   */
  private async pushActivityToClients(event: interfaces.IActivityEvent): Promise<void> {
    if (!this.typedSocket || this.activitySubscribers.size === 0) return;

    for (const [connectionId, _sub] of this.activitySubscribers) {
      try {
        const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
          return conn.alias === connectionId || conn.socketId === connectionId;
        });

        if (connection) {
          // NOTE(review): a type argument naming the request interface may
          // belong on createTypedRequest — confirm against interfaces.streaming.
          const request = this.typedSocket.createTypedRequest(
            'pushActivityEvent',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push activity to ${connectionId}:`, error);
      }
    }
  }

  // ===========================================
  // Connection Management
  // ===========================================

  /**
   * Handle client disconnect - clean up all subscriptions of the connection
   * and close any watcher that is left without subscribers.
   */
  public async handleDisconnect(connectionId: string): Promise<void> {
    console.log(`[ChangeStream] Cleaning up subscriptions for disconnected connection ${connectionId}`);

    // Clean up MongoDB subscriptions
    for (const [key, entry] of this.mongoWatchers) {
      if (entry.subscriptions.has(connectionId)) {
        entry.subscriptions.delete(connectionId);
        if (entry.subscriptions.size === 0) {
          await this.closeMongoWatcher(key);
        }
      }
    }

    // Clean up S3 subscriptions
    for (const [key, entry] of this.s3Watchers) {
      if (entry.subscriptions.has(connectionId)) {
        entry.subscriptions.delete(connectionId);
        if (entry.subscriptions.size === 0) {
          await this.closeS3Watcher(key);
        }
      }
    }

    // Clean up activity subscription
    this.activitySubscribers.delete(connectionId);
  }

  /**
   * Stop all watchers and clean up the activity buffer and subscribers.
   */
  public async stop(): Promise<void> {
    console.log('[ChangeStream] Stopping all watchers...');

    // Close all MongoDB watchers (iterate a snapshot: closing mutates the map)
    for (const key of [...this.mongoWatchers.keys()]) {
      await this.closeMongoWatcher(key);
    }

    // Close all S3 watchers
    for (const key of [...this.s3Watchers.keys()]) {
      await this.closeS3Watcher(key);
    }

    // Clear activity buffer and subscribers
    this.activityBuffer = [];
    this.activitySubscribers.clear();

    console.log('[ChangeStream] All watchers stopped');
  }
}