import * as plugins from '../../plugins.js'; import type { OpsServer } from '../classes.opsserver.js'; import * as interfaces from '../../../ts_interfaces/index.js'; import { logBuffer, baseLogger } from '../../logger.js'; // Module-level singleton: the log push destination is added once and reuses // the current OpsServer reference so it survives OpsServer restarts without // accumulating duplicate destinations. let logPushDestinationInstalled = false; let currentOpsServerRef: OpsServer | null = null; export class LogsHandler { private activeStreamStops: Set<() => void> = new Set(); constructor(private opsServerRef: OpsServer) { this.registerHandlers(); this.setupLogPushDestination(); } /** * Clean up all active log streams and deactivate the push destination. * Called when OpsServer stops. */ public cleanup(): void { // Stop all active follow-mode log streams for (const stop of this.activeStreamStops) { stop(); } this.activeStreamStops.clear(); // Deactivate the push destination (it stays registered but becomes a no-op) currentOpsServerRef = null; } private registerHandlers(): void { // All log endpoints register directly on viewRouter (valid identity required via middleware) const router = this.opsServerRef.viewRouter; // Get Recent Logs Handler router.addTypedHandler( new plugins.typedrequest.TypedHandler( 'getRecentLogs', async (dataArg, toolsArg) => { const logs = await this.getRecentLogs( dataArg.level, dataArg.category, dataArg.limit || 100, dataArg.offset || 0, dataArg.search, dataArg.timeRange ); return { logs, total: logs.length, hasMore: false, }; } ) ); // Get Log Stream Handler router.addTypedHandler( new plugins.typedrequest.TypedHandler( 'getLogStream', async (dataArg, toolsArg) => { // Create a virtual stream for log streaming const virtualStream = new plugins.typedrequest.VirtualStream(); // Set up log streaming const streamLogs = this.setupLogStream( virtualStream, dataArg.filters?.level, dataArg.filters?.category, dataArg.follow ); // Start streaming streamLogs.start(); // Track the stop function so we can clean up on shutdown this.activeStreamStops.add(streamLogs.stop); return { logStream: virtualStream as any, }; } ) ); } private static mapLogLevel(smartlogLevel: string): 'debug' | 'info' | 'warn' | 'error' { switch (smartlogLevel) { case 'silly': case 'debug': return 'debug'; case 'warn': return 'warn'; case 'error': return 'error'; default: return 'info'; } } private static deriveCategory( zone?: string, message?: string ): 'smtp' | 'dns' | 'security' | 'system' | 'email' { const msg = (message || '').toLowerCase(); if (msg.includes('[security:') || msg.includes('security')) return 'security'; if (zone === 'email' || msg.includes('email') || msg.includes('smtp') || msg.includes('mta')) return 'email'; if (zone === 'dns' || msg.includes('dns')) return 'dns'; if (msg.includes('smtp')) return 'smtp'; return 'system'; } private async getRecentLogs( level?: 'error' | 'warn' | 'info' | 'debug', category?: 'smtp' | 'dns' | 'security' | 'system' | 'email', limit: number = 100, offset: number = 0, search?: string, timeRange?: '1h' | '6h' | '24h' | '7d' | '30d' ): Promise> { // Compute a timestamp cutoff from timeRange let since: number | undefined; if (timeRange) { const rangeMs: Record = { '1h': 3600000, '6h': 21600000, '24h': 86400000, '7d': 604800000, '30d': 2592000000, }; since = Date.now() - (rangeMs[timeRange] || 86400000); } // Map the UI level to smartlog levels for filtering const smartlogLevels: string[] | undefined = level ? level === 'debug' ? ['debug', 'silly'] : level === 'info' ? ['info', 'ok', 'success', 'note', 'lifecycle'] : [level] : undefined; // Fetch a larger batch from buffer, then apply category filter client-side const rawEntries = logBuffer.getEntries({ level: smartlogLevels as any, search, since, limit: limit * 3, // over-fetch to compensate for category filtering offset: 0, }); // Map ILogPackage → UI log format and apply category filter const mapped: Array<{ timestamp: number; level: 'debug' | 'info' | 'warn' | 'error'; category: 'smtp' | 'dns' | 'security' | 'system' | 'email'; message: string; metadata?: any; }> = []; for (const pkg of rawEntries) { const uiLevel = LogsHandler.mapLogLevel(pkg.level); const uiCategory = LogsHandler.deriveCategory(pkg.context?.zone, pkg.message); if (category && uiCategory !== category) continue; mapped.push({ timestamp: pkg.timestamp, level: uiLevel, category: uiCategory, message: pkg.message, metadata: pkg.data, }); if (mapped.length >= limit) break; } return mapped; } /** * Add a log destination to the base logger that pushes entries * to all connected ops_dashboard TypedSocket clients. * * Uses a module-level singleton so the destination is added only once, * even across OpsServer restart cycles. The destination reads * `currentOpsServerRef` dynamically so it always uses the active server. */ private setupLogPushDestination(): void { // Update the module-level reference so the existing destination uses the new server currentOpsServerRef = this.opsServerRef; if (logPushDestinationInstalled) { return; // destination already registered — just updated the ref } logPushDestinationInstalled = true; baseLogger.addLogDestination({ async handleLog(logPackage: any) { const opsServer = currentOpsServerRef; if (!opsServer) return; const typedsocket = opsServer.server?.typedserver?.typedsocket; if (!typedsocket) return; let connections: any[]; try { connections = await typedsocket.findAllTargetConnectionsByTag('role', 'ops_dashboard'); } catch { return; } if (connections.length === 0) return; const entry: interfaces.data.ILogEntry = { timestamp: logPackage.timestamp || Date.now(), level: LogsHandler.mapLogLevel(logPackage.level), category: LogsHandler.deriveCategory(logPackage.context?.zone, logPackage.message), message: logPackage.message, metadata: logPackage.data, }; for (const conn of connections) { try { const push = typedsocket.createTypedRequest( 'pushLogEntry', conn, ); push.fire({ entry }).catch(() => {}); // fire-and-forget } catch { // connection may have closed } } }, }); } private setupLogStream( virtualStream: plugins.typedrequest.VirtualStream, levelFilter?: string[], categoryFilter?: string[], follow: boolean = true ): { start: () => void; stop: () => void; } { let intervalId: NodeJS.Timeout | null = null; let stopped = false; let logIndex = 0; const stop = () => { stopped = true; if (intervalId) { clearInterval(intervalId); intervalId = null; } this.activeStreamStops.delete(stop); }; const start = () => { if (!follow) { // Send existing logs and close this.getRecentLogs( levelFilter?.[0] as any, categoryFilter?.[0] as any, 100, 0 ).then(logs => { logs.forEach(log => { const logData = JSON.stringify(log); const encoder = new TextEncoder(); virtualStream.sendData(encoder.encode(logData)); }); }); return; } // For follow mode, simulate real-time log streaming intervalId = setInterval(async () => { if (stopped) { // Guard: clear interval if stop() was called between ticks clearInterval(intervalId!); intervalId = null; return; } const categories: Array<'smtp' | 'dns' | 'security' | 'system' | 'email'> = ['smtp', 'dns', 'security', 'system', 'email']; const levels: Array<'debug' | 'info' | 'warn' | 'error'> = ['info', 'warn', 'error', 'debug']; const mockCategory = categories[Math.floor(Math.random() * categories.length)]; const mockLevel = levels[Math.floor(Math.random() * levels.length)]; // Filter by requested criteria if (levelFilter && !levelFilter.includes(mockLevel)) return; if (categoryFilter && !categoryFilter.includes(mockCategory)) return; const logEntry = { timestamp: Date.now(), level: mockLevel, category: mockCategory, message: `Real-time log ${logIndex++} from ${mockCategory}`, metadata: { requestId: plugins.uuid.v4(), }, }; const logData = JSON.stringify(logEntry); const encoder = new TextEncoder(); try { // Use a timeout to detect hung streams (sendData can hang if the // VirtualStream's keepAlive loop has ended) let timeoutHandle: ReturnType; await Promise.race([ virtualStream.sendData(encoder.encode(logData)).then((result) => { clearTimeout(timeoutHandle); return result; }), new Promise((_, reject) => { timeoutHandle = setTimeout(() => reject(new Error('stream send timeout')), 10_000); }), ]); } catch { // Stream closed, errored, or timed out — clean up stop(); } }, 2000); }; return { start, stop }; } }