import * as plugins from '../../plugins.js';
|
|
import type { OpsServer } from '../classes.opsserver.js';
|
|
import * as interfaces from '../../../ts_interfaces/index.js';
|
|
import { logBuffer, baseLogger } from '../../logger.js';
|
|
|
|
// Module-level singleton: the log push destination is added once and reuses
// the current OpsServer reference so it survives OpsServer restarts without
// accumulating duplicate destinations.
// Guards against registering the destination twice; never reset, so the
// destination stays registered for the process lifetime.
let logPushDestinationInstalled = false;
// The active server the registered destination pushes through. Set by
// LogsHandler.setupLogPushDestination() on construction and nulled by
// LogsHandler.cleanup(), which turns the destination into a no-op.
let currentOpsServerRef: OpsServer | null = null;
|
|
|
|
export class LogsHandler {
|
|
public typedrouter = new plugins.typedrequest.TypedRouter();
|
|
private activeStreamStops: Set<() => void> = new Set();
|
|
|
|
constructor(private opsServerRef: OpsServer) {
|
|
// Add this handler's router to the parent
|
|
this.opsServerRef.typedrouter.addTypedRouter(this.typedrouter);
|
|
this.registerHandlers();
|
|
this.setupLogPushDestination();
|
|
}
|
|
|
|
/**
|
|
* Clean up all active log streams and deactivate the push destination.
|
|
* Called when OpsServer stops.
|
|
*/
|
|
public cleanup(): void {
|
|
// Stop all active follow-mode log streams
|
|
for (const stop of this.activeStreamStops) {
|
|
stop();
|
|
}
|
|
this.activeStreamStops.clear();
|
|
// Deactivate the push destination (it stays registered but becomes a no-op)
|
|
currentOpsServerRef = null;
|
|
}
|
|
|
|
private registerHandlers(): void {
|
|
// Get Recent Logs Handler
|
|
this.typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetRecentLogs>(
|
|
'getRecentLogs',
|
|
async (dataArg, toolsArg) => {
|
|
const logs = await this.getRecentLogs(
|
|
dataArg.level,
|
|
dataArg.category,
|
|
dataArg.limit || 100,
|
|
dataArg.offset || 0,
|
|
dataArg.search,
|
|
dataArg.timeRange
|
|
);
|
|
|
|
return {
|
|
logs,
|
|
total: logs.length,
|
|
hasMore: false,
|
|
};
|
|
}
|
|
)
|
|
);
|
|
|
|
// Get Log Stream Handler
|
|
this.typedrouter.addTypedHandler(
|
|
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetLogStream>(
|
|
'getLogStream',
|
|
async (dataArg, toolsArg) => {
|
|
// Create a virtual stream for log streaming
|
|
const virtualStream = new plugins.typedrequest.VirtualStream<Uint8Array>();
|
|
|
|
// Set up log streaming
|
|
const streamLogs = this.setupLogStream(
|
|
virtualStream,
|
|
dataArg.filters?.level,
|
|
dataArg.filters?.category,
|
|
dataArg.follow
|
|
);
|
|
|
|
// Start streaming
|
|
streamLogs.start();
|
|
|
|
// Track the stop function so we can clean up on shutdown
|
|
this.activeStreamStops.add(streamLogs.stop);
|
|
|
|
return {
|
|
logStream: virtualStream as any,
|
|
};
|
|
}
|
|
)
|
|
);
|
|
}
|
|
|
|
private static mapLogLevel(smartlogLevel: string): 'debug' | 'info' | 'warn' | 'error' {
|
|
switch (smartlogLevel) {
|
|
case 'silly':
|
|
case 'debug':
|
|
return 'debug';
|
|
case 'warn':
|
|
return 'warn';
|
|
case 'error':
|
|
return 'error';
|
|
default:
|
|
return 'info';
|
|
}
|
|
}
|
|
|
|
private static deriveCategory(
|
|
zone?: string,
|
|
message?: string
|
|
): 'smtp' | 'dns' | 'security' | 'system' | 'email' {
|
|
const msg = (message || '').toLowerCase();
|
|
if (msg.includes('[security:') || msg.includes('security')) return 'security';
|
|
if (zone === 'email' || msg.includes('email') || msg.includes('smtp') || msg.includes('mta')) return 'email';
|
|
if (zone === 'dns' || msg.includes('dns')) return 'dns';
|
|
if (msg.includes('smtp')) return 'smtp';
|
|
return 'system';
|
|
}
|
|
|
|
private async getRecentLogs(
|
|
level?: 'error' | 'warn' | 'info' | 'debug',
|
|
category?: 'smtp' | 'dns' | 'security' | 'system' | 'email',
|
|
limit: number = 100,
|
|
offset: number = 0,
|
|
search?: string,
|
|
timeRange?: '1h' | '6h' | '24h' | '7d' | '30d'
|
|
): Promise<Array<{
|
|
timestamp: number;
|
|
level: 'debug' | 'info' | 'warn' | 'error';
|
|
category: 'smtp' | 'dns' | 'security' | 'system' | 'email';
|
|
message: string;
|
|
metadata?: any;
|
|
}>> {
|
|
// Compute a timestamp cutoff from timeRange
|
|
let since: number | undefined;
|
|
if (timeRange) {
|
|
const rangeMs: Record<string, number> = {
|
|
'1h': 3600000,
|
|
'6h': 21600000,
|
|
'24h': 86400000,
|
|
'7d': 604800000,
|
|
'30d': 2592000000,
|
|
};
|
|
since = Date.now() - (rangeMs[timeRange] || 86400000);
|
|
}
|
|
|
|
// Map the UI level to smartlog levels for filtering
|
|
const smartlogLevels: string[] | undefined = level
|
|
? level === 'debug'
|
|
? ['debug', 'silly']
|
|
: level === 'info'
|
|
? ['info', 'ok', 'success', 'note', 'lifecycle']
|
|
: [level]
|
|
: undefined;
|
|
|
|
// Fetch a larger batch from buffer, then apply category filter client-side
|
|
const rawEntries = logBuffer.getEntries({
|
|
level: smartlogLevels as any,
|
|
search,
|
|
since,
|
|
limit: limit * 3, // over-fetch to compensate for category filtering
|
|
offset: 0,
|
|
});
|
|
|
|
// Map ILogPackage → UI log format and apply category filter
|
|
const mapped: Array<{
|
|
timestamp: number;
|
|
level: 'debug' | 'info' | 'warn' | 'error';
|
|
category: 'smtp' | 'dns' | 'security' | 'system' | 'email';
|
|
message: string;
|
|
metadata?: any;
|
|
}> = [];
|
|
|
|
for (const pkg of rawEntries) {
|
|
const uiLevel = LogsHandler.mapLogLevel(pkg.level);
|
|
const uiCategory = LogsHandler.deriveCategory(pkg.context?.zone, pkg.message);
|
|
|
|
if (category && uiCategory !== category) continue;
|
|
|
|
mapped.push({
|
|
timestamp: pkg.timestamp,
|
|
level: uiLevel,
|
|
category: uiCategory,
|
|
message: pkg.message,
|
|
metadata: pkg.data,
|
|
});
|
|
|
|
if (mapped.length >= limit) break;
|
|
}
|
|
|
|
return mapped;
|
|
}
|
|
|
|
/**
|
|
* Add a log destination to the base logger that pushes entries
|
|
* to all connected ops_dashboard TypedSocket clients.
|
|
*
|
|
* Uses a module-level singleton so the destination is added only once,
|
|
* even across OpsServer restart cycles. The destination reads
|
|
* `currentOpsServerRef` dynamically so it always uses the active server.
|
|
*/
|
|
private setupLogPushDestination(): void {
|
|
// Update the module-level reference so the existing destination uses the new server
|
|
currentOpsServerRef = this.opsServerRef;
|
|
|
|
if (logPushDestinationInstalled) {
|
|
return; // destination already registered — just updated the ref
|
|
}
|
|
logPushDestinationInstalled = true;
|
|
|
|
baseLogger.addLogDestination({
|
|
async handleLog(logPackage: any) {
|
|
const opsServer = currentOpsServerRef;
|
|
if (!opsServer) return;
|
|
|
|
const typedsocket = opsServer.server?.typedserver?.typedsocket;
|
|
if (!typedsocket) return;
|
|
|
|
let connections: any[];
|
|
try {
|
|
connections = await typedsocket.findAllTargetConnectionsByTag('role', 'ops_dashboard');
|
|
} catch {
|
|
return;
|
|
}
|
|
if (connections.length === 0) return;
|
|
|
|
const entry: interfaces.data.ILogEntry = {
|
|
timestamp: logPackage.timestamp || Date.now(),
|
|
level: LogsHandler.mapLogLevel(logPackage.level),
|
|
category: LogsHandler.deriveCategory(logPackage.context?.zone, logPackage.message),
|
|
message: logPackage.message,
|
|
metadata: logPackage.data,
|
|
};
|
|
|
|
for (const conn of connections) {
|
|
try {
|
|
const push = typedsocket.createTypedRequest<interfaces.requests.IReq_PushLogEntry>(
|
|
'pushLogEntry',
|
|
conn,
|
|
);
|
|
push.fire({ entry }).catch(() => {}); // fire-and-forget
|
|
} catch {
|
|
// connection may have closed
|
|
}
|
|
}
|
|
},
|
|
});
|
|
}
|
|
|
|
private setupLogStream(
|
|
virtualStream: plugins.typedrequest.VirtualStream<Uint8Array>,
|
|
levelFilter?: string[],
|
|
categoryFilter?: string[],
|
|
follow: boolean = true
|
|
): {
|
|
start: () => void;
|
|
stop: () => void;
|
|
} {
|
|
let intervalId: NodeJS.Timeout | null = null;
|
|
let stopped = false;
|
|
let logIndex = 0;
|
|
|
|
const stop = () => {
|
|
stopped = true;
|
|
if (intervalId) {
|
|
clearInterval(intervalId);
|
|
intervalId = null;
|
|
}
|
|
this.activeStreamStops.delete(stop);
|
|
};
|
|
|
|
const start = () => {
|
|
if (!follow) {
|
|
// Send existing logs and close
|
|
this.getRecentLogs(
|
|
levelFilter?.[0] as any,
|
|
categoryFilter?.[0] as any,
|
|
100,
|
|
0
|
|
).then(logs => {
|
|
logs.forEach(log => {
|
|
const logData = JSON.stringify(log);
|
|
const encoder = new TextEncoder();
|
|
virtualStream.sendData(encoder.encode(logData));
|
|
});
|
|
});
|
|
return;
|
|
}
|
|
|
|
// For follow mode, simulate real-time log streaming
|
|
intervalId = setInterval(async () => {
|
|
if (stopped) {
|
|
// Guard: clear interval if stop() was called between ticks
|
|
clearInterval(intervalId!);
|
|
intervalId = null;
|
|
return;
|
|
}
|
|
|
|
const categories: Array<'smtp' | 'dns' | 'security' | 'system' | 'email'> = ['smtp', 'dns', 'security', 'system', 'email'];
|
|
const levels: Array<'debug' | 'info' | 'warn' | 'error'> = ['info', 'warn', 'error', 'debug'];
|
|
|
|
const mockCategory = categories[Math.floor(Math.random() * categories.length)];
|
|
const mockLevel = levels[Math.floor(Math.random() * levels.length)];
|
|
|
|
// Filter by requested criteria
|
|
if (levelFilter && !levelFilter.includes(mockLevel)) return;
|
|
if (categoryFilter && !categoryFilter.includes(mockCategory)) return;
|
|
|
|
const logEntry = {
|
|
timestamp: Date.now(),
|
|
level: mockLevel,
|
|
category: mockCategory,
|
|
message: `Real-time log ${logIndex++} from ${mockCategory}`,
|
|
metadata: {
|
|
requestId: plugins.uuid.v4(),
|
|
},
|
|
};
|
|
|
|
const logData = JSON.stringify(logEntry);
|
|
const encoder = new TextEncoder();
|
|
try {
|
|
// Use a timeout to detect hung streams (sendData can hang if the
|
|
// VirtualStream's keepAlive loop has ended)
|
|
let timeoutHandle: ReturnType<typeof setTimeout>;
|
|
await Promise.race([
|
|
virtualStream.sendData(encoder.encode(logData)).then((result) => {
|
|
clearTimeout(timeoutHandle);
|
|
return result;
|
|
}),
|
|
new Promise<never>((_, reject) => {
|
|
timeoutHandle = setTimeout(() => reject(new Error('stream send timeout')), 10_000);
|
|
}),
|
|
]);
|
|
} catch {
|
|
// Stream closed, errored, or timed out — clean up
|
|
stop();
|
|
}
|
|
}, 2000);
|
|
};
|
|
|
|
return { start, stop };
|
|
}
|
|
}
|