import * as plugins from '../../plugins.ts';
import { logger } from '../../logging.ts';
import type { OpsServer } from '../classes.opsserver.ts';
import * as interfaces from '../../../ts_interfaces/index.ts';
import { requireValidIdentity } from '../helpers/guards.ts';

/** Delay before the first scan for running platform services (lets services start first). */
const INITIAL_LOG_STREAM_DELAY_MS = 5000;

/** Interval between scans for newly started platform services. */
const LOG_STREAM_POLL_INTERVAL_MS = 15000;

/** Matches a leading ISO-like timestamp followed by whitespace and the remaining message. */
const DOCKER_TIMESTAMP_PREFIX = /^(\d{4}-\d{2}-\d{2}T[\d:.]+Z?)\s+(.*)/;

type TLogLevel = 'info' | 'warn' | 'error';
type TLogSource = 'stdout' | 'stderr';

/**
 * Splits a raw log line into its leading timestamp (when present) and message.
 * Lines without a recognizable timestamp prefix are returned unchanged as the message.
 */
function splitDockerLogLine(line: string): { rawTimestamp: string | undefined; message: string } {
  const match = DOCKER_TIMESTAMP_PREFIX.exec(line);
  return match
    ? { rawTimestamp: match[1], message: match[2] }
    : { rawTimestamp: undefined, message: line };
}

/**
 * Derives a log level from keywords in the message text.
 *
 * @param message - log message without the timestamp prefix
 * @param forceError - when true the entry is classified as 'error' regardless of content
 */
function classifyLogLevel(message: string, forceError = false): TLogLevel {
  const lowered = message.toLowerCase();
  if (forceError || lowered.includes('error') || lowered.includes('fatal')) {
    return 'error';
  }
  return lowered.includes('warn') ? 'warn' : 'info';
}

/** True when the provider object carries an `isCore` property that is strictly `true`. */
function isCoreProvider(provider: object): boolean {
  return 'isCore' in provider && (provider as { isCore?: unknown }).isCore === true;
}

/** Extracts a printable message from an arbitrary thrown value. */
function describeError(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Registers the platform-service typed request handlers on the ops server router
 * and pushes live container log lines to connected dashboard clients.
 */
export class PlatformHandler {
  public typedrouter = new plugins.typedrequest.TypedRouter();

  /** Service types for which a log stream has already been started. */
  private activeLogStreams = new Map<string, boolean>();

  /** Timer handles kept so that polling can be cancelled via stopLogStreaming(). */
  private logStreamStartTimer: ReturnType<typeof setTimeout> | undefined;
  private logStreamPollTimer: ReturnType<typeof setInterval> | undefined;

  constructor(private opsServerRef: OpsServer) {
    this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
    this.registerHandlers();
    // The constructor cannot await; make sure a failure never becomes an unhandled rejection.
    this.startLogStreaming().catch((err: unknown) => {
      logger.warn(`Failed to start platform log streaming: ${describeError(err)}`);
    });
  }

  /**
   * Cancels the initial delayed scan and the periodic re-scan for platform services.
   * Streams that were already started are not affected; this class holds no handle to them.
   */
  public stopLogStreaming(): void {
    if (this.logStreamStartTimer !== undefined) {
      clearTimeout(this.logStreamStartTimer);
      this.logStreamStartTimer = undefined;
    }
    if (this.logStreamPollTimer !== undefined) {
      clearInterval(this.logStreamPollTimer);
      this.logStreamPollTimer = undefined;
    }
  }

  /**
   * Start streaming logs from all running platform service containers
   * and push new entries to connected dashboard clients via TypedSocket.
   *
   * Schedules one scan after INITIAL_LOG_STREAM_DELAY_MS and then one scan every
   * LOG_STREAM_POLL_INTERVAL_MS so that services started later are picked up.
   */
  private async startLogStreaming(): Promise<void> {
    const runScan = (): void => {
      try {
        this.startMissingLogStreams();
      } catch (err) {
        // A failing scan (e.g. the service lookup throwing) must not escape the timer callback.
        logger.warn(`Platform log stream check failed: ${describeError(err)}`);
      }
    };

    // Initial check after a short delay (let services start first)
    this.logStreamStartTimer = setTimeout(runScan, INITIAL_LOG_STREAM_DELAY_MS);

    // Re-check periodically for newly started services
    this.logStreamPollTimer = setInterval(runScan, LOG_STREAM_POLL_INTERVAL_MS);
  }

  /**
   * Starts a log stream for every running platform service that has a container
   * and is not yet recorded in activeLogStreams.
   */
  private startMissingLogStreams(): void {
    const services = this.opsServerRef.oneboxRef.database.getAllPlatformServices();
    for (const service of services) {
      if (service.status !== 'running' || !service.containerId) continue;
      if (this.activeLogStreams.has(service.type)) continue;

      this.activeLogStreams.set(service.type, true);
      logger.info(`Starting log stream for platform service: ${service.type}`);

      // Started without awaiting so one stream cannot delay the streams of the
      // remaining services; streamServiceLogs handles its own errors.
      void this.streamServiceLogs(service.type, service.containerId);
    }
  }

  /**
   * Streams the logs of one container and forwards each line to dashboard clients.
   * On failure the service is removed from activeLogStreams so a later scan retries it.
   *
   * NOTE(review): the entry is intentionally NOT removed when the call resolves normally,
   * because it is not visible here whether streamContainerLogs resolves once the stream is
   * attached or only when it ends — confirm, and clear the entry on stream end if appropriate.
   */
  private async streamServiceLogs(serviceType: string, containerId: string): Promise<void> {
    try {
      await this.opsServerRef.oneboxRef.docker.streamContainerLogs(
        containerId,
        (line: string, isError: boolean) => {
          this.pushLogToClients(serviceType as interfaces.data.TPlatformServiceType, line, isError);
        },
      );
    } catch (err) {
      logger.warn(`Log stream failed for ${serviceType}: ${describeError(err)}`);
      this.activeLogStreams.delete(serviceType);
    }
  }

  /**
   * Pushes a single log line to every socket connection tagged role=ops_dashboard.
   *
   * @param serviceType - platform service the line belongs to
   * @param line - raw log line, optionally prefixed with a timestamp
   * @param isError - forces the entry's level to 'error'
   */
  private pushLogToClients(
    serviceType: interfaces.data.TPlatformServiceType,
    line: string,
    isError: boolean,
  ): void {
    const typedsocket = (this.opsServerRef.server as any)?.typedserver?.typedsocket;
    if (!typedsocket) return;

    // Parse timestamp from Docker log line; fall back to "now" when there is none
    const { rawTimestamp, message } = splitDockerLogLine(line);
    const timestamp = rawTimestamp ?? new Date().toISOString();
    const level = classifyLogLevel(message, isError);

    // Find all dashboard clients and push
    typedsocket.findAllTargetConnectionsByTag('role', 'ops_dashboard')
      .then((connections: any[]) => {
        for (const conn of connections) {
          typedsocket.createTypedRequest(
            'pushPlatformServiceLog',
            conn,
          ).fire({
            serviceType,
            entry: { timestamp, level, message },
          }).catch(() => {}); // fire-and-forget
        }
      })
      .catch(() => {}); // no connections, ignore
  }

  /**
   * Resolves the status reported for a platform service.
   * For 'caddy' the stored status is ignored and the reverse proxy's own status is used.
   */
  private resolveServiceStatus(
    serviceType: string,
    storedStatus: string | undefined,
  ): interfaces.data.TPlatformServiceStatus {
    let status: string = storedStatus || 'not-deployed';
    if (serviceType === 'caddy') {
      const proxyStatus = this.opsServerRef.oneboxRef.reverseProxy.getStatus() as any;
      status = (proxyStatus.running ?? proxyStatus.http?.running) ? 'running' : 'stopped';
    }
    return status as interfaces.data.TPlatformServiceStatus;
  }

  /**
   * Registers all platform-service request handlers on this handler's typed router.
   * Every handler first validates the caller's identity via requireValidIdentity.
   *
   * NOTE(review): the TypedHandler instances are created without an explicit request
   * interface type argument — confirm whether typed request interfaces should be supplied.
   */
  private registerHandlers(): void {
    // Get all platform services
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPlatformServices',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const platformServices = this.opsServerRef.oneboxRef.platformServices.getAllPlatformServices();
          const providers = this.opsServerRef.oneboxRef.platformServices.getAllProviders();

          const result = providers.map((provider: any) => {
            const service = platformServices.find((s: any) => s.type === provider.type);
            return {
              type: provider.type,
              displayName: provider.displayName,
              resourceTypes: provider.resourceTypes,
              status: this.resolveServiceStatus(provider.type, service?.status),
              containerId: service?.containerId,
              isCore: isCoreProvider(provider),
              createdAt: service?.createdAt,
              updatedAt: service?.updatedAt,
            };
          });

          return { platformServices: result as interfaces.data.IPlatformService[] };
        },
      ),
    );

    // Get specific platform service
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPlatformService',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
          if (!provider) {
            throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
          }

          const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);

          return {
            platformService: {
              type: provider.type,
              displayName: provider.displayName,
              resourceTypes: provider.resourceTypes,
              status: this.resolveServiceStatus(dataArg.serviceType, service?.status),
              containerId: service?.containerId,
              isCore: isCoreProvider(provider),
              createdAt: service?.createdAt,
              updatedAt: service?.updatedAt,
            } as interfaces.data.IPlatformService,
          };
        },
      ),
    );

    // Start platform service
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'startPlatformService',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
          if (!provider) {
            throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
          }

          logger.info(`Starting platform service: ${dataArg.serviceType}`);
          const service = await this.opsServerRef.oneboxRef.platformServices.ensureRunning(dataArg.serviceType);

          return {
            platformService: {
              type: service.type,
              displayName: provider.displayName,
              resourceTypes: provider.resourceTypes,
              status: service.status,
              containerId: service.containerId,
            },
          };
        },
      ),
    );

    // Stop platform service (core services are refused)
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'stopPlatformService',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const provider = this.opsServerRef.oneboxRef.platformServices.getProvider(dataArg.serviceType);
          if (!provider) {
            throw new plugins.typedrequest.TypedResponseError(`Unknown platform service type: ${dataArg.serviceType}`);
          }

          if (isCoreProvider(provider)) {
            throw new plugins.typedrequest.TypedResponseError(
              `${provider.displayName} is a core service and cannot be stopped`,
            );
          }

          logger.info(`Stopping platform service: ${dataArg.serviceType}`);
          await this.opsServerRef.oneboxRef.platformServices.stopPlatformService(dataArg.serviceType);

          return {
            platformService: {
              type: dataArg.serviceType,
              displayName: provider.displayName,
              resourceTypes: provider.resourceTypes,
              status: 'stopped' as const,
            },
          };
        },
      ),
    );

    // Get platform service stats
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPlatformServiceStats',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);
          if (!service || !service.containerId) {
            throw new plugins.typedrequest.TypedResponseError('Platform service has no container');
          }

          const stats = await this.opsServerRef.oneboxRef.docker.getContainerStats(service.containerId);
          if (!stats) {
            throw new plugins.typedrequest.TypedResponseError('Could not retrieve container stats');
          }

          return { stats };
        },
      ),
    );

    // Get platform service logs
    this.typedrouter.addTypedHandler(
      new plugins.typedrequest.TypedHandler(
        'getPlatformServiceLogs',
        async (dataArg) => {
          await requireValidIdentity(this.opsServerRef.adminHandler, dataArg);
          const service = this.opsServerRef.oneboxRef.database.getPlatformServiceByType(dataArg.serviceType);
          if (!service || !service.containerId) {
            throw new plugins.typedrequest.TypedResponseError('Platform service has no container');
          }

          const tail = dataArg.tail || 100;
          const rawLogs = await this.opsServerRef.oneboxRef.docker.getContainerLogs(service.containerId, tail);

          // Split each stream on its own so the last stdout line can never be fused with the
          // first stderr line, and so every entry keeps the stream it actually came from.
          const toSourcedLines = (raw: string | undefined, source: TLogSource) =>
            (raw ?? '')
              .split('\n')
              .filter((line: string) => line.trim())
              .map((line: string) => ({ line, source }));

          const sourcedLines = [
            ...toSourcedLines(rawLogs.stdout, 'stdout'),
            ...toSourcedLines(rawLogs.stderr, 'stderr'),
          ];

          // Parse raw log output into structured entries
          const logs = sourcedLines.map(({ line, source }, index: number) => {
            const { rawTimestamp, message } = splitDockerLogLine(line);
            // Fall back to "now" when there is no timestamp or it does not parse to a valid date.
            const parsedTime = rawTimestamp ? new Date(rawTimestamp).getTime() : Number.NaN;
            const timestamp = Number.isNaN(parsedTime) ? Date.now() : parsedTime;

            return {
              id: index,
              serviceId: 0,
              timestamp,
              message,
              level: classifyLogLevel(message) as 'info' | 'warn' | 'error' | 'debug',
              source,
            };
          });

          return { logs };
        },
      ),
    );
  }
}