226 lines
6.7 KiB
TypeScript
226 lines
6.7 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { TsView } from '../tsview.classes.tsview.js';
|
|
import type * as interfaces from '../interfaces/index.js';
|
|
import { registerS3Handlers } from '../api/handlers.s3.js';
|
|
import { registerMongoHandlers } from '../api/handlers.mongodb.js';
|
|
import { ChangeStreamManager } from '../streaming/index.js';
|
|
import { files as bundledUiFiles } from '../bundled_ui.js';
|
|
|
|
/**
|
|
* Web server for TsView that serves the bundled UI and API endpoints.
|
|
*/
|
|
export class ViewServer {
|
|
private tsview: TsView;
|
|
private port: number;
|
|
private typedServer: plugins.typedserver.TypedServer | null = null;
|
|
private changeStreamManager: ChangeStreamManager | null = null;
|
|
public typedrouter: plugins.typedrequest.TypedRouter;
|
|
|
|
constructor(tsview: TsView, port: number) {
|
|
this.tsview = tsview;
|
|
this.port = port;
|
|
this.typedrouter = new plugins.typedrequest.TypedRouter();
|
|
}
|
|
|
|
/**
|
|
* Start the server
|
|
*/
|
|
public async start(): Promise<void> {
|
|
// Create typed server with bundled content
|
|
this.typedServer = new plugins.typedserver.TypedServer({
|
|
cors: true,
|
|
port: this.port,
|
|
bundledContent: bundledUiFiles,
|
|
spaFallback: true,
|
|
noCache: true,
|
|
});
|
|
|
|
// Register API handlers directly to server's router
|
|
if (this.tsview.config.hasS3()) {
|
|
await registerS3Handlers(this.typedServer.typedrouter, this.tsview);
|
|
}
|
|
|
|
if (this.tsview.config.hasMongo()) {
|
|
await registerMongoHandlers(this.typedServer.typedrouter, this.tsview);
|
|
}
|
|
|
|
// Initialize ChangeStreamManager for real-time updates
|
|
this.changeStreamManager = new ChangeStreamManager(this.tsview);
|
|
|
|
// Register streaming handlers
|
|
await this.registerStreamingHandlers();
|
|
|
|
// Start server
|
|
await this.typedServer.start();
|
|
|
|
// Set TypedSocket reference after server starts
|
|
if (this.typedServer.typedsocket) {
|
|
this.changeStreamManager.setTypedSocket(this.typedServer.typedsocket);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Register WebSocket streaming handlers
|
|
*/
|
|
private async registerStreamingHandlers(): Promise<void> {
|
|
if (!this.typedServer || !this.changeStreamManager) return;
|
|
|
|
const typedrouter = this.typedServer.typedrouter;
|
|
|
|
// Subscribe to MongoDB collection changes
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeMongo>(
|
|
'subscribeMongo',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false, subscriptionId: '' };
|
|
}
|
|
|
|
const result = await this.changeStreamManager!.subscribeToMongo(
|
|
connectionId,
|
|
reqData.database,
|
|
reqData.collection
|
|
);
|
|
return result;
|
|
}
|
|
)
|
|
);
|
|
|
|
// Unsubscribe from MongoDB collection changes
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeMongo>(
|
|
'unsubscribeMongo',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false };
|
|
}
|
|
|
|
const success = await this.changeStreamManager!.unsubscribeFromMongo(
|
|
connectionId,
|
|
reqData.database,
|
|
reqData.collection
|
|
);
|
|
return { success };
|
|
}
|
|
)
|
|
);
|
|
|
|
// Subscribe to S3 bucket changes
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeS3>(
|
|
'subscribeS3',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false, subscriptionId: '' };
|
|
}
|
|
|
|
const result = await this.changeStreamManager!.subscribeToS3(
|
|
connectionId,
|
|
reqData.bucket,
|
|
reqData.prefix
|
|
);
|
|
return result;
|
|
}
|
|
)
|
|
);
|
|
|
|
// Unsubscribe from S3 bucket changes
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeS3>(
|
|
'unsubscribeS3',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false };
|
|
}
|
|
|
|
const success = await this.changeStreamManager!.unsubscribeFromS3(
|
|
connectionId,
|
|
reqData.bucket,
|
|
reqData.prefix
|
|
);
|
|
return { success };
|
|
}
|
|
)
|
|
);
|
|
|
|
// Subscribe to activity stream
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_SubscribeActivity>(
|
|
'subscribeActivity',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false, subscriptionId: '' };
|
|
}
|
|
|
|
const result = await this.changeStreamManager!.subscribeToActivity(connectionId);
|
|
return result;
|
|
}
|
|
)
|
|
);
|
|
|
|
// Unsubscribe from activity stream
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_UnsubscribeActivity>(
|
|
'unsubscribeActivity',
|
|
async (reqData, context) => {
|
|
const connectionId = this.getConnectionId(context);
|
|
if (!connectionId) {
|
|
return { success: false };
|
|
}
|
|
|
|
const success = await this.changeStreamManager!.unsubscribeFromActivity(connectionId);
|
|
return { success };
|
|
}
|
|
)
|
|
);
|
|
|
|
// Get recent activity events
|
|
typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.IReq_GetRecentActivity>(
|
|
'getRecentActivity',
|
|
async (reqData) => {
|
|
const events = this.changeStreamManager!.getRecentActivity(reqData.limit || 100);
|
|
return { events };
|
|
}
|
|
)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Extract connection ID from request context
|
|
*/
|
|
private getConnectionId(context: any): string | null {
|
|
// Try to get connection ID from WebSocket context
|
|
if (context?.socketConnection?.socketId) {
|
|
return context.socketConnection.socketId;
|
|
}
|
|
if (context?.socketConnection?.alias) {
|
|
return context.socketConnection.alias;
|
|
}
|
|
// Fallback: generate a unique ID for HTTP requests
|
|
// Note: Real-time streaming requires WebSocket connection
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Stop the server
|
|
*/
|
|
public async stop(): Promise<void> {
|
|
// Stop change stream manager first
|
|
if (this.changeStreamManager) {
|
|
await this.changeStreamManager.stop();
|
|
this.changeStreamManager = null;
|
|
}
|
|
|
|
if (this.typedServer) {
|
|
await this.typedServer.stop();
|
|
this.typedServer = null;
|
|
}
|
|
}
|
|
}
|