feat(streaming): add real-time streaming (MongoDB change streams & S3 bucket watchers) with WebSocket subscriptions and activity stream UI

This commit is contained in:
2026-01-25 21:41:55 +00:00
parent c60cbf5215
commit 20e08d123f
23 changed files with 2456 additions and 19 deletions

View File

@@ -0,0 +1,521 @@
import * as plugins from '../plugins.js';
/**
* MongoDB change event
*/
export interface IMongoChangeEvent {
type: 'insert' | 'update' | 'delete' | 'replace' | 'drop' | 'invalidate';
database: string;
collection: string;
documentId?: string;
document?: Record<string, unknown>;
updateDescription?: {
updatedFields?: Record<string, unknown>;
removedFields?: string[];
};
timestamp: string;
}
/**
* S3 change event
*/
export interface IS3ChangeEvent {
type: 'add' | 'modify' | 'delete';
key: string;
size?: number;
etag?: string;
lastModified?: Date;
bucket: string;
}
/**
* Combined activity event
*/
export interface IActivityEvent {
id: string;
source: 'mongodb' | 's3';
event: IMongoChangeEvent | IS3ChangeEvent;
timestamp: string;
}
/**
* Subscription info tracked by the service
*/
interface ISubscription {
type: 'mongo' | 's3' | 'activity';
key: string; // "db/collection" or "bucket/prefix" or "activity"
subscriptionId: string;
}
/**
* ChangeStreamService manages real-time change subscriptions from the browser.
*
* Features:
* - WebSocket connection via TypedSocket
* - Automatic reconnection with subscription restoration
* - RxJS Subjects for reactive UI updates
* - Subscription lifecycle management
*/
export class ChangeStreamService {
private typedSocket: plugins.typedsocket.TypedSocket | null = null;
private isConnected = false;
private isConnecting = false;
private subscriptions: Map<string, ISubscription> = new Map();
// RxJS Subjects for UI components
public readonly mongoChanges$ = new plugins.smartrx.rxjs.Subject<IMongoChangeEvent>();
public readonly s3Changes$ = new plugins.smartrx.rxjs.Subject<IS3ChangeEvent>();
public readonly activityEvents$ = new plugins.smartrx.rxjs.Subject<IActivityEvent>();
public readonly connectionStatus$ = new plugins.smartrx.rxjs.ReplaySubject<'connected' | 'disconnected' | 'connecting'>(1);
constructor() {
// Emit initial disconnected status
this.connectionStatus$.next('disconnected');
}
/**
* Connect to the WebSocket server
*/
public async connect(): Promise<void> {
if (this.isConnected || this.isConnecting) {
return;
}
this.isConnecting = true;
this.connectionStatus$.next('connecting');
try {
// Create client router to handle server-initiated pushes
const clientRouter = new plugins.typedrequest.TypedRouter();
// Register handlers for server push events
this.registerPushHandlers(clientRouter);
// Connect to WebSocket server using current origin
this.typedSocket = await plugins.typedsocket.TypedSocket.createClient(
clientRouter,
plugins.typedsocket.TypedSocket.useWindowLocationOriginUrl()
);
this.isConnected = true;
this.isConnecting = false;
this.connectionStatus$.next('connected');
console.log('[ChangeStream] WebSocket connected');
// Handle reconnection events via statusSubject
this.typedSocket.statusSubject.subscribe((status) => {
if (status === 'disconnected') {
this.handleDisconnect();
} else if (status === 'connected') {
this.handleReconnect();
}
});
} catch (error) {
this.isConnecting = false;
this.connectionStatus$.next('disconnected');
console.error('[ChangeStream] Failed to connect:', error);
throw error;
}
}
/**
* Disconnect from the WebSocket server
*/
public async disconnect(): Promise<void> {
if (!this.typedSocket) {
return;
}
try {
await this.typedSocket.stop();
} catch (error) {
console.error('[ChangeStream] Error during disconnect:', error);
}
this.typedSocket = null;
this.isConnected = false;
this.subscriptions.clear();
this.connectionStatus$.next('disconnected');
console.log('[ChangeStream] WebSocket disconnected');
}
/**
* Register handlers for server push events
*/
private registerPushHandlers(router: plugins.typedrequest.TypedRouter): void {
// Handle MongoDB change push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushMongoChange',
async (data: { event: IMongoChangeEvent }) => {
this.mongoChanges$.next(data.event);
return { received: true };
}
)
);
// Handle S3 change push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushS3Change',
async (data: { event: IS3ChangeEvent }) => {
this.s3Changes$.next(data.event);
return { received: true };
}
)
);
// Handle activity event push
router.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>(
'pushActivityEvent',
async (data: { event: IActivityEvent }) => {
this.activityEvents$.next(data.event);
return { received: true };
}
)
);
}
/**
* Handle WebSocket disconnection
*/
private handleDisconnect(): void {
this.isConnected = false;
this.connectionStatus$.next('disconnected');
console.log('[ChangeStream] WebSocket disconnected, waiting for reconnect...');
}
/**
* Handle WebSocket reconnection - restore all subscriptions
*/
private async handleReconnect(): Promise<void> {
this.isConnected = true;
this.connectionStatus$.next('connected');
console.log('[ChangeStream] WebSocket reconnected, restoring subscriptions...');
// Restore all subscriptions
const subscriptionsToRestore = Array.from(this.subscriptions.values());
this.subscriptions.clear();
for (const sub of subscriptionsToRestore) {
try {
if (sub.type === 'mongo') {
const [database, collection] = sub.key.split('/');
await this.subscribeToCollection(database, collection);
} else if (sub.type === 's3') {
const parts = sub.key.split('/');
const bucket = parts[0];
const prefix = parts.slice(1).join('/') || undefined;
await this.subscribeToBucket(bucket, prefix);
} else if (sub.type === 'activity') {
await this.subscribeToActivity();
}
} catch (error) {
console.error(`[ChangeStream] Failed to restore subscription ${sub.key}:`, error);
}
}
}
// ===========================================
// MongoDB Subscriptions
// ===========================================
/**
* Subscribe to changes in a MongoDB collection
*/
public async subscribeToCollection(database: string, collection: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
const key = `${database}/${collection}`;
// Check if already subscribed
if (this.subscriptions.has(`mongo:${key}`)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeMongo');
const response = await request.fire({ database, collection });
if (response.success) {
this.subscriptions.set(`mongo:${key}`, {
type: 'mongo',
key,
subscriptionId: response.subscriptionId,
});
console.log(`[ChangeStream] Subscribed to MongoDB ${key}`);
return true;
}
return false;
} catch (error) {
console.error(`[ChangeStream] Failed to subscribe to MongoDB ${key}:`, error);
return false;
}
}
/**
* Unsubscribe from changes in a MongoDB collection
*/
public async unsubscribeFromCollection(database: string, collection: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
const key = `${database}/${collection}`;
const subKey = `mongo:${key}`;
if (!this.subscriptions.has(subKey)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeMongo');
const response = await request.fire({ database, collection });
if (response.success) {
this.subscriptions.delete(subKey);
console.log(`[ChangeStream] Unsubscribed from MongoDB ${key}`);
}
return response.success;
} catch (error) {
console.error(`[ChangeStream] Failed to unsubscribe from MongoDB ${key}:`, error);
return false;
}
}
/**
* Check if subscribed to a MongoDB collection
*/
public isSubscribedToCollection(database: string, collection: string): boolean {
return this.subscriptions.has(`mongo:${database}/${collection}`);
}
// ===========================================
// S3 Subscriptions
// ===========================================
/**
* Subscribe to changes in an S3 bucket/prefix
*/
public async subscribeToBucket(bucket: string, prefix?: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
const key = prefix ? `${bucket}/${prefix}` : bucket;
// Check if already subscribed
if (this.subscriptions.has(`s3:${key}`)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeS3');
const response = await request.fire({ bucket, prefix });
if (response.success) {
this.subscriptions.set(`s3:${key}`, {
type: 's3',
key,
subscriptionId: response.subscriptionId,
});
console.log(`[ChangeStream] Subscribed to S3 ${key}`);
return true;
}
return false;
} catch (error) {
console.error(`[ChangeStream] Failed to subscribe to S3 ${key}:`, error);
return false;
}
}
/**
* Unsubscribe from changes in an S3 bucket/prefix
*/
public async unsubscribeFromBucket(bucket: string, prefix?: string): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
const key = prefix ? `${bucket}/${prefix}` : bucket;
const subKey = `s3:${key}`;
if (!this.subscriptions.has(subKey)) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeS3');
const response = await request.fire({ bucket, prefix });
if (response.success) {
this.subscriptions.delete(subKey);
console.log(`[ChangeStream] Unsubscribed from S3 ${key}`);
}
return response.success;
} catch (error) {
console.error(`[ChangeStream] Failed to unsubscribe from S3 ${key}:`, error);
return false;
}
}
/**
* Check if subscribed to an S3 bucket/prefix
*/
public isSubscribedToBucket(bucket: string, prefix?: string): boolean {
const key = prefix ? `${bucket}/${prefix}` : bucket;
return this.subscriptions.has(`s3:${key}`);
}
// ===========================================
// Activity Stream Subscriptions
// ===========================================
/**
* Subscribe to the activity stream
*/
public async subscribeToActivity(): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
console.warn('[ChangeStream] Not connected, cannot subscribe');
return false;
}
// Check if already subscribed
if (this.subscriptions.has('activity:activity')) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('subscribeActivity');
const response = await request.fire({});
if (response.success) {
this.subscriptions.set('activity:activity', {
type: 'activity',
key: 'activity',
subscriptionId: response.subscriptionId,
});
console.log('[ChangeStream] Subscribed to activity stream');
return true;
}
return false;
} catch (error) {
console.error('[ChangeStream] Failed to subscribe to activity stream:', error);
return false;
}
}
/**
* Unsubscribe from the activity stream
*/
public async unsubscribeFromActivity(): Promise<boolean> {
if (!this.typedSocket || !this.isConnected) {
return false;
}
if (!this.subscriptions.has('activity:activity')) {
return true;
}
try {
const request = this.typedSocket.createTypedRequest<any>('unsubscribeActivity');
const response = await request.fire({});
if (response.success) {
this.subscriptions.delete('activity:activity');
console.log('[ChangeStream] Unsubscribed from activity stream');
}
return response.success;
} catch (error) {
console.error('[ChangeStream] Failed to unsubscribe from activity stream:', error);
return false;
}
}
/**
* Get recent activity events
*/
public async getRecentActivity(limit: number = 100): Promise<IActivityEvent[]> {
if (!this.typedSocket || !this.isConnected) {
return [];
}
try {
const request = this.typedSocket.createTypedRequest<any>('getRecentActivity');
const response = await request.fire({ limit });
return response.events || [];
} catch (error) {
console.error('[ChangeStream] Failed to get recent activity:', error);
return [];
}
}
/**
* Check if subscribed to activity stream
*/
public isSubscribedToActivity(): boolean {
return this.subscriptions.has('activity:activity');
}
// ===========================================
// Observables for UI Components
// ===========================================
/**
* Get MongoDB changes as an Observable
*/
public getMongoChanges(): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
return this.mongoChanges$.asObservable();
}
/**
* Get S3 changes as an Observable
*/
public getS3Changes(): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
return this.s3Changes$.asObservable();
}
/**
* Get activity events as an Observable
*/
public getActivityStream(): plugins.smartrx.rxjs.Observable<IActivityEvent> {
return this.activityEvents$.asObservable();
}
/**
* Get filtered MongoDB changes for a specific collection
*/
public getCollectionChanges(database: string, collection: string): plugins.smartrx.rxjs.Observable<IMongoChangeEvent> {
return this.mongoChanges$.pipe(
plugins.smartrx.rxjs.ops.filter(
(event) => event.database === database && event.collection === collection
)
);
}
/**
* Get filtered S3 changes for a specific bucket/prefix
*/
public getBucketChanges(bucket: string, prefix?: string): plugins.smartrx.rxjs.Observable<IS3ChangeEvent> {
return this.s3Changes$.pipe(
plugins.smartrx.rxjs.ops.filter((event) => {
if (event.bucket !== bucket) return false;
if (prefix && !event.key.startsWith(prefix)) return false;
return true;
})
);
}
}