feat(streaming): add global activity watchers, client-side buffering, and improved real-time streaming UX
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@git.zone/tsview',
|
||||
version: '1.7.0',
|
||||
version: '1.8.0',
|
||||
description: 'A CLI tool for viewing S3 and MongoDB data with a web UI'
|
||||
}
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import type * as interfaces from '../interfaces/index.js';
|
||||
import type { TsView } from '../tsview.classes.tsview.js';
|
||||
import type { ChangeStreamManager } from '../streaming/index.js';
|
||||
|
||||
/**
|
||||
* Register MongoDB API handlers
|
||||
*/
|
||||
export async function registerMongoHandlers(
|
||||
typedrouter: plugins.typedrequest.TypedRouter,
|
||||
tsview: TsView
|
||||
tsview: TsView,
|
||||
changeStreamManager?: ChangeStreamManager
|
||||
): Promise<void> {
|
||||
// Helper to get the native MongoDB client
|
||||
const getMongoClient = async () => {
|
||||
@@ -122,6 +124,14 @@ export async function registerMongoHandlers(
|
||||
const db = client.db(reqData.databaseName);
|
||||
// Create a placeholder collection to materialize the database
|
||||
await db.createCollection('_tsview_init');
|
||||
if (changeStreamManager) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'insert',
|
||||
database: reqData.databaseName,
|
||||
collection: '_tsview_init',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
return { success: true };
|
||||
} catch (err) {
|
||||
console.error('Error creating database:', err);
|
||||
@@ -140,6 +150,14 @@ export async function registerMongoHandlers(
|
||||
const client = await getMongoClient();
|
||||
const db = client.db(reqData.databaseName);
|
||||
await db.dropDatabase();
|
||||
if (changeStreamManager) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'drop',
|
||||
database: reqData.databaseName,
|
||||
collection: '*',
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
return { success: true };
|
||||
} catch (err) {
|
||||
console.error('Error dropping database:', err);
|
||||
@@ -158,6 +176,14 @@ export async function registerMongoHandlers(
|
||||
const client = await getMongoClient();
|
||||
const db = client.db(reqData.databaseName);
|
||||
await db.createCollection(reqData.collectionName);
|
||||
if (changeStreamManager) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'insert',
|
||||
database: reqData.databaseName,
|
||||
collection: reqData.collectionName,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
return { success: true };
|
||||
} catch (err) {
|
||||
console.error('Error creating collection:', err);
|
||||
@@ -176,6 +202,14 @@ export async function registerMongoHandlers(
|
||||
const client = await getMongoClient();
|
||||
const db = client.db(reqData.databaseName);
|
||||
await db.dropCollection(reqData.collectionName);
|
||||
if (changeStreamManager) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'drop',
|
||||
database: reqData.databaseName,
|
||||
collection: reqData.collectionName,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
return { success: true };
|
||||
} catch (err) {
|
||||
console.error('Error dropping collection:', err);
|
||||
@@ -267,6 +301,16 @@ export async function registerMongoHandlers(
|
||||
|
||||
const result = await collection.insertOne(reqData.document);
|
||||
|
||||
if (changeStreamManager) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'insert',
|
||||
database: reqData.databaseName,
|
||||
collection: reqData.collectionName,
|
||||
documentId: result.insertedId.toString(),
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
return { insertedId: result.insertedId.toString() };
|
||||
} catch (err) {
|
||||
console.error('Error inserting document:', err);
|
||||
@@ -294,6 +338,16 @@ export async function registerMongoHandlers(
|
||||
|
||||
const result = await collection.updateOne(filter, updateDoc);
|
||||
|
||||
if (changeStreamManager && (result.modifiedCount > 0 || result.matchedCount > 0)) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'update',
|
||||
database: reqData.databaseName,
|
||||
collection: reqData.collectionName,
|
||||
documentId: reqData.documentId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
success: result.modifiedCount > 0 || result.matchedCount > 0,
|
||||
modifiedCount: result.modifiedCount,
|
||||
@@ -319,6 +373,16 @@ export async function registerMongoHandlers(
|
||||
const filter = createIdFilter(reqData.documentId);
|
||||
const result = await collection.deleteOne(filter);
|
||||
|
||||
if (changeStreamManager && result.deletedCount > 0) {
|
||||
changeStreamManager.emitMongoActivityEvent({
|
||||
type: 'delete',
|
||||
database: reqData.databaseName,
|
||||
collection: reqData.collectionName,
|
||||
documentId: reqData.documentId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
success: result.deletedCount > 0,
|
||||
deletedCount: result.deletedCount,
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -35,18 +35,18 @@ export class ViewServer {
|
||||
noCache: true,
|
||||
});
|
||||
|
||||
// Initialize ChangeStreamManager for real-time updates (before handlers so they can emit events)
|
||||
this.changeStreamManager = new ChangeStreamManager(this.tsview);
|
||||
|
||||
// Register API handlers directly to server's router
|
||||
if (this.tsview.config.hasS3()) {
|
||||
await registerS3Handlers(this.typedServer.typedrouter, this.tsview);
|
||||
}
|
||||
|
||||
if (this.tsview.config.hasMongo()) {
|
||||
await registerMongoHandlers(this.typedServer.typedrouter, this.tsview);
|
||||
await registerMongoHandlers(this.typedServer.typedrouter, this.tsview, this.changeStreamManager);
|
||||
}
|
||||
|
||||
// Initialize ChangeStreamManager for real-time updates
|
||||
this.changeStreamManager = new ChangeStreamManager(this.tsview);
|
||||
|
||||
// Register streaming handlers
|
||||
await this.registerStreamingHandlers();
|
||||
|
||||
@@ -192,18 +192,16 @@ export class ViewServer {
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract connection ID from request context
|
||||
* Extract connection ID from request context.
|
||||
* SmartServe attaches the WebSocket peer to `localData.peer` on each request.
|
||||
*/
|
||||
private getConnectionId(context: any): string | null {
|
||||
// Try to get connection ID from WebSocket context
|
||||
if (context?.socketConnection?.socketId) {
|
||||
return context.socketConnection.socketId;
|
||||
// The TypedTools instance carries localData from the transport layer.
|
||||
// SmartServe puts the IWebSocketPeer at localData.peer.
|
||||
if (context?.localData?.peer?.id) {
|
||||
return context.localData.peer.id;
|
||||
}
|
||||
if (context?.socketConnection?.alias) {
|
||||
return context.socketConnection.alias;
|
||||
}
|
||||
// Fallback: generate a unique ID for HTTP requests
|
||||
// Note: Real-time streaming requires WebSocket connection
|
||||
// HTTP requests don't have a peer — real-time streaming requires WebSocket.
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,11 @@ export class ChangeStreamManager {
|
||||
private activityBuffer: interfaces.IActivityEvent[] = [];
|
||||
private readonly ACTIVITY_BUFFER_SIZE = 1000;
|
||||
|
||||
// Global watchers for the activity stream (started lazily on first subscriber)
|
||||
private globalMongoWatcher: plugins.mongodb.ChangeStream | null = null;
|
||||
private globalS3Watchers: Map<string, plugins.smartbucket.BucketWatcher> = new Map();
|
||||
private globalWatchersActive: boolean = false;
|
||||
|
||||
// Counter for generating unique subscription IDs
|
||||
private subscriptionCounter = 0;
|
||||
|
||||
@@ -218,8 +223,11 @@ export class ChangeStreamManager {
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
// Add to activity buffer
|
||||
this.addToActivityBuffer('mongodb', event);
|
||||
// Only add to activity buffer if global watchers are NOT active.
|
||||
// When active, the global MongoDB watcher already feeds the activity stream.
|
||||
if (!this.globalWatchersActive) {
|
||||
this.addToActivityBuffer('mongodb', event);
|
||||
}
|
||||
|
||||
// Push to all subscribed clients
|
||||
this.pushMongoChangeToClients(key, event);
|
||||
@@ -239,7 +247,7 @@ export class ChangeStreamManager {
|
||||
try {
|
||||
// Find the connection and push the event
|
||||
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
||||
return conn.alias === connectionId || conn.socketId === connectionId;
|
||||
return conn.peer?.id === connectionId;
|
||||
});
|
||||
|
||||
if (connection) {
|
||||
@@ -391,8 +399,11 @@ export class ChangeStreamManager {
|
||||
|
||||
if (!entry) return;
|
||||
|
||||
// Add to activity buffer
|
||||
this.addToActivityBuffer('s3', event);
|
||||
// Only add to activity buffer if global watchers are NOT active.
|
||||
// When active, the global S3 watchers already feed the activity stream.
|
||||
if (!this.globalWatchersActive) {
|
||||
this.addToActivityBuffer('s3', event);
|
||||
}
|
||||
|
||||
// Push to all subscribed clients
|
||||
this.pushS3ChangeToClients(key, event);
|
||||
@@ -411,7 +422,7 @@ export class ChangeStreamManager {
|
||||
for (const [connectionId, _sub] of entry.subscriptions) {
|
||||
try {
|
||||
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
||||
return conn.alias === connectionId || conn.socketId === connectionId;
|
||||
return conn.peer?.id === connectionId;
|
||||
});
|
||||
|
||||
if (connection) {
|
||||
@@ -460,6 +471,12 @@ export class ChangeStreamManager {
|
||||
});
|
||||
|
||||
console.log(`[ChangeStream] Activity subscription added for connection ${connectionId}`);
|
||||
|
||||
// Start global watchers when the first activity subscriber connects
|
||||
if (this.activitySubscribers.size === 1) {
|
||||
await this.startGlobalWatchers();
|
||||
}
|
||||
|
||||
return { success: true, subscriptionId };
|
||||
}
|
||||
|
||||
@@ -470,6 +487,11 @@ export class ChangeStreamManager {
|
||||
const result = this.activitySubscribers.delete(connectionId);
|
||||
if (result) {
|
||||
console.log(`[ChangeStream] Activity subscription removed for connection ${connectionId}`);
|
||||
|
||||
// Stop global watchers when no activity subscribers remain
|
||||
if (this.activitySubscribers.size === 0) {
|
||||
await this.stopGlobalWatchers();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -482,6 +504,13 @@ export class ChangeStreamManager {
|
||||
return this.activityBuffer.slice(-count);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a MongoDB activity event from an API handler (no change stream required).
|
||||
*/
|
||||
public emitMongoActivityEvent(event: interfaces.IMongoChangeEvent): void {
|
||||
this.addToActivityBuffer('mongodb', event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an event to the activity buffer
|
||||
*/
|
||||
@@ -516,7 +545,7 @@ export class ChangeStreamManager {
|
||||
for (const [connectionId, _sub] of this.activitySubscribers) {
|
||||
try {
|
||||
const connection = await this.typedSocket.findTargetConnection(async (conn: any) => {
|
||||
return conn.alias === connectionId || conn.socketId === connectionId;
|
||||
return conn.peer?.id === connectionId;
|
||||
});
|
||||
|
||||
if (connection) {
|
||||
@@ -532,6 +561,154 @@ export class ChangeStreamManager {
|
||||
}
|
||||
}
|
||||
|
||||
// ===========================================
|
||||
// Global Watchers for Activity Stream
|
||||
// ===========================================
|
||||
|
||||
/**
|
||||
* Start global watchers when the first activity subscriber connects.
|
||||
* These watch all MongoDB and S3 activity and feed into the activity buffer.
|
||||
*/
|
||||
private async startGlobalWatchers(): Promise<void> {
|
||||
if (this.globalWatchersActive) return;
|
||||
this.globalWatchersActive = true;
|
||||
|
||||
console.log('[ChangeStream] Starting global watchers for activity stream...');
|
||||
|
||||
await Promise.all([
|
||||
this.startGlobalMongoWatcher(),
|
||||
this.startGlobalS3Watchers(),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a deployment-level MongoDB change stream that watches ALL databases/collections.
|
||||
*/
|
||||
private async startGlobalMongoWatcher(): Promise<void> {
|
||||
try {
|
||||
const db = await this.tsview.getMongoDb();
|
||||
if (!db) {
|
||||
console.log('[ChangeStream] MongoDB not configured, skipping global MongoDB watcher');
|
||||
return;
|
||||
}
|
||||
|
||||
const client = (db as any).mongoDbClient as plugins.mongodb.MongoClient;
|
||||
|
||||
// Deployment-level watch — one stream for everything
|
||||
const changeStream = client.watch([], {
|
||||
fullDocument: 'updateLookup',
|
||||
});
|
||||
|
||||
changeStream.on('change', (change: any) => {
|
||||
const database = change.ns?.db || 'unknown';
|
||||
const collection = change.ns?.coll || 'unknown';
|
||||
|
||||
const event: interfaces.IMongoChangeEvent = {
|
||||
type: change.operationType as interfaces.IMongoChangeEvent['type'],
|
||||
database,
|
||||
collection,
|
||||
documentId: change.documentKey?._id?.toString(),
|
||||
document: change.fullDocument,
|
||||
updateDescription: change.updateDescription,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
this.addToActivityBuffer('mongodb', event);
|
||||
});
|
||||
|
||||
changeStream.on('error', (error: Error) => {
|
||||
console.error('[ChangeStream] Global MongoDB watcher error:', error);
|
||||
});
|
||||
|
||||
this.globalMongoWatcher = changeStream;
|
||||
console.log('[ChangeStream] Global MongoDB watcher started');
|
||||
} catch (error) {
|
||||
console.warn('[ChangeStream] MongoDB change streams unavailable (requires replica set). MongoDB activity events will come from API operations only.');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start S3 bucket watchers — one BucketWatcher per bucket.
|
||||
*/
|
||||
private async startGlobalS3Watchers(): Promise<void> {
|
||||
try {
|
||||
const smartbucket = await this.tsview.getSmartBucket();
|
||||
if (!smartbucket) {
|
||||
console.log('[ChangeStream] S3 not configured, skipping global S3 watchers');
|
||||
return;
|
||||
}
|
||||
|
||||
// List all buckets
|
||||
const command = new plugins.s3.ListBucketsCommand({});
|
||||
const response = await smartbucket.s3Client.send(command) as plugins.s3.ListBucketsCommandOutput;
|
||||
const bucketNames = response.Buckets?.map(b => b.Name).filter((name): name is string => !!name) || [];
|
||||
|
||||
for (const bucketName of bucketNames) {
|
||||
try {
|
||||
const bucketInstance = await smartbucket.getBucketByName(bucketName);
|
||||
const watcher = bucketInstance.createWatcher({
|
||||
prefix: '',
|
||||
pollIntervalMs: 5000,
|
||||
bufferTimeMs: 500,
|
||||
});
|
||||
|
||||
watcher.changeSubject.subscribe((eventOrEvents: IS3ChangeEvent | IS3ChangeEvent[]) => {
|
||||
const events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
|
||||
for (const event of events) {
|
||||
this.addToActivityBuffer('s3', event);
|
||||
}
|
||||
});
|
||||
|
||||
await watcher.start();
|
||||
await watcher.readyDeferred.promise;
|
||||
|
||||
this.globalS3Watchers.set(bucketName, watcher);
|
||||
console.log(`[ChangeStream] Global S3 watcher started for bucket: ${bucketName}`);
|
||||
} catch (bucketError) {
|
||||
console.error(`[ChangeStream] Failed to start global S3 watcher for bucket ${bucketName}:`, bucketError);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[ChangeStream] Global S3 watchers started (${this.globalS3Watchers.size}/${bucketNames.length} buckets)`);
|
||||
} catch (error) {
|
||||
console.error('[ChangeStream] Failed to start global S3 watchers:', error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop all global watchers when no activity subscribers remain.
|
||||
*/
|
||||
private async stopGlobalWatchers(): Promise<void> {
|
||||
if (!this.globalWatchersActive) return;
|
||||
|
||||
console.log('[ChangeStream] Stopping global watchers...');
|
||||
|
||||
// Close global MongoDB watcher
|
||||
if (this.globalMongoWatcher) {
|
||||
try {
|
||||
await this.globalMongoWatcher.close();
|
||||
console.log('[ChangeStream] Global MongoDB watcher stopped');
|
||||
} catch (error) {
|
||||
console.error('[ChangeStream] Error closing global MongoDB watcher:', error);
|
||||
}
|
||||
this.globalMongoWatcher = null;
|
||||
}
|
||||
|
||||
// Close all global S3 watchers
|
||||
for (const [bucketName, watcher] of this.globalS3Watchers) {
|
||||
try {
|
||||
await watcher.stop();
|
||||
console.log(`[ChangeStream] Global S3 watcher stopped for bucket: ${bucketName}`);
|
||||
} catch (error) {
|
||||
console.error(`[ChangeStream] Error closing global S3 watcher for ${bucketName}:`, error);
|
||||
}
|
||||
}
|
||||
this.globalS3Watchers.clear();
|
||||
|
||||
this.globalWatchersActive = false;
|
||||
console.log('[ChangeStream] Global watchers stopped');
|
||||
}
|
||||
|
||||
// ===========================================
|
||||
// Connection Management
|
||||
// ===========================================
|
||||
@@ -564,6 +741,11 @@ export class ChangeStreamManager {
|
||||
|
||||
// Clean up activity subscription
|
||||
this.activitySubscribers.delete(connectionId);
|
||||
|
||||
// Stop global watchers if no activity subscribers remain
|
||||
if (this.activitySubscribers.size === 0) {
|
||||
await this.stopGlobalWatchers();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -572,6 +754,9 @@ export class ChangeStreamManager {
|
||||
public async stop(): Promise<void> {
|
||||
console.log('[ChangeStream] Stopping all watchers...');
|
||||
|
||||
// Stop global watchers first
|
||||
await this.stopGlobalWatchers();
|
||||
|
||||
// Close all MongoDB watchers
|
||||
for (const key of this.mongoWatchers.keys()) {
|
||||
await this.closeMongoWatcher(key);
|
||||
|
||||
Reference in New Issue
Block a user