import * as plugins from '../plugins.js';

/**
 * MongoDB change event as pushed by the server.
 */
export interface IMongoChangeEvent {
  type: 'insert' | 'update' | 'delete' | 'replace' | 'drop' | 'invalidate';
  database: string;
  collection: string;
  documentId?: string;
  // Document shape is collection-specific, so values are intentionally left loose.
  document?: Record<string, any>;
  updateDescription?: {
    updatedFields?: Record<string, any>;
    removedFields?: string[];
  };
  timestamp: string;
}

/**
 * S3 change event as pushed by the server.
 */
export interface IS3ChangeEvent {
  type: 'add' | 'modify' | 'delete';
  key: string;
  size?: number;
  etag?: string;
  lastModified?: Date;
  bucket: string;
}

/**
 * Combined activity event wrapping either a MongoDB or an S3 change.
 */
export interface IActivityEvent {
  id: string;
  source: 'mongodb' | 's3';
  event: IMongoChangeEvent | IS3ChangeEvent;
  timestamp: string;
}

/**
 * Subscription info tracked by the service.
 */
interface ISubscription {
  type: 'mongo' | 's3' | 'activity';
  key: string; // "db/collection" or "bucket/prefix" or "activity"
  subscriptionId: string;
}

/**
 * Splits a subscription key at its FIRST '/' only.
 *
 * Everything after the first slash is returned untouched, so a collection
 * name or S3 prefix that itself contains '/' survives the round trip through
 * the key. Returns `[key, '']` when the key contains no slash.
 */
function splitAtFirstSlash(key: string): [string, string] {
  const slashIndex = key.indexOf('/');
  if (slashIndex === -1) {
    return [key, ''];
  }
  return [key.slice(0, slashIndex), key.slice(slashIndex + 1)];
}

/**
 * ChangeStreamService manages real-time change subscriptions from the browser.
 *
 * Features:
 * - WebSocket connection via TypedSocket
 * - Subscription restoration when the socket reports 'connected' again
 * - RxJS Subjects for reactive UI updates
 * - Subscription lifecycle management
 *
 * NOTE(review): generic type arguments in this file were reconstructed from
 * usage. The typed-request contracts are not declared here, so requests and
 * push handlers use `any` — replace with shared interfaces if they exist.
 */
export class ChangeStreamService {
  private typedSocket: plugins.typedsocket.TypedSocket | null = null;
  private isConnected = false;
  private isConnecting = false;

  /** Active subscriptions keyed by "<type>:<key>", e.g. "mongo:db/collection". */
  private subscriptions: Map<string, ISubscription> = new Map();

  /**
   * Handle of the listener attached to the socket's statusSubject. Kept so it
   * can be detached on disconnect and never stacks across connect() calls.
   */
  private statusSubscription: { unsubscribe(): void } | null = null;

  // RxJS Subjects for UI components
  public readonly mongoChanges$ = new plugins.smartrx.rxjs.Subject<IMongoChangeEvent>();
  public readonly s3Changes$ = new plugins.smartrx.rxjs.Subject<IS3ChangeEvent>();
  public readonly activityEvents$ = new plugins.smartrx.rxjs.Subject<IActivityEvent>();
  public readonly connectionStatus$ = new plugins.smartrx.rxjs.ReplaySubject<
    'connected' | 'disconnected' | 'connecting'
  >(1);

  constructor() {
    // Emit initial disconnected status
    this.connectionStatus$.next('disconnected');
  }

  /**
   * Connect to the WebSocket server at the current window origin.
   *
   * No-op when already connected or while a connection attempt is in flight.
   *
   * @throws Rethrows whatever the socket client creation throws, after
   *         resetting the connecting flag and emitting 'disconnected'.
   */
  public async connect(): Promise<void> {
    if (this.isConnected || this.isConnecting) {
      return;
    }

    this.isConnecting = true;
    this.connectionStatus$.next('connecting');

    try {
      // Create client router to handle server-initiated pushes
      const clientRouter = new plugins.typedrequest.TypedRouter();

      // Register handlers for server push events
      this.registerPushHandlers(clientRouter);

      // Connect to WebSocket server using current origin
      const socket = await plugins.typedsocket.TypedSocket.createClient(
        clientRouter,
        plugins.typedsocket.TypedSocket.useWindowLocationOriginUrl()
      );
      this.typedSocket = socket;

      this.isConnected = true;
      this.isConnecting = false;
      this.connectionStatus$.next('connected');
      console.log('[ChangeStream] WebSocket connected');

      // Drop any listener left over from a previous socket before attaching a new one.
      this.releaseStatusSubscription();

      // Handle reconnection events via statusSubject
      this.statusSubscription = socket.statusSubject.subscribe((status) => {
        if (status === 'disconnected') {
          this.handleDisconnect();
        } else if (status === 'connected') {
          // handleReconnect is async; catch here so a failure cannot surface
          // as an unhandled promise rejection.
          void this.handleReconnect().catch((error) => {
            console.error('[ChangeStream] Failed to restore subscriptions:', error);
          });
        }
      });
    } catch (error) {
      this.isConnecting = false;
      this.connectionStatus$.next('disconnected');
      console.error('[ChangeStream] Failed to connect:', error);
      throw error;
    }
  }

  /**
   * Disconnect from the WebSocket server and forget all subscriptions.
   *
   * Errors raised while stopping the socket are logged, not rethrown.
   */
  public async disconnect(): Promise<void> {
    const socket = this.typedSocket;
    if (!socket) {
      return;
    }

    // Detach first so the socket being stopped can no longer mutate service state.
    this.releaseStatusSubscription();

    try {
      await socket.stop();
    } catch (error) {
      console.error('[ChangeStream] Error during disconnect:', error);
    }

    this.typedSocket = null;
    this.isConnected = false;
    this.subscriptions.clear();
    this.connectionStatus$.next('disconnected');
    console.log('[ChangeStream] WebSocket disconnected');
  }

  /**
   * Detach the status listener from the current socket, if one is attached.
   */
  private releaseStatusSubscription(): void {
    if (this.statusSubscription) {
      this.statusSubscription.unsubscribe();
      this.statusSubscription = null;
    }
  }

  /**
   * Register handlers for server push events. Each handler forwards the
   * pushed event into the matching Subject and acknowledges receipt.
   */
  private registerPushHandlers(router: plugins.typedrequest.TypedRouter): void {
    // Handle MongoDB change push
    router.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>(
        'pushMongoChange',
        async (data: { event: IMongoChangeEvent }) => {
          this.mongoChanges$.next(data.event);
          return { received: true };
        }
      )
    );

    // Handle S3 change push
    router.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>(
        'pushS3Change',
        async (data: { event: IS3ChangeEvent }) => {
          this.s3Changes$.next(data.event);
          return { received: true };
        }
      )
    );

    // Handle activity event push
    router.addTypedHandler(
      new plugins.typedrequest.TypedHandler<any>(
        'pushActivityEvent',
        async (data: { event: IActivityEvent }) => {
          this.activityEvents$.next(data.event);
          return { received: true };
        }
      )
    );
  }

  /**
   * Handle WebSocket disconnection. Tracked subscriptions are kept so they
   * can be restored when the socket reports 'connected' again.
   */
  private handleDisconnect(): void {
    this.isConnected = false;
    this.connectionStatus$.next('disconnected');
    console.log('[ChangeStream] WebSocket disconnected, waiting for reconnect...');
  }

  /**
   * Handle WebSocket reconnection - restore all subscriptions.
   *
   * The tracking map is cleared first because the subscribe methods return
   * early for keys that are still tracked. A subscription that cannot be
   * restored is therefore dropped; it is logged so the loss is visible.
   */
  private async handleReconnect(): Promise<void> {
    this.isConnected = true;
    this.connectionStatus$.next('connected');
    console.log('[ChangeStream] WebSocket reconnected, restoring subscriptions...');

    // Restore all subscriptions
    const subscriptionsToRestore = Array.from(this.subscriptions.values());
    this.subscriptions.clear();

    for (const sub of subscriptionsToRestore) {
      try {
        let restored = false;
        if (sub.type === 'mongo') {
          // Split at the first '/' only: the collection part may contain '/'.
          const [database, collection] = splitAtFirstSlash(sub.key);
          restored = await this.subscribeToCollection(database, collection);
        } else if (sub.type === 's3') {
          const [bucket, rest] = splitAtFirstSlash(sub.key);
          const prefix = rest || undefined;
          restored = await this.subscribeToBucket(bucket, prefix);
        } else if (sub.type === 'activity') {
          restored = await this.subscribeToActivity();
        }

        // The subscribe methods report failure by returning false rather than
        // throwing, so check the result explicitly.
        if (!restored) {
          console.warn(`[ChangeStream] Could not restore subscription ${sub.key}`);
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to restore subscription ${sub.key}:`, error);
      }
    }
  }

  // ===========================================
  // MongoDB Subscriptions
  // ===========================================

  /**
   * Subscribe to changes in a MongoDB collection.
   *
   * @returns true when subscribed (or already subscribed); false when not
   *          connected, when the server declines, or when the request fails.
   */
  public async subscribeToCollection(database: string, collection: string): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      console.warn('[ChangeStream] Not connected, cannot subscribe');
      return false;
    }

    const key = `${database}/${collection}`;

    // Check if already subscribed
    if (this.subscriptions.has(`mongo:${key}`)) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('subscribeMongo');
      const response = await request.fire({ database, collection });

      if (response.success) {
        this.subscriptions.set(`mongo:${key}`, {
          type: 'mongo',
          key,
          subscriptionId: response.subscriptionId,
        });
        console.log(`[ChangeStream] Subscribed to MongoDB ${key}`);
        return true;
      }

      return false;
    } catch (error) {
      console.error(`[ChangeStream] Failed to subscribe to MongoDB ${key}:`, error);
      return false;
    }
  }

  /**
   * Unsubscribe from changes in a MongoDB collection.
   *
   * @returns false when not connected or the request fails; true when there
   *          was nothing to unsubscribe; otherwise the server's success flag.
   */
  public async unsubscribeFromCollection(database: string, collection: string): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      return false;
    }

    const key = `${database}/${collection}`;
    const subKey = `mongo:${key}`;

    if (!this.subscriptions.has(subKey)) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('unsubscribeMongo');
      const response = await request.fire({ database, collection });

      if (response.success) {
        this.subscriptions.delete(subKey);
        console.log(`[ChangeStream] Unsubscribed from MongoDB ${key}`);
      }

      return response.success;
    } catch (error) {
      console.error(`[ChangeStream] Failed to unsubscribe from MongoDB ${key}:`, error);
      return false;
    }
  }

  /**
   * Check if subscribed to a MongoDB collection.
   */
  public isSubscribedToCollection(database: string, collection: string): boolean {
    return this.subscriptions.has(`mongo:${database}/${collection}`);
  }

  // ===========================================
  // S3 Subscriptions
  // ===========================================

  /**
   * Subscribe to changes in an S3 bucket/prefix.
   *
   * @returns true when subscribed (or already subscribed); false when not
   *          connected, when the server declines, or when the request fails.
   */
  public async subscribeToBucket(bucket: string, prefix?: string): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      console.warn('[ChangeStream] Not connected, cannot subscribe');
      return false;
    }

    const key = prefix ? `${bucket}/${prefix}` : bucket;

    // Check if already subscribed
    if (this.subscriptions.has(`s3:${key}`)) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('subscribeS3');
      const response = await request.fire({ bucket, prefix });

      if (response.success) {
        this.subscriptions.set(`s3:${key}`, {
          type: 's3',
          key,
          subscriptionId: response.subscriptionId,
        });
        console.log(`[ChangeStream] Subscribed to S3 ${key}`);
        return true;
      }

      return false;
    } catch (error) {
      console.error(`[ChangeStream] Failed to subscribe to S3 ${key}:`, error);
      return false;
    }
  }

  /**
   * Unsubscribe from changes in an S3 bucket/prefix.
   *
   * @returns false when not connected or the request fails; true when there
   *          was nothing to unsubscribe; otherwise the server's success flag.
   */
  public async unsubscribeFromBucket(bucket: string, prefix?: string): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      return false;
    }

    const key = prefix ? `${bucket}/${prefix}` : bucket;
    const subKey = `s3:${key}`;

    if (!this.subscriptions.has(subKey)) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('unsubscribeS3');
      const response = await request.fire({ bucket, prefix });

      if (response.success) {
        this.subscriptions.delete(subKey);
        console.log(`[ChangeStream] Unsubscribed from S3 ${key}`);
      }

      return response.success;
    } catch (error) {
      console.error(`[ChangeStream] Failed to unsubscribe from S3 ${key}:`, error);
      return false;
    }
  }

  /**
   * Check if subscribed to an S3 bucket/prefix.
   */
  public isSubscribedToBucket(bucket: string, prefix?: string): boolean {
    const key = prefix ? `${bucket}/${prefix}` : bucket;
    return this.subscriptions.has(`s3:${key}`);
  }

  // ===========================================
  // Activity Stream Subscriptions
  // ===========================================

  /**
   * Subscribe to the activity stream.
   *
   * @returns true when subscribed (or already subscribed); false when not
   *          connected, when the server declines, or when the request fails.
   */
  public async subscribeToActivity(): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      console.warn('[ChangeStream] Not connected, cannot subscribe');
      return false;
    }

    // Check if already subscribed
    if (this.subscriptions.has('activity:activity')) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('subscribeActivity');
      const response = await request.fire({});

      if (response.success) {
        this.subscriptions.set('activity:activity', {
          type: 'activity',
          key: 'activity',
          subscriptionId: response.subscriptionId,
        });
        console.log('[ChangeStream] Subscribed to activity stream');
        return true;
      }

      return false;
    } catch (error) {
      console.error('[ChangeStream] Failed to subscribe to activity stream:', error);
      return false;
    }
  }

  /**
   * Unsubscribe from the activity stream.
   *
   * @returns false when not connected or the request fails; true when there
   *          was nothing to unsubscribe; otherwise the server's success flag.
   */
  public async unsubscribeFromActivity(): Promise<boolean> {
    if (!this.typedSocket || !this.isConnected) {
      return false;
    }

    if (!this.subscriptions.has('activity:activity')) {
      return true;
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('unsubscribeActivity');
      const response = await request.fire({});

      if (response.success) {
        this.subscriptions.delete('activity:activity');
        console.log('[ChangeStream] Unsubscribed from activity stream');
      }

      return response.success;
    } catch (error) {
      console.error('[ChangeStream] Failed to unsubscribe from activity stream:', error);
      return false;
    }
  }

  /**
   * Get recent activity events.
   *
   * @param limit Maximum number of events requested from the server.
   * @returns The events from the response, or an empty array when not
   *          connected, when the response has no events, or on failure.
   */
  public async getRecentActivity(limit: number = 100): Promise<IActivityEvent[]> {
    if (!this.typedSocket || !this.isConnected) {
      return [];
    }

    try {
      const request = this.typedSocket.createTypedRequest<any>('getRecentActivity');
      const response = await request.fire({ limit });
      return response.events || [];
    } catch (error) {
      console.error('[ChangeStream] Failed to get recent activity:', error);
      return [];
    }
  }

  /**
   * Check if subscribed to activity stream.
   */
  public isSubscribedToActivity(): boolean {
    return this.subscriptions.has('activity:activity');
  }

  // ===========================================
  // Observables for UI Components
  // ===========================================

  /**
   * Get MongoDB changes as an Observable.
   */
  public getMongoChanges(): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
    return this.mongoChanges$.asObservable();
  }

  /**
   * Get S3 changes as an Observable.
   */
  public getS3Changes(): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
    return this.s3Changes$.asObservable();
  }

  /**
   * Get activity events as an Observable.
   */
  public getActivityStream(): plugins.smartrx.rxjs.Observable<IActivityEvent> {
    return this.activityEvents$.asObservable();
  }

  /**
   * Get filtered MongoDB changes for a specific collection.
   */
  public getCollectionChanges(
    database: string,
    collection: string
  ): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
    return this.mongoChanges$.pipe(
      plugins.smartrx.rxjs.ops.filter(
        (event: IMongoChangeEvent) =>
          event.database === database && event.collection === collection
      )
    );
  }

  /**
   * Get filtered S3 changes for a specific bucket, optionally restricted to
   * object keys starting with `prefix`.
   */
  public getBucketChanges(
    bucket: string,
    prefix?: string
  ): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
    return this.s3Changes$.pipe(
      plugins.smartrx.rxjs.ops.filter((event: IS3ChangeEvent) => {
        if (event.bucket !== bucket) return false;
        if (prefix && !event.key.startsWith(prefix)) return false;
        return true;
      })
    );
  }
}