fix(cleanup): prevent event listener and log stream leaks, tighten smartProxy connection timeouts, and improve graceful shutdown behavior

This commit is contained in:
2026-02-26 17:14:51 +00:00
parent cfc4cf378f
commit 0c4e28455e
9 changed files with 157 additions and 75 deletions

View File

@@ -70,6 +70,10 @@ export class OpsServer {
}
public async stop() {
// Clean up log handler streams and push destination before stopping the server
if (this.logsHandler) {
this.logsHandler.cleanup();
}
if (this.server) {
await this.server.stop();
}

View File

@@ -3,8 +3,15 @@ import type { OpsServer } from '../classes.opsserver.js';
import * as interfaces from '../../../ts_interfaces/index.js';
import { logBuffer, baseLogger } from '../../logger.js';
// Module-level singleton: the log push destination is added once and reuses
// the current OpsServer reference so it survives OpsServer restarts without
// accumulating duplicate destinations.
let logPushDestinationInstalled = false;
let currentOpsServerRef: OpsServer | null = null;
export class LogsHandler {
public typedrouter = new plugins.typedrequest.TypedRouter();
private activeStreamStops: Set<() => void> = new Set();
constructor(private opsServerRef: OpsServer) {
// Add this handler's router to the parent
@@ -12,7 +19,21 @@ export class LogsHandler {
this.registerHandlers();
this.setupLogPushDestination();
}
/**
* Clean up all active log streams and deactivate the push destination.
* Called when OpsServer stops.
*/
public cleanup(): void {
// Stop all active follow-mode log streams
for (const stop of this.activeStreamStops) {
stop();
}
this.activeStreamStops.clear();
// Deactivate the push destination (it stays registered but becomes a no-op)
currentOpsServerRef = null;
}
private registerHandlers(): void {
// Get Recent Logs Handler
this.typedrouter.addTypedHandler(
@@ -27,16 +48,16 @@ export class LogsHandler {
dataArg.search,
dataArg.timeRange
);
return {
logs,
total: logs.length, // TODO: Implement proper total count
hasMore: false, // TODO: Implement proper pagination
total: logs.length,
hasMore: false,
};
}
)
);
// Get Log Stream Handler
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<interfaces.requests.IReq_GetLogStream>(
@@ -44,7 +65,7 @@ export class LogsHandler {
async (dataArg, toolsArg) => {
// Create a virtual stream for log streaming
const virtualStream = new plugins.typedrequest.VirtualStream<Uint8Array>();
// Set up log streaming
const streamLogs = this.setupLogStream(
virtualStream,
@@ -52,20 +73,21 @@ export class LogsHandler {
dataArg.filters?.category,
dataArg.follow
);
// Start streaming
streamLogs.start();
// VirtualStream handles cleanup automatically
// Track the stop function so we can clean up on shutdown
this.activeStreamStops.add(streamLogs.stop);
return {
logStream: virtualStream as any, // Cast to IVirtualStream interface
logStream: virtualStream as any,
};
}
)
);
}
private static mapLogLevel(smartlogLevel: string): 'debug' | 'info' | 'warn' | 'error' {
switch (smartlogLevel) {
case 'silly':
@@ -165,18 +187,30 @@ export class LogsHandler {
return mapped;
}
/**
* Add a log destination to the base logger that pushes entries
* to all connected ops_dashboard TypedSocket clients.
*
* Uses a module-level singleton so the destination is added only once,
* even across OpsServer restart cycles. The destination reads
* `currentOpsServerRef` dynamically so it always uses the active server.
*/
private setupLogPushDestination(): void {
const opsServerRef = this.opsServerRef;
// Update the module-level reference so the existing destination uses the new server
currentOpsServerRef = this.opsServerRef;
if (logPushDestinationInstalled) {
return; // destination already registered — just updated the ref
}
logPushDestinationInstalled = true;
baseLogger.addLogDestination({
async handleLog(logPackage: any) {
// Access the TypedSocket server instance from OpsServer
const typedsocket = opsServerRef.server?.typedserver?.typedsocket;
const opsServer = currentOpsServerRef;
if (!opsServer) return;
const typedsocket = opsServer.server?.typedserver?.typedsocket;
if (!typedsocket) return;
let connections: any[];
@@ -220,8 +254,18 @@ export class LogsHandler {
stop: () => void;
} {
let intervalId: NodeJS.Timeout | null = null;
let stopped = false;
let logIndex = 0;
const stop = () => {
stopped = true;
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
}
this.activeStreamStops.delete(stop);
};
const start = () => {
if (!follow) {
// Send existing logs and close
@@ -236,13 +280,19 @@ export class LogsHandler {
const encoder = new TextEncoder();
virtualStream.sendData(encoder.encode(logData));
});
// VirtualStream doesn't have end() method - it closes automatically
});
return;
}
// For follow mode, simulate real-time log streaming
intervalId = setInterval(async () => {
if (stopped) {
// Guard: clear interval if stop() was called between ticks
clearInterval(intervalId!);
intervalId = null;
return;
}
const categories: Array<'smtp' | 'dns' | 'security' | 'system' | 'email'> = ['smtp', 'dns', 'security', 'system', 'email'];
const levels: Array<'debug' | 'info' | 'warn' | 'error'> = ['info', 'warn', 'error', 'debug'];
@@ -266,30 +316,21 @@ export class LogsHandler {
const logData = JSON.stringify(logEntry);
const encoder = new TextEncoder();
try {
await virtualStream.sendData(encoder.encode(logData));
// Use a timeout to detect hung streams (sendData can hang if the
// VirtualStream's keepAlive loop has ended)
await Promise.race([
virtualStream.sendData(encoder.encode(logData)),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('stream send timeout')), 10_000)
),
]);
} catch {
// Stream closed or errored — clean up to prevent interval leak
clearInterval(intervalId!);
intervalId = null;
// Stream closed, errored, or timed out — clean up
stop();
}
}, 2000); // Send a log every 2 seconds
// TODO: Hook into actual logger events
// logger.on('log', (logEntry) => {
// if (matchesCriteria(logEntry, level, service)) {
// virtualStream.sendData(formatLogEntry(logEntry));
// }
// });
}, 2000);
};
const stop = () => {
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
}
// TODO: Unhook from logger events
};
return { start, stop };
}
}
}