From 0c4e28455e670f95e3d3ec069034ea98f031daf5 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 26 Feb 2026 17:14:51 +0000 Subject: [PATCH] fix(cleanup): prevent event listener and log stream leaks, tighten smartProxy connection timeouts, and improve graceful shutdown behavior --- changelog.md | 10 ++ package.json | 8 +- pnpm-lock.yaml | 53 +++++---- ts/00_commitinfo_data.ts | 2 +- ts/classes.dcrouter.ts | 28 ++++- ts/opsserver/classes.opsserver.ts | 4 + ts/opsserver/handlers/logs.handler.ts | 123 ++++++++++++++------- ts/remoteingress/classes.tunnel-manager.ts | 2 + ts_web/00_commitinfo_data.ts | 2 +- 9 files changed, 157 insertions(+), 75 deletions(-) diff --git a/changelog.md b/changelog.md index 6b49ab7..d39f099 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-02-26 - 9.1.6 - fix(cleanup) +prevent event listener and log stream leaks, tighten smartProxy connection timeouts, and improve graceful shutdown behavior + +- Tightened smartProxy connection timeouts and lifetimes (5m socketTimeout, 10m inactivityTimeout, keep-alive multiplier, 1h extendedKeepAliveLifetime, 4h maxConnectionLifetime). +- Remove event listeners before stopping services to avoid leaks (smartProxy, emailServer, dnsServer, remote ingress hub). +- OpsServer.stop now invokes logsHandler.cleanup to tear down active log streams and avoid duplicate push destinations. +- LogsHandler rewritten to use a module-level singleton push destination, track active stream stop callbacks, add cleanup(), guard against hung VirtualStream.sendData with a 10s timeout, and ensure intervals are cleared on stop. +- updateSmartProxyConfig removes listeners on the old instance before stopping it. +- Dependency bumps: @api.global/typedsocket ^4.1.2, @push.rocks/smartdata ^7.1.0, @push.rocks/smartmta ^5.2.6, @push.rocks/smartproxy ^25.8.3. + ## 2026-02-26 - 9.1.5 - fix(remoteingress) Reconcile tunnel manager edge statuses with authoritative Rust hub periodically; update active tunnel counts and heartbeats, add missed edges, remove stale entries, and clear reconcile interval on stop diff --git a/package.json b/package.json index 99dcb9d..a5b3b7f 100644 --- a/package.json +++ b/package.json @@ -30,14 +30,14 @@ "@api.global/typedrequest": "^3.2.6", "@api.global/typedrequest-interfaces": "^3.0.19", "@api.global/typedserver": "^8.4.0", - "@api.global/typedsocket": "^4.1.0", + "@api.global/typedsocket": "^4.1.2", "@apiclient.xyz/cloudflare": "^7.1.0", "@design.estate/dees-catalog": "^3.43.3", "@design.estate/dees-element": "^2.1.6", "@push.rocks/projectinfo": "^5.0.2", "@push.rocks/qenv": "^6.1.3", "@push.rocks/smartacme": "^9.1.3", - "@push.rocks/smartdata": "^7.0.15", + "@push.rocks/smartdata": "^7.1.0", "@push.rocks/smartdns": "^7.9.0", "@push.rocks/smartfile": "^13.1.2", "@push.rocks/smartguard": "^3.1.0", @@ -45,11 +45,11 @@ "@push.rocks/smartlog": "^3.2.1", "@push.rocks/smartmetrics": "^3.0.1", "@push.rocks/smartmongo": "^5.1.0", - "@push.rocks/smartmta": "^5.2.2", + "@push.rocks/smartmta": "^5.2.6", "@push.rocks/smartnetwork": "^4.4.0", "@push.rocks/smartpath": "^6.0.0", "@push.rocks/smartpromise": "^4.2.3", - "@push.rocks/smartproxy": "^25.8.1", + "@push.rocks/smartproxy": "^25.8.3", "@push.rocks/smartradius": "^1.1.1", "@push.rocks/smartrequest": "^5.0.1", "@push.rocks/smartrx": "^3.0.10", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f876d87..5b8a2a1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18,8 +18,8 @@ importers: specifier: ^8.4.0 version: 8.4.0(@tiptap/pm@2.27.2) '@api.global/typedsocket': - specifier: ^4.1.0 - version: 4.1.0(@push.rocks/smartserve@2.0.1) + specifier: ^4.1.2 + version: 4.1.2(@push.rocks/smartserve@2.0.1) '@apiclient.xyz/cloudflare': specifier: ^7.1.0 version: 7.1.0 @@ -39,8 +39,8 @@ importers: specifier: ^9.1.3 version: 9.1.3(socks@2.8.7) '@push.rocks/smartdata': - specifier: ^7.0.15 - version: 7.0.15(socks@2.8.7) + specifier: ^7.1.0 + version: 7.1.0(socks@2.8.7) '@push.rocks/smartdns': specifier: ^7.9.0 version: 7.9.0 @@ -63,8 +63,8 @@ importers: specifier: ^5.1.0 version: 5.1.0(socks@2.8.7) '@push.rocks/smartmta': - specifier: ^5.2.2 - version: 5.2.2 + specifier: ^5.2.6 + version: 5.2.6 '@push.rocks/smartnetwork': specifier: ^4.4.0 version: 4.4.0 @@ -75,8 +75,8 @@ importers: specifier: ^4.2.3 version: 4.2.3 '@push.rocks/smartproxy': - specifier: ^25.8.1 - version: 25.8.1 + specifier: ^25.8.3 + version: 25.8.3 '@push.rocks/smartradius': specifier: ^1.1.1 version: 1.1.1 @@ -155,8 +155,8 @@ packages: '@push.rocks/smartserve': optional: true - '@api.global/typedsocket@4.1.0': - resolution: {integrity: sha512-ttmoU5BNHmLAkAF/o+Ta8F5O4F7CUmkFo6LK7NKHQvuYJvodPMYWdhJ6yCINTF4pfCgljkMDUqoVKobm6ea4mQ==} + '@api.global/typedsocket@4.1.2': + resolution: {integrity: sha512-fZFuJY9ucFCICjF4wi6OvK8drsv6UcwVVsfamOT1HxFj7OBOYw6QHOceQ+cAQ8IrWbX817sf8gzlesl+jlG8JA==} peerDependencies: '@push.rocks/smartserve': '>=1.1.0' @@ -894,8 +894,8 @@ packages: '@push.rocks/smartdata@5.16.7': resolution: {integrity: sha512-bu/YSIjQcwxWXkAsuhqE6zs7eT+bTIKV8+/H7TbbjpzeioLCyB3dZ/41cLZk37c/EYt4d4GHgZ0ww80OiKOUMg==} - '@push.rocks/smartdata@7.0.15': - resolution: {integrity: sha512-j09BUekmjiGZuvXmdGBiIpBTXFFnxrzG4rOBjZvPO/hG1BwNrvSkIVq20mIwdYomn8JGgya6oJ4Y7NL+FKTqEA==} + '@push.rocks/smartdata@7.1.0': + resolution: {integrity: sha512-ots0g7/96R2xs4ww4F2/2rIwAOPT5AmzP3ciD31YsF02o5WA4Gg6C5laLBUjV3hXCjazhzFsRVQTfwbjmPQe4w==} '@push.rocks/smartdelay@3.0.5': resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==} @@ -996,12 +996,11 @@ packages: '@push.rocks/smartmongo@5.1.0': resolution: {integrity: sha512-2tpKf8K+SMdLHOEpafgKPIN+ypWTLwHc33hCUDNMQ1KaL7vokkavA44+fHxQydOGPMtDi22tSMFeVMCcUSzs4w==} - '@push.rocks/smartmta@5.2.2': - resolution: {integrity: sha512-0xKUi2BMM0HFYIPdNeNJZFitAiJ9CNbLlOJ8TenT+xInp7DKcSQ7ABER1rJKinPtvDjRDSiSqiF2iQR+O7299g==} + '@push.rocks/smartmta@5.2.6': + resolution: {integrity: sha512-MJKgcsgcPicCezm6DCFkni2zdY+mMsfMaqeEjPorhadRCd0Qeo0jP6Ozz82+SjhKHrVHuPPCPJuDG37PsEUqsw==} engines: {node: '>=14.0.0'} cpu: [x64, arm64] os: [darwin, linux, win32] - hasBin: true '@push.rocks/smartmustache@3.0.2': resolution: {integrity: sha512-G3LyRXoJhyM+iQhkvP/MR/2WYMvC9U7zc2J44JxUM5tPdkQ+o3++FbfRtnZj6rz5X/A7q03//vsxPitVQwoi2Q==} @@ -1036,8 +1035,8 @@ packages: '@push.rocks/smartpromise@4.2.3': resolution: {integrity: sha512-Ycg/TJR+tMt+S3wSFurOpEoW6nXv12QBtKXgBcjMZ4RsdO28geN46U09osPn9N9WuwQy1PkmTV5J/V4F9U8qEw==} - '@push.rocks/smartproxy@25.8.1': - resolution: {integrity: sha512-f192aGYWXnF4pJNqBShy+pL6GPxFUECBWuymay5M5qD41uKS76GIieAegEu9/G9XhtFfricvu28s1JeXzU9fLA==} + '@push.rocks/smartproxy@25.8.3': + resolution: {integrity: sha512-ocFvNpB9UDxOt5R7mNcxI3vmVQDziANOXvbo0ApVLzVjWhHizUu2dvdfAgHW+rt8mACiBzrSiVNjwBA2pZhv7w==} '@push.rocks/smartpuppeteer@2.0.5': resolution: {integrity: sha512-yK/qSeWVHIGWRp3c8S5tfdGP6WCKllZC4DR8d8CQlEjszOSBmHtlTdyyqOMBZ/BA4kd+eU5f3A1r4K2tGYty1g==} @@ -4183,7 +4182,7 @@ packages: hasBin: true wordwrap@1.0.0: - resolution: {integrity: sha512-gvVzJFlPycKc5dZN4yPkP8w7Dc37BtP1yczEneOb4uq34pXZcvrtRTmWV8W+Ume+XCxKgbjM+nevkyFPMybd4Q==} + resolution: {integrity: sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=} wrap-ansi@6.2.0: resolution: {integrity: sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA==} @@ -4356,7 +4355,7 @@ snapshots: dependencies: '@api.global/typedrequest': 3.2.6 '@api.global/typedrequest-interfaces': 3.0.19 - '@api.global/typedsocket': 4.1.0(@push.rocks/smartserve@2.0.1) + '@api.global/typedsocket': 4.1.2(@push.rocks/smartserve@2.0.1) '@cloudflare/workers-types': 4.20260303.0 '@design.estate/dees-catalog': 3.43.3(@tiptap/pm@2.27.2) '@design.estate/dees-comms': 1.0.30 @@ -4418,7 +4417,7 @@ snapshots: - utf-8-validate - vue - '@api.global/typedsocket@4.1.0(@push.rocks/smartserve@2.0.1)': + '@api.global/typedsocket@4.1.2(@push.rocks/smartserve@2.0.1)': dependencies: '@api.global/typedrequest': 3.2.6 '@api.global/typedrequest-interfaces': 3.0.19 @@ -5763,7 +5762,7 @@ snapshots: '@apiclient.xyz/cloudflare': 7.1.0 '@peculiar/x509': 1.14.3 '@push.rocks/lik': 6.2.2 - '@push.rocks/smartdata': 7.0.15(socks@2.8.7) + '@push.rocks/smartdata': 7.1.0(socks@2.8.7) '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartdns': 7.9.0 '@push.rocks/smartlog': 3.2.1 @@ -5926,7 +5925,7 @@ snapshots: - supports-color - vue - '@push.rocks/smartdata@7.0.15(socks@2.8.7)': + '@push.rocks/smartdata@7.1.0(socks@2.8.7)': dependencies: '@push.rocks/lik': 6.2.2 '@push.rocks/smartdelay': 3.0.5 @@ -5935,7 +5934,7 @@ snapshots: '@push.rocks/smartpromise': 4.2.3 '@push.rocks/smartrx': 3.0.10 '@push.rocks/smartstring': 4.1.0 - '@push.rocks/smarttime': 4.1.1 + '@push.rocks/smarttime': 4.2.3 '@push.rocks/smartunique': 3.0.9 '@push.rocks/taskbuffer': 3.5.0 '@tsclass/tsclass': 9.3.0 @@ -6234,14 +6233,14 @@ snapshots: - supports-color - vue - '@push.rocks/smartmta@5.2.2': + '@push.rocks/smartmta@5.2.6': dependencies: '@push.rocks/smartfile': 13.1.2 '@push.rocks/smartfs': 1.3.1 '@push.rocks/smartlog': 3.2.1 '@push.rocks/smartmail': 2.2.0 '@push.rocks/smartpath': 6.0.0 - '@push.rocks/smartrust': 1.2.1 + '@push.rocks/smartrust': 1.3.1 '@tsclass/tsclass': 9.3.0 lru-cache: 11.2.6 mailparser: 3.9.3 @@ -6341,11 +6340,11 @@ snapshots: '@push.rocks/smartpromise@4.2.3': {} - '@push.rocks/smartproxy@25.8.1': + '@push.rocks/smartproxy@25.8.3': dependencies: '@push.rocks/smartcrypto': 2.0.4 '@push.rocks/smartlog': 3.2.1 - '@push.rocks/smartrust': 1.2.1 + '@push.rocks/smartrust': 1.3.1 '@tsclass/tsclass': 9.3.0 minimatch: 10.2.2 diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 3579386..e491bd2 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/dcrouter', - version: '9.1.5', + version: '9.1.6', description: 'A multifaceted routing service handling mail and SMS delivery functions.' } diff --git a/ts/classes.dcrouter.ts b/ts/classes.dcrouter.ts index c98fa1c..5cc2621 100644 --- a/ts/classes.dcrouter.ts +++ b/ts/classes.dcrouter.ts @@ -476,6 +476,12 @@ export class DcRouter { ...this.options.smartProxyConfig, routes, acme: acmeConfig, + // Tighter connection timeouts to prevent connection buildup + socketTimeout: 300_000, // 5 min idle socket timeout + inactivityTimeout: 600_000, // 10 min inactivity timeout + keepAliveInactivityMultiplier: 3, // keep-alive idle = 30 min (10min * 3) + extendedKeepAliveLifetime: 3_600_000, // keep-alive connections live max 1 hour + maxConnectionLifetime: 14_400_000, // absolute max connection lifetime 4 hours certStore: { loadAll: async () => { const keys = await this.storageManager.list('/proxy-certs/'); @@ -903,6 +909,20 @@ export class DcRouter { await this.opsServer.stop(); try { + // Remove event listeners before stopping services to prevent leaks + if (this.smartProxy) { + this.smartProxy.removeAllListeners(); + } + if (this.emailServer) { + if ((this.emailServer as any).deliverySystem) { + (this.emailServer as any).deliverySystem.removeAllListeners(); + } + this.emailServer.removeAllListeners(); + } + if (this.dnsServer) { + this.dnsServer.removeAllListeners(); + } + // Stop all services in parallel for faster shutdown await Promise.all([ // Stop cache cleaner if running @@ -976,10 +996,11 @@ export class DcRouter { public async updateSmartProxyConfig(config: plugins.smartproxy.ISmartProxyOptions): Promise { // Stop existing SmartProxy if running if (this.smartProxy) { + this.smartProxy.removeAllListeners(); await this.smartProxy.stop(); this.smartProxy = undefined; } - + // Update configuration this.options.smartProxyConfig = config; @@ -1103,6 +1124,11 @@ export class DcRouter { try { // Stop the unified email server which contains all components if (this.emailServer) { + // Remove listeners before stopping to prevent leaks on config update cycles + if ((this.emailServer as any).deliverySystem) { + (this.emailServer as any).deliverySystem.removeAllListeners(); + } + this.emailServer.removeAllListeners(); await this.emailServer.stop(); logger.log('info', 'Unified email server stopped'); this.emailServer = undefined; diff --git a/ts/opsserver/classes.opsserver.ts b/ts/opsserver/classes.opsserver.ts index 618fafc..cbb48e7 100644 --- a/ts/opsserver/classes.opsserver.ts +++ b/ts/opsserver/classes.opsserver.ts @@ -70,6 +70,10 @@ export class OpsServer { } public async stop() { + // Clean up log handler streams and push destination before stopping the server + if (this.logsHandler) { + this.logsHandler.cleanup(); + } if (this.server) { await this.server.stop(); } diff --git a/ts/opsserver/handlers/logs.handler.ts b/ts/opsserver/handlers/logs.handler.ts index 43e6af8..082788e 100644 --- a/ts/opsserver/handlers/logs.handler.ts +++ b/ts/opsserver/handlers/logs.handler.ts @@ -3,8 +3,15 @@ import type { OpsServer } from '../classes.opsserver.js'; import * as interfaces from '../../../ts_interfaces/index.js'; import { logBuffer, baseLogger } from '../../logger.js'; +// Module-level singleton: the log push destination is added once and reuses +// the current OpsServer reference so it survives OpsServer restarts without +// accumulating duplicate destinations. +let logPushDestinationInstalled = false; +let currentOpsServerRef: OpsServer | null = null; + export class LogsHandler { public typedrouter = new plugins.typedrequest.TypedRouter(); + private activeStreamStops: Set<() => void> = new Set(); constructor(private opsServerRef: OpsServer) { // Add this handler's router to the parent @@ -12,7 +19,21 @@ export class LogsHandler { this.registerHandlers(); this.setupLogPushDestination(); } - + + /** + * Clean up all active log streams and deactivate the push destination. + * Called when OpsServer stops. + */ + public cleanup(): void { + // Stop all active follow-mode log streams + for (const stop of this.activeStreamStops) { + stop(); + } + this.activeStreamStops.clear(); + // Deactivate the push destination (it stays registered but becomes a no-op) + currentOpsServerRef = null; + } + private registerHandlers(): void { // Get Recent Logs Handler this.typedrouter.addTypedHandler( @@ -27,16 +48,16 @@ export class LogsHandler { dataArg.search, dataArg.timeRange ); - + return { logs, - total: logs.length, // TODO: Implement proper total count - hasMore: false, // TODO: Implement proper pagination + total: logs.length, + hasMore: false, }; } ) ); - + // Get Log Stream Handler this.typedrouter.addTypedHandler( new plugins.typedrequest.TypedHandler( @@ -44,7 +65,7 @@ export class LogsHandler { async (dataArg, toolsArg) => { // Create a virtual stream for log streaming const virtualStream = new plugins.typedrequest.VirtualStream(); - + // Set up log streaming const streamLogs = this.setupLogStream( virtualStream, @@ -52,20 +73,21 @@ export class LogsHandler { dataArg.filters?.category, dataArg.follow ); - + // Start streaming streamLogs.start(); - - // VirtualStream handles cleanup automatically - + + // Track the stop function so we can clean up on shutdown + this.activeStreamStops.add(streamLogs.stop); + return { - logStream: virtualStream as any, // Cast to IVirtualStream interface + logStream: virtualStream as any, }; } ) ); } - + private static mapLogLevel(smartlogLevel: string): 'debug' | 'info' | 'warn' | 'error' { switch (smartlogLevel) { case 'silly': @@ -165,18 +187,30 @@ export class LogsHandler { return mapped; } - + /** * Add a log destination to the base logger that pushes entries * to all connected ops_dashboard TypedSocket clients. + * + * Uses a module-level singleton so the destination is added only once, + * even across OpsServer restart cycles. The destination reads + * `currentOpsServerRef` dynamically so it always uses the active server. */ private setupLogPushDestination(): void { - const opsServerRef = this.opsServerRef; + // Update the module-level reference so the existing destination uses the new server + currentOpsServerRef = this.opsServerRef; + + if (logPushDestinationInstalled) { + return; // destination already registered — just updated the ref + } + logPushDestinationInstalled = true; baseLogger.addLogDestination({ async handleLog(logPackage: any) { - // Access the TypedSocket server instance from OpsServer - const typedsocket = opsServerRef.server?.typedserver?.typedsocket; + const opsServer = currentOpsServerRef; + if (!opsServer) return; + + const typedsocket = opsServer.server?.typedserver?.typedsocket; if (!typedsocket) return; let connections: any[]; @@ -220,8 +254,18 @@ export class LogsHandler { stop: () => void; } { let intervalId: NodeJS.Timeout | null = null; + let stopped = false; let logIndex = 0; - + + const stop = () => { + stopped = true; + if (intervalId) { + clearInterval(intervalId); + intervalId = null; + } + this.activeStreamStops.delete(stop); + }; + const start = () => { if (!follow) { // Send existing logs and close @@ -236,13 +280,19 @@ export class LogsHandler { const encoder = new TextEncoder(); virtualStream.sendData(encoder.encode(logData)); }); - // VirtualStream doesn't have end() method - it closes automatically }); return; } - + // For follow mode, simulate real-time log streaming intervalId = setInterval(async () => { + if (stopped) { + // Guard: clear interval if stop() was called between ticks + clearInterval(intervalId!); + intervalId = null; + return; + } + const categories: Array<'smtp' | 'dns' | 'security' | 'system' | 'email'> = ['smtp', 'dns', 'security', 'system', 'email']; const levels: Array<'debug' | 'info' | 'warn' | 'error'> = ['info', 'warn', 'error', 'debug']; @@ -266,30 +316,21 @@ export class LogsHandler { const logData = JSON.stringify(logEntry); const encoder = new TextEncoder(); try { - await virtualStream.sendData(encoder.encode(logData)); + // Use a timeout to detect hung streams (sendData can hang if the + // VirtualStream's keepAlive loop has ended) + await Promise.race([ + virtualStream.sendData(encoder.encode(logData)), + new Promise((_, reject) => + setTimeout(() => reject(new Error('stream send timeout')), 10_000) + ), + ]); } catch { - // Stream closed or errored — clean up to prevent interval leak - clearInterval(intervalId!); - intervalId = null; + // Stream closed, errored, or timed out — clean up + stop(); } - }, 2000); // Send a log every 2 seconds - - // TODO: Hook into actual logger events - // logger.on('log', (logEntry) => { - // if (matchesCriteria(logEntry, level, service)) { - // virtualStream.sendData(formatLogEntry(logEntry)); - // } - // }); + }, 2000); }; - - const stop = () => { - if (intervalId) { - clearInterval(intervalId); - intervalId = null; - } - // TODO: Unhook from logger events - }; - + return { start, stop }; } -} \ No newline at end of file +} diff --git a/ts/remoteingress/classes.tunnel-manager.ts b/ts/remoteingress/classes.tunnel-manager.ts index a62a830..edfbec4 100644 --- a/ts/remoteingress/classes.tunnel-manager.ts +++ b/ts/remoteingress/classes.tunnel-manager.ts @@ -81,6 +81,8 @@ export class TunnelManager { clearInterval(this.reconcileInterval); this.reconcileInterval = null; } + // Remove event listeners before stopping to prevent leaks + this.hub.removeAllListeners(); await this.hub.stop(); this.edgeStatuses.clear(); } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 3579386..e491bd2 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@serve.zone/dcrouter', - version: '9.1.5', + version: '9.1.6', description: 'A multifaceted routing service handling mail and SMS delivery functions.' }