592 lines
17 KiB
TypeScript
592 lines
17 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { TsView } from '../tsview.classes.tsview.js';
|
|
import type * as interfaces from './interfaces.streaming.js';
|
|
import type { IS3ChangeEvent } from '@push.rocks/smartbucket';
|
|
|
|
/**
 * Subscription entry tracking a client's subscription to a resource.
 *
 * One entry is stored per connection in a watcher's (or the activity
 * stream's) subscription map.
 */
interface ISubscriptionEntry {
  /** Identifier of the client connection that owns this subscription. */
  connectionId: string;
  /** Identifier handed back to the client when it subscribed. */
  subscriptionId: string;
  /** Moment at which the subscription was registered. */
  createdAt: Date;
}
|
|
|
|
/**
 * MongoDB watcher entry: one change stream shared by every client subscribed
 * to the same "db/collection" key.
 */
interface IMongoWatcherEntry {
  /** The change stream that emits events for the watched collection. */
  watcher: plugins.mongodb.ChangeStream;
  /** Active subscriptions, keyed by connectionId. */
  subscriptions: Map<string, ISubscriptionEntry>;
}
|
|
|
|
/**
 * S3 watcher entry: one bucket watcher shared by every client subscribed to
 * the same "bucket/prefix" key.
 */
interface IS3WatcherEntry {
  /** The smartbucket watcher that reports changes for the bucket/prefix. */
  watcher: plugins.smartbucket.BucketWatcher;
  /** Active subscriptions, keyed by connectionId. */
  subscriptions: Map<string, ISubscriptionEntry>;
}
|
|
|
|
/**
 * ChangeStreamManager manages real-time change streaming for both MongoDB and S3.
 *
 * Features:
 * - MongoDB Change Streams for real-time database updates
 * - S3 BucketWatcher for polling-based S3 change detection
 * - Subscription management per WebSocket client
 * - Activity stream backed by a bounded buffer of the most recent events
 * - Cleanup of all of a client's subscriptions on disconnect
 *
 * Watchers are shared: the first subscriber for a key creates the watcher and
 * the last one to leave closes it.
 */
export class ChangeStreamManager {
  private tsview: TsView;
  private typedSocket: plugins.typedserver.TypedServer['typedsocket'] | null = null;

  // MongoDB watchers: "db/collection" -> watcher entry
  private mongoWatchers: Map<string, IMongoWatcherEntry> = new Map();

  // S3 watchers: "bucket/prefix" -> watcher entry
  private s3Watchers: Map<string, IS3WatcherEntry> = new Map();

  // Activity subscribers: connectionId -> subscription entry
  private activitySubscribers: Map<string, ISubscriptionEntry> = new Map();

  // Most recent activity events, oldest first (capped at ACTIVITY_BUFFER_SIZE)
  private activityBuffer: interfaces.IActivityEvent[] = [];
  private readonly ACTIVITY_BUFFER_SIZE = 1000;

  // Counter for generating unique subscription IDs
  private subscriptionCounter = 0;

  /**
   * @param tsview Owning TsView instance; used to obtain the MongoDB and S3 handles.
   */
  constructor(tsview: TsView) {
    this.tsview = tsview;
  }

  /**
   * Initialize the manager with a TypedSocket instance.
   * Until this is called, change events are buffered as activity but not pushed to clients.
   */
  public setTypedSocket(typedSocket: plugins.typedserver.TypedServer['typedsocket']): void {
    this.typedSocket = typedSocket;
  }

  /**
   * Generate a unique subscription ID of the form `sub_<epoch ms>_<counter>`.
   */
  private generateSubscriptionId(): string {
    this.subscriptionCounter += 1;
    return `sub_${Date.now()}_${this.subscriptionCounter}`;
  }

  /**
   * Get the MongoDB key ("db/collection") for a database/collection pair.
   */
  private getMongoKey(database: string, collection: string): string {
    return `${database}/${collection}`;
  }

  /**
   * Get the S3 key for a bucket/prefix pair ("bucket/prefix", or just "bucket"
   * when no prefix is given).
   */
  private getS3Key(bucket: string, prefix?: string): string {
    return prefix ? `${bucket}/${prefix}` : bucket;
  }

  /**
   * Look up the live socket connection whose alias or socketId equals the
   * given connectionId.
   *
   * @returns The matching connection, or a falsy value when no socket is set
   *          or no connection matches.
   */
  private async findConnection(connectionId: string) {
    const socket = this.typedSocket;
    if (!socket) return undefined;

    return socket.findTargetConnection(async (conn: any) => {
      return conn.alias === connectionId || conn.socketId === connectionId;
    });
  }

  // ===========================================
  // MongoDB Change Streams
  // ===========================================

  /**
   * Subscribe a client to MongoDB collection changes.
   *
   * Creates the change stream for the collection on first use; later
   * subscribers share it. Subscribing the same connection twice replaces its
   * previous entry.
   *
   * @returns `success: false` with an empty subscriptionId when the watcher
   *          could not be created.
   */
  public async subscribeToMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getMongoKey(database, collection);

    let entry = this.mongoWatchers.get(key);
    let redundantWatcher: plugins.mongodb.ChangeStream | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createMongoWatcher(database, collection);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Another subscriber may have registered a watcher for this key while
      // we were awaiting; reuse theirs instead of overwriting it.
      const concurrentEntry = this.mongoWatchers.get(key);
      if (concurrentEntry) {
        entry = concurrentEntry;
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.mongoWatchers.set(key, entry);
      }
    }

    // Add subscription (before any further await, so the entry cannot be
    // closed for lack of subscribers in between)
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] MongoDB subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      try {
        await redundantWatcher.close();
      } catch (error) {
        console.error(`[ChangeStream] Error closing redundant MongoDB watcher for ${key}:`, error);
      }
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from MongoDB collection changes.
   * Closes the change stream when the last subscriber leaves.
   *
   * @returns `false` when no watcher exists for the collection, `true` otherwise.
   */
  public async unsubscribeFromMongo(
    connectionId: string,
    database: string,
    collection: string
  ): Promise<boolean> {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] MongoDB subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeMongoWatcher(key);
    }

    return true;
  }

  /**
   * Create a MongoDB change stream for a collection.
   *
   * @returns The change stream, or `null` when MongoDB is not configured or
   *          creation failed (the error is logged).
   */
  private async createMongoWatcher(
    database: string,
    collection: string
  ): Promise<plugins.mongodb.ChangeStream | null> {
    try {
      const db = await this.tsview.getMongoDb();
      if (!db) {
        console.error('[ChangeStream] MongoDB not configured');
        return null;
      }

      // The underlying driver client is reached through an untyped property.
      const client = (db as any).mongoDbClient;
      const mongoDb = client.db(database);
      const mongoCollection = mongoDb.collection(collection);

      // Create change stream; 'updateLookup' requests the full document on updates
      const changeStream = mongoCollection.watch([], {
        fullDocument: 'updateLookup',
      });

      const key = this.getMongoKey(database, collection);

      // Handle change events
      changeStream.on('change', (change: any) => {
        // Ignore events from a stream that is not (or no longer) the one
        // registered for this key, e.g. while it is being closed.
        if (this.mongoWatchers.get(key)?.watcher !== changeStream) return;
        this.handleMongoChange(database, collection, change);
      });

      changeStream.on('error', (error: Error) => {
        console.error(`[ChangeStream] MongoDB error for ${database}/${collection}:`, error);
      });

      console.log(`[ChangeStream] MongoDB watcher created for ${database}/${collection}`);
      return changeStream;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create MongoDB watcher for ${database}/${collection}:`, error);
      return null;
    }
  }

  /**
   * Handle a MongoDB change event: convert it, record it as activity and push
   * it to the collection's subscribers.
   */
  private handleMongoChange(database: string, collection: string, change: any): void {
    const key = this.getMongoKey(database, collection);
    const entry = this.mongoWatchers.get(key);

    if (!entry) return;

    // Convert MongoDB change event to our interface
    const event: interfaces.IMongoChangeEvent = {
      type: change.operationType as interfaces.IMongoChangeEvent['type'],
      database,
      collection,
      documentId: change.documentKey?._id?.toString(),
      document: change.fullDocument,
      updateDescription: change.updateDescription,
      timestamp: new Date().toISOString(),
    };

    // Add to activity buffer
    this.addToActivityBuffer('mongodb', event);

    // Push to all subscribed clients (fire-and-forget; per-client failures
    // are caught and logged inside)
    void this.pushMongoChangeToClients(key, event);
  }

  /**
   * Push MongoDB change to subscribed clients, one client after another.
   * A failure for one client is logged and does not stop delivery to the rest.
   */
  private async pushMongoChangeToClients(
    key: string,
    event: interfaces.IMongoChangeEvent
  ): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    const socket = this.typedSocket;
    if (!entry || !socket) return;

    for (const connectionId of entry.subscriptions.keys()) {
      try {
        // Find the connection and push the event
        const connection = await this.findConnection(connectionId);

        if (connection) {
          const request = socket.createTypedRequest<interfaces.IReq_PushMongoChange>(
            'pushMongoChange',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push MongoDB change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Close a MongoDB change stream and forget its entry.
   */
  private async closeMongoWatcher(key: string): Promise<void> {
    const entry = this.mongoWatchers.get(key);
    if (!entry) return;

    // Unregister before awaiting: a failing close() must not leave a dead
    // entry behind, and a subscriber arriving meanwhile must get a fresh watcher.
    this.mongoWatchers.delete(key);

    try {
      await entry.watcher.close();
      console.log(`[ChangeStream] MongoDB watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing MongoDB watcher for ${key}:`, error);
    }
  }

  // ===========================================
  // S3 Change Watching
  // ===========================================

  /**
   * Subscribe a client to S3 bucket/prefix changes.
   *
   * Creates the bucket watcher on first use; later subscribers share it.
   * Subscribing the same connection twice replaces its previous entry.
   *
   * @returns `success: false` with an empty subscriptionId when the watcher
   *          could not be created.
   */
  public async subscribeToS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<{ success: boolean; subscriptionId: string }> {
    const key = this.getS3Key(bucket, prefix);

    let entry = this.s3Watchers.get(key);
    let redundantWatcher: plugins.smartbucket.BucketWatcher | null = null;

    // Create watcher if it doesn't exist
    if (!entry) {
      const watcher = await this.createS3Watcher(bucket, prefix);
      if (!watcher) {
        return { success: false, subscriptionId: '' };
      }

      // Another subscriber may have registered a watcher for this key while
      // we were awaiting; reuse theirs instead of overwriting it.
      const concurrentEntry = this.s3Watchers.get(key);
      if (concurrentEntry) {
        entry = concurrentEntry;
        redundantWatcher = watcher;
      } else {
        entry = {
          watcher,
          subscriptions: new Map(),
        };
        this.s3Watchers.set(key, entry);
      }
    }

    // Add subscription (before any further await, so the entry cannot be
    // closed for lack of subscribers in between)
    const subscriptionId = this.generateSubscriptionId();
    entry.subscriptions.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] S3 subscription added: ${key} for connection ${connectionId}`);

    if (redundantWatcher) {
      try {
        await redundantWatcher.stop();
      } catch (error) {
        console.error(`[ChangeStream] Error closing redundant S3 watcher for ${key}:`, error);
      }
    }

    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from S3 bucket/prefix changes.
   * Stops the bucket watcher when the last subscriber leaves.
   *
   * @returns `false` when no watcher exists for the bucket/prefix, `true` otherwise.
   */
  public async unsubscribeFromS3(
    connectionId: string,
    bucket: string,
    prefix?: string
  ): Promise<boolean> {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);

    if (!entry) {
      return false;
    }

    entry.subscriptions.delete(connectionId);
    console.log(`[ChangeStream] S3 subscription removed: ${key} for connection ${connectionId}`);

    // Close watcher if no more subscribers
    if (entry.subscriptions.size === 0) {
      await this.closeS3Watcher(key);
    }

    return true;
  }

  /**
   * Create an S3 bucket watcher, start it and wait until it reports ready.
   *
   * @returns The started watcher, or `null` when S3 is not configured or
   *          creation failed (the error is logged).
   */
  private async createS3Watcher(
    bucket: string,
    prefix?: string
  ): Promise<plugins.smartbucket.BucketWatcher | null> {
    try {
      const smartbucket = await this.tsview.getSmartBucket();
      if (!smartbucket) {
        console.error('[ChangeStream] S3 not configured');
        return null;
      }

      const bucketInstance = await smartbucket.getBucketByName(bucket);

      // Create watcher using smartbucket's BucketWatcher
      const watcher = bucketInstance.createWatcher({
        prefix: prefix || '',
        pollIntervalMs: 5000,
        bufferTimeMs: 500,
      });

      const key = this.getS3Key(bucket, prefix);

      // Subscribe to change events; the subject may deliver one event or a batch
      watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
        // Ignore events from a watcher that is not (or no longer) the one
        // registered for this key, e.g. while it is being stopped.
        if (this.s3Watchers.get(key)?.watcher !== watcher) return;

        const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
        for (const event of events) {
          this.handleS3Change(bucket, prefix, event);
        }
      });

      // Start the watcher
      await watcher.start();
      await watcher.readyDeferred.promise;

      console.log(`[ChangeStream] S3 watcher created for ${bucket}${prefix ? '/' + prefix : ''}`);
      return watcher;
    } catch (error) {
      console.error(`[ChangeStream] Failed to create S3 watcher for ${bucket}:`, error);
      return null;
    }
  }

  /**
   * Handle an S3 change event: record it as activity and push it to the
   * bucket/prefix subscribers.
   */
  private handleS3Change(bucket: string, prefix: string | undefined, event: IS3ChangeEvent): void {
    const key = this.getS3Key(bucket, prefix);
    const entry = this.s3Watchers.get(key);

    if (!entry) return;

    // Add to activity buffer
    this.addToActivityBuffer('s3', event);

    // Push to all subscribed clients (fire-and-forget; per-client failures
    // are caught and logged inside)
    void this.pushS3ChangeToClients(key, event);
  }

  /**
   * Push S3 change to subscribed clients, one client after another.
   * A failure for one client is logged and does not stop delivery to the rest.
   */
  private async pushS3ChangeToClients(
    key: string,
    event: IS3ChangeEvent
  ): Promise<void> {
    const entry = this.s3Watchers.get(key);
    const socket = this.typedSocket;
    if (!entry || !socket) return;

    for (const connectionId of entry.subscriptions.keys()) {
      try {
        const connection = await this.findConnection(connectionId);

        if (connection) {
          const request = socket.createTypedRequest<interfaces.IReq_PushS3Change>(
            'pushS3Change',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push S3 change to ${connectionId}:`, error);
      }
    }
  }

  /**
   * Stop an S3 bucket watcher and forget its entry.
   */
  private async closeS3Watcher(key: string): Promise<void> {
    const entry = this.s3Watchers.get(key);
    if (!entry) return;

    // Unregister before awaiting: a failing stop() must not leave a dead
    // entry behind, and a subscriber arriving meanwhile must get a fresh watcher.
    this.s3Watchers.delete(key);

    try {
      await entry.watcher.stop();
      console.log(`[ChangeStream] S3 watcher closed for ${key}`);
    } catch (error) {
      console.error(`[ChangeStream] Error closing S3 watcher for ${key}:`, error);
    }
  }

  // ===========================================
  // Activity Stream
  // ===========================================

  /**
   * Subscribe a client to the activity stream.
   * Subscribing the same connection again replaces its previous entry.
   */
  public async subscribeToActivity(connectionId: string): Promise<{ success: boolean; subscriptionId: string }> {
    const subscriptionId = this.generateSubscriptionId();

    this.activitySubscribers.set(connectionId, {
      connectionId,
      subscriptionId,
      createdAt: new Date(),
    });

    console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);
    return { success: true, subscriptionId };
  }

  /**
   * Unsubscribe a client from the activity stream.
   *
   * @returns `true` when the connection had an activity subscription.
   */
  public async unsubscribeFromActivity(connectionId: string): Promise<boolean> {
    const result = this.activitySubscribers.delete(connectionId);
    if (result) {
      console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);
    }
    return result;
  }

  /**
   * Get recent activity events.
   *
   * @param limit Maximum number of events to return (default 100). Fractions
   *              are rounded down; zero, negative or NaN yields an empty list.
   * @returns A copy of the newest events, oldest first.
   */
  public getRecentActivity(limit: number = 100): interfaces.IActivityEvent[] {
    const count = Math.min(Math.floor(limit), this.activityBuffer.length);

    // slice(-0) would return the whole buffer and a negative count would skip
    // from the front, so non-positive (and NaN) counts are handled explicitly.
    if (!(count > 0)) {
      return [];
    }

    return this.activityBuffer.slice(-count);
  }

  /**
   * Add an event to the activity buffer and push it to activity subscribers.
   */
  private addToActivityBuffer(
    source: 'mongodb' | 's3',
    event: interfaces.IMongoChangeEvent | IS3ChangeEvent
  ): void {
    const activityEvent: interfaces.IActivityEvent = {
      // Epoch milliseconds plus up to nine random base-36 characters
      id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
      source,
      event,
      timestamp: new Date().toISOString(),
    };

    this.activityBuffer.push(activityEvent);

    // Trim buffer if it exceeds max size (drop the oldest events in place)
    const overflow = this.activityBuffer.length - this.ACTIVITY_BUFFER_SIZE;
    if (overflow > 0) {
      this.activityBuffer.splice(0, overflow);
    }

    // Push to activity subscribers (fire-and-forget; per-client failures are
    // caught and logged inside)
    void this.pushActivityToClients(activityEvent);
  }

  /**
   * Push activity event to subscribed clients, one client after another.
   * A failure for one client is logged and does not stop delivery to the rest.
   */
  private async pushActivityToClients(event: interfaces.IActivityEvent): Promise<void> {
    const socket = this.typedSocket;
    if (!socket || this.activitySubscribers.size === 0) return;

    for (const connectionId of this.activitySubscribers.keys()) {
      try {
        const connection = await this.findConnection(connectionId);

        if (connection) {
          const request = socket.createTypedRequest<interfaces.IReq_PushActivityEvent>(
            'pushActivityEvent',
            connection
          );
          await request.fire({ event });
        }
      } catch (error) {
        console.error(`[ChangeStream] Failed to push activity to ${connectionId}:`, error);
      }
    }
  }

  // ===========================================
  // Connection Management
  // ===========================================

  /**
   * Handle client disconnect - clean up all subscriptions held by the
   * connection and close any watcher left without subscribers.
   */
  public async handleDisconnect(connectionId: string): Promise<void> {
    console.log(`[ChangeStream] Cleaning up subscriptions for disconnected connection ${connectionId}`);

    // Clean up MongoDB subscriptions
    for (const [key, entry] of this.mongoWatchers) {
      // Map.delete reports whether the connection was actually subscribed
      if (entry.subscriptions.delete(connectionId) && entry.subscriptions.size === 0) {
        await this.closeMongoWatcher(key);
      }
    }

    // Clean up S3 subscriptions
    for (const [key, entry] of this.s3Watchers) {
      if (entry.subscriptions.delete(connectionId) && entry.subscriptions.size === 0) {
        await this.closeS3Watcher(key);
      }
    }

    // Clean up activity subscription
    this.activitySubscribers.delete(connectionId);
  }

  /**
   * Stop all watchers and clean up the activity buffer and subscribers.
   */
  public async stop(): Promise<void> {
    console.log('[ChangeStream] Stopping all watchers...');

    // Close all MongoDB watchers (keys are snapshotted because closing
    // removes entries from the map)
    for (const key of [...this.mongoWatchers.keys()]) {
      await this.closeMongoWatcher(key);
    }

    // Close all S3 watchers
    for (const key of [...this.s3Watchers.keys()]) {
      await this.closeS3Watcher(key);
    }

    // Clear activity buffer and subscribers
    this.activityBuffer = [];
    this.activitySubscribers.clear();

    console.log('[ChangeStream] All watchers stopped');
  }
}
|