diff --git a/changelog.md b/changelog.md index 695ab00..8cd0a1e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # Changelog +## 2025-08-30 - 5.3.2 - fix(daemon) +Improve daemon log delivery and process monitor memory accounting; gate debug output and update tests to numeric ProcessId + +- Deliver process logs only to subscribed clients instead of broadcasting to all connections (reduce unnecessary IPC traffic and noise) +- Implement incremental log memory accounting in ProcessMonitor using an estimateLogSize helper and WeakMap to avoid repeated JSON.stringify and reduce CPU/memory overhead +- Seed the incremental size map when loading persisted logs so memory accounting is accurate after restart +- Trim logs incrementally by subtracting estimated sizes of removed entries (avoids O(n) recalculation) +- Gate verbose console/debug output behind TSPM_DEBUG to prevent spamming in normal runs (applies to ProcessWrapper and ProcessMonitor) +- Improve process wrapper stdout/stderr debug logging to be conditional on debug mode +- Update tests to use numeric ProcessId via toProcessId(...) for consistency with typed IDs + ## 2025-08-30 - 5.3.1 - fix(client(tspmIpcClient)) Use bare topic names for IPC client subscribe/unsubscribe to fix log subscription issues diff --git a/test/test.integration.ts b/test/test.integration.ts index 1cc3896..90e52b5 100644 --- a/test/test.integration.ts +++ b/test/test.integration.ts @@ -5,6 +5,7 @@ import * as fs from 'fs/promises'; import * as os from 'os'; import { spawn } from 'child_process'; import { tspmIpcClient, TspmIpcClient } from '../ts/client/tspm.ipcclient.js'; +import { toProcessId } from '../ts/shared/protocol/id.js'; // Helper to ensure daemon is stopped before tests async function ensureDaemonStopped() { @@ -160,7 +161,7 @@ tap.test('Process management through daemon', async (tools) => { // Test 2: Start a test process const testConfig: tspm.IProcessConfig = { - id: 'test-echo', + id: toProcessId(1001), name: 'Test Echo Process', command: 'echo "Test process"', projectDir: process.cwd(), @@ -172,7 +173,7 @@ tap.test('Process management through daemon', async (tools) => { config: testConfig, }); console.log('Start response:', startResponse); - expect(startResponse.processId).toEqual('test-echo'); + expect(startResponse.processId).toEqual(1001); expect(startResponse.status).toBeDefined(); // Test 3: List processes (should have one process) @@ -180,27 +181,27 @@ tap.test('Process management through daemon', async (tools) => { console.log('List after start:', listResponse); expect(listResponse.processes.length).toBeGreaterThanOrEqual(1); - const procInfo = listResponse.processes.find((p) => p.id === 'test-echo'); + const procInfo = listResponse.processes.find((p) => p.id === toProcessId(1001)); expect(procInfo).toBeDefined(); - expect(procInfo?.id).toEqual('test-echo'); + expect(procInfo?.id).toEqual(1001); // Test 4: Describe the process const describeResponse = await tspmIpcClient.request('describe', { - id: 'test-echo', + id: toProcessId(1001), }); console.log('Describe:', describeResponse); expect(describeResponse.processInfo).toBeDefined(); expect(describeResponse.config).toBeDefined(); - expect(describeResponse.config.id).toEqual('test-echo'); + expect(describeResponse.config.id).toEqual(1001); // Test 5: Stop the process - const stopResponse = await tspmIpcClient.request('stop', { id: 'test-echo' }); + const stopResponse = await tspmIpcClient.request('stop', { id: toProcessId(1001) }); console.log('Stop response:', stopResponse); expect(stopResponse.success).toEqual(true); // Test 6: Delete the process const deleteResponse = await tspmIpcClient.request('delete', { - id: 'test-echo', + id: toProcessId(1001), }); console.log('Delete response:', deleteResponse); expect(deleteResponse.success).toEqual(true); @@ -208,9 +209,7 @@ tap.test('Process management through daemon', async (tools) => { // Test 7: Verify process is gone listResponse = await tspmIpcClient.request('list', {}); console.log('List after delete:', listResponse); - const deletedProcess = listResponse.processes.find( - (p) => p.id === 'test-echo', - ); + const deletedProcess = listResponse.processes.find((p) => p.id === toProcessId(1001)); expect(deletedProcess).toBeUndefined(); // Cleanup: stop daemon @@ -241,7 +240,7 @@ tap.test('Batch operations through daemon', async (tools) => { // Add multiple test processes const testConfigs: tspm.IProcessConfig[] = [ { - id: 'batch-test-1', + id: toProcessId(1101), name: 'Batch Test 1', command: 'echo "Process 1"', projectDir: process.cwd(), @@ -249,7 +248,7 @@ tap.test('Batch operations through daemon', async (tools) => { autorestart: false, }, { - id: 'batch-test-2', + id: toProcessId(1102), name: 'Batch Test 2', command: 'echo "Process 2"', projectDir: process.cwd(), @@ -308,7 +307,7 @@ tap.test('Daemon error handling', async (tools) => { // Test 1: Try to stop non-existent process try { - await tspmIpcClient.request('stop', { id: 'non-existent-process' }); + await tspmIpcClient.request('stop', { id: toProcessId(99999) }); expect(false).toEqual(true); // Should not reach here } catch (error) { expect(error.message).toInclude('Failed to stop process'); @@ -316,7 +315,7 @@ tap.test('Daemon error handling', async (tools) => { // Test 2: Try to describe non-existent process try { - await tspmIpcClient.request('describe', { id: 'non-existent-process' }); + await tspmIpcClient.request('describe', { id: toProcessId(99999) }); expect(false).toEqual(true); // Should not reach here } catch (error) { expect(error.message).toInclude('not found'); @@ -324,7 +323,7 @@ tap.test('Daemon error handling', async (tools) => { // Test 3: Try to restart non-existent process try { - await tspmIpcClient.request('restart', { id: 'non-existent-process' }); + await tspmIpcClient.request('restart', { id: toProcessId(99999) }); expect(false).toEqual(true); // Should not reach here } catch (error) { expect(error.message).toInclude('Failed to restart process'); diff --git a/test/test.ts b/test/test.ts index 6013d7b..ac08c2d 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1,5 +1,6 @@ import { expect, tap } from '@git.zone/tstest/tapbundle'; import * as tspm from '../ts/index.js'; +import { toProcessId } from '../ts/shared/protocol/id.js'; import { join } from 'path'; // Basic module import test @@ -51,7 +52,7 @@ async function exampleUsingIpcClient() { // Start a process using the request method await client.request('start', { config: { - id: 'web-server', + id: toProcessId(2001), name: 'Web Server', projectDir: '/path/to/web/project', command: 'npm run serve', @@ -65,7 +66,7 @@ async function exampleUsingIpcClient() { // Start another process await client.request('start', { config: { - id: 'api-server', + id: toProcessId(2002), name: 'API Server', projectDir: '/path/to/api/project', command: 'npm run api', @@ -80,13 +81,13 @@ async function exampleUsingIpcClient() { // Get logs from a process const logs = await client.request('getLogs', { - id: 'web-server', + id: toProcessId(2001), lines: 20, }); console.log('Web server logs:', logs.logs); // Stop a process - await client.request('stop', { id: 'api-server' }); + await client.request('stop', { id: toProcessId(2002) }); // Handle graceful shutdown process.on('SIGINT', async () => { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 05c1162..6bbb64e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@git.zone/tspm', - version: '5.3.1', + version: '5.3.2', description: 'a no fuzz process manager' } diff --git a/ts/daemon/processmonitor.ts b/ts/daemon/processmonitor.ts index c52cf08..9c6f8c3 100644 --- a/ts/daemon/processmonitor.ts +++ b/ts/daemon/processmonitor.ts @@ -18,6 +18,8 @@ export class ProcessMonitor extends EventEmitter { private processId?: ProcessId; private currentLogMemorySize: number = 0; private readonly MAX_LOG_MEMORY_SIZE = 10 * 1024 * 1024; // 10MB + // Track approximate size per log to avoid O(n) JSON stringify on every update + private logSizeMap: WeakMap = new WeakMap(); private restartTimer: NodeJS.Timeout | null = null; private lastRetryAt: number | null = null; private readonly MAX_RETRIES = 10; @@ -39,7 +41,13 @@ export class ProcessMonitor extends EventEmitter { const persistedLogs = await this.logPersistence.loadLogs(this.processId); if (persistedLogs.length > 0) { this.logs = persistedLogs; - this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs); + // Recalculate size once from scratch and seed the size map + this.currentLogMemorySize = 0; + for (const log of this.logs) { + const size = this.estimateLogSize(log); + this.logSizeMap.set(log, size); + this.currentLogMemorySize += size; + } this.logger.info(`Loaded ${persistedLogs.length} persisted logs from disk`); // Delete the persisted file after loading @@ -87,18 +95,27 @@ export class ProcessMonitor extends EventEmitter { this.processWrapper.on('log', (log: IProcessLog): void => { // Store the log in our buffer this.logs.push(log); - console.error(`[ProcessMonitor:${this.config.name}] Received log (type=${log.type}): ${log.message}`); - console.error(`[ProcessMonitor:${this.config.name}] Logs array now has ${this.logs.length} items`); + if (process.env.TSPM_DEBUG) { + console.error( + `[ProcessMonitor:${this.config.name}] Received log (type=${log.type}): ${log.message}`, + ); + console.error( + `[ProcessMonitor:${this.config.name}] Logs array now has ${this.logs.length} items`, + ); + } this.logger.debug(`ProcessMonitor received log: ${log.message}`); - // Update memory size tracking - this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs); + // Update memory size tracking incrementally + const approxSize = this.estimateLogSize(log); + this.logSizeMap.set(log, approxSize); + this.currentLogMemorySize += approxSize; // Trim logs if they exceed memory limit (10MB) while (this.currentLogMemorySize > this.MAX_LOG_MEMORY_SIZE && this.logs.length > 1) { // Remove oldest logs until we're under the memory limit - this.logs.shift(); - this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs); + const removed = this.logs.shift()!; + const removedSize = this.logSizeMap.get(removed) ?? this.estimateLogSize(removed); + this.currentLogMemorySize -= removedSize; } // Re-emit the log event for upstream handlers @@ -241,12 +258,14 @@ export class ProcessMonitor extends EventEmitter { `Memory usage for PID ${pid}: ${this.humanReadableBytes(memoryUsage)} (${memoryUsage} bytes)`, ); - // Only log to the process log at longer intervals to avoid spamming - this.log( - `Current memory usage for process group (PID ${pid}): ${this.humanReadableBytes( - memoryUsage, - )} (${memoryUsage} bytes)`, - ); + // Only log memory usage in debug mode to avoid spamming + if (process.env.TSPM_DEBUG) { + this.log( + `Current memory usage for process group (PID ${pid}): ${this.humanReadableBytes( + memoryUsage, + )} (${memoryUsage} bytes)`, + ); + } if (memoryUsage > memoryLimit) { const memoryLimitMsg = `Memory usage ${this.humanReadableBytes( @@ -374,7 +393,11 @@ export class ProcessMonitor extends EventEmitter { * Get the current logs from the process */ public getLogs(limit?: number): IProcessLog[] { - console.error(`[ProcessMonitor:${this.config.name}] getLogs called, logs.length=${this.logs.length}, limit=${limit}`); + if (process.env.TSPM_DEBUG) { + console.error( + `[ProcessMonitor:${this.config.name}] getLogs called, logs.length=${this.logs.length}, limit=${limit}`, + ); + } this.logger.debug(`Getting logs, total stored: ${this.logs.length}`); if (limit && limit > 0) { return this.logs.slice(-limit); @@ -417,4 +440,17 @@ export class ProcessMonitor extends EventEmitter { const prefix = this.config.name ? `[${this.config.name}] ` : ''; console.log(prefix + message); } + + /** + * Estimate approximate memory size in bytes for a log entry. + * Keeps CPU low by avoiding JSON.stringify on the full array. + */ + private estimateLogSize(log: IProcessLog): number { + const messageBytes = Buffer.byteLength(log.message || '', 'utf8'); + const typeBytes = Buffer.byteLength(log.type || '', 'utf8'); + const runIdBytes = Buffer.byteLength((log as any).runId || '', 'utf8'); + // Rough overhead for object structure, keys, timestamp/seq values + const overhead = 64; + return messageBytes + typeBytes + runIdBytes + overhead; + } } diff --git a/ts/daemon/processwrapper.ts b/ts/daemon/processwrapper.ts index 525aea9..d375a7f 100644 --- a/ts/daemon/processwrapper.ts +++ b/ts/daemon/processwrapper.ts @@ -90,9 +90,19 @@ export class ProcessWrapper extends EventEmitter { // Capture stdout if (this.process.stdout) { - console.error(`[ProcessWrapper] Setting up stdout listener for process ${this.process.pid}`); + if (process.env.TSPM_DEBUG) { + console.error( + `[ProcessWrapper] Setting up stdout listener for process ${this.process.pid}`, + ); + } this.process.stdout.on('data', (data) => { - console.error(`[ProcessWrapper] Received stdout data from PID ${this.process?.pid}: ${data.toString().substring(0, 100)}`); + if (process.env.TSPM_DEBUG) { + console.error( + `[ProcessWrapper] Received stdout data from PID ${this.process?.pid}: ${data + .toString() + .substring(0, 100)}`, + ); + } // Add data to remainder buffer and split by newlines const text = this.stdoutRemainder + data.toString(); const lines = text.split('\n'); @@ -102,7 +112,9 @@ export class ProcessWrapper extends EventEmitter { // Process complete lines for (const line of lines) { - console.error(`[ProcessWrapper] Processing stdout line: ${line}`); + if (process.env.TSPM_DEBUG) { + console.error(`[ProcessWrapper] Processing stdout line: ${line}`); + } this.logger.debug(`Captured stdout: ${line}`); this.addLog('stdout', line); } diff --git a/ts/daemon/tspm.daemon.ts b/ts/daemon/tspm.daemon.ts index f954c68..c8d72c3 100644 --- a/ts/daemon/tspm.daemon.ts +++ b/ts/daemon/tspm.daemon.ts @@ -97,9 +97,25 @@ export class TspmDaemon { this.tspmInstance.on('process:log', ({ processId, log }) => { // Publish to topic for this process const topic = `logs.${processId}`; - // Broadcast to all connected clients subscribed to this topic + // Deliver only to subscribed clients if (this.ipcServer) { - this.ipcServer.broadcast(`topic:${topic}`, log); + try { + const topicIndex = (this.ipcServer as any).topicIndex as Map> | undefined; + const subscribers = topicIndex?.get(topic); + if (subscribers && subscribers.size > 0) { + // Send directly to subscribers for this topic + for (const clientId of subscribers) { + this.ipcServer + .sendToClient(clientId, `topic:${topic}`, log) + .catch((err: any) => { + // Surface but don't fail the loop + console.error('[IPC] sendToClient error:', err?.message || err); + }); + } + } + } catch (err: any) { + console.error('[IPC] Topic delivery error:', err?.message || err); + } } });