fix(daemon): Improve daemon log delivery and process monitor memory accounting; gate debug output and update tests to numeric ProcessId
This commit is contained in:
		
							
								
								
									
										11
									
								
								changelog.md
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								changelog.md
									
									
									
									
									
								
							@@ -1,5 +1,16 @@
 | 
			
		||||
# Changelog
 | 
			
		||||
 | 
			
		||||
## 2025-08-30 - 5.3.2 - fix(daemon)
 | 
			
		||||
Improve daemon log delivery and process monitor memory accounting; gate debug output and update tests to numeric ProcessId
 | 
			
		||||
 | 
			
		||||
- Deliver process logs only to subscribed clients instead of broadcasting to all connections (reduce unnecessary IPC traffic and noise)
 | 
			
		||||
- Implement incremental log memory accounting in ProcessMonitor using an estimateLogSize helper and WeakMap to avoid repeated JSON.stringify and reduce CPU/memory overhead
 | 
			
		||||
- Seed the incremental size map when loading persisted logs so memory accounting is accurate after restart
 | 
			
		||||
- Trim logs incrementally by subtracting estimated sizes of removed entries (avoids O(n) recalculation)
 | 
			
		||||
- Gate verbose console/debug output behind TSPM_DEBUG to prevent spamming in normal runs (applies to ProcessWrapper and ProcessMonitor)
 | 
			
		||||
- Improve process wrapper stdout/stderr debug logging to be conditional on debug mode
 | 
			
		||||
- Update tests to use numeric ProcessId via toProcessId(...) for consistency with typed IDs
 | 
			
		||||
 | 
			
		||||
## 2025-08-30 - 5.3.1 - fix(client(tspmIpcClient))
 | 
			
		||||
Use bare topic names for IPC client subscribe/unsubscribe to fix log subscription issues
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -5,6 +5,7 @@ import * as fs from 'fs/promises';
 | 
			
		||||
import * as os from 'os';
 | 
			
		||||
import { spawn } from 'child_process';
 | 
			
		||||
import { tspmIpcClient, TspmIpcClient } from '../ts/client/tspm.ipcclient.js';
 | 
			
		||||
import { toProcessId } from '../ts/shared/protocol/id.js';
 | 
			
		||||
 | 
			
		||||
// Helper to ensure daemon is stopped before tests
 | 
			
		||||
async function ensureDaemonStopped() {
 | 
			
		||||
@@ -160,7 +161,7 @@ tap.test('Process management through daemon', async (tools) => {
 | 
			
		||||
 | 
			
		||||
  // Test 2: Start a test process
 | 
			
		||||
  const testConfig: tspm.IProcessConfig = {
 | 
			
		||||
    id: 'test-echo',
 | 
			
		||||
    id: toProcessId(1001),
 | 
			
		||||
    name: 'Test Echo Process',
 | 
			
		||||
    command: 'echo "Test process"',
 | 
			
		||||
    projectDir: process.cwd(),
 | 
			
		||||
@@ -172,7 +173,7 @@ tap.test('Process management through daemon', async (tools) => {
 | 
			
		||||
    config: testConfig,
 | 
			
		||||
  });
 | 
			
		||||
  console.log('Start response:', startResponse);
 | 
			
		||||
  expect(startResponse.processId).toEqual('test-echo');
 | 
			
		||||
  expect(startResponse.processId).toEqual(1001);
 | 
			
		||||
  expect(startResponse.status).toBeDefined();
 | 
			
		||||
 | 
			
		||||
  // Test 3: List processes (should have one process)
 | 
			
		||||
@@ -180,27 +181,27 @@ tap.test('Process management through daemon', async (tools) => {
 | 
			
		||||
  console.log('List after start:', listResponse);
 | 
			
		||||
  expect(listResponse.processes.length).toBeGreaterThanOrEqual(1);
 | 
			
		||||
 | 
			
		||||
  const procInfo = listResponse.processes.find((p) => p.id === 'test-echo');
 | 
			
		||||
  const procInfo = listResponse.processes.find((p) => p.id === toProcessId(1001));
 | 
			
		||||
  expect(procInfo).toBeDefined();
 | 
			
		||||
  expect(procInfo?.id).toEqual('test-echo');
 | 
			
		||||
  expect(procInfo?.id).toEqual(1001);
 | 
			
		||||
 | 
			
		||||
  // Test 4: Describe the process
 | 
			
		||||
  const describeResponse = await tspmIpcClient.request('describe', {
 | 
			
		||||
    id: 'test-echo',
 | 
			
		||||
    id: toProcessId(1001),
 | 
			
		||||
  });
 | 
			
		||||
  console.log('Describe:', describeResponse);
 | 
			
		||||
  expect(describeResponse.processInfo).toBeDefined();
 | 
			
		||||
  expect(describeResponse.config).toBeDefined();
 | 
			
		||||
  expect(describeResponse.config.id).toEqual('test-echo');
 | 
			
		||||
  expect(describeResponse.config.id).toEqual(1001);
 | 
			
		||||
 | 
			
		||||
  // Test 5: Stop the process
 | 
			
		||||
  const stopResponse = await tspmIpcClient.request('stop', { id: 'test-echo' });
 | 
			
		||||
  const stopResponse = await tspmIpcClient.request('stop', { id: toProcessId(1001) });
 | 
			
		||||
  console.log('Stop response:', stopResponse);
 | 
			
		||||
  expect(stopResponse.success).toEqual(true);
 | 
			
		||||
 | 
			
		||||
  // Test 6: Delete the process
 | 
			
		||||
  const deleteResponse = await tspmIpcClient.request('delete', {
 | 
			
		||||
    id: 'test-echo',
 | 
			
		||||
    id: toProcessId(1001),
 | 
			
		||||
  });
 | 
			
		||||
  console.log('Delete response:', deleteResponse);
 | 
			
		||||
  expect(deleteResponse.success).toEqual(true);
 | 
			
		||||
@@ -208,9 +209,7 @@ tap.test('Process management through daemon', async (tools) => {
 | 
			
		||||
  // Test 7: Verify process is gone
 | 
			
		||||
  listResponse = await tspmIpcClient.request('list', {});
 | 
			
		||||
  console.log('List after delete:', listResponse);
 | 
			
		||||
  const deletedProcess = listResponse.processes.find(
 | 
			
		||||
    (p) => p.id === 'test-echo',
 | 
			
		||||
  );
 | 
			
		||||
  const deletedProcess = listResponse.processes.find((p) => p.id === toProcessId(1001));
 | 
			
		||||
  expect(deletedProcess).toBeUndefined();
 | 
			
		||||
 | 
			
		||||
  // Cleanup: stop daemon
 | 
			
		||||
@@ -241,7 +240,7 @@ tap.test('Batch operations through daemon', async (tools) => {
 | 
			
		||||
  // Add multiple test processes
 | 
			
		||||
  const testConfigs: tspm.IProcessConfig[] = [
 | 
			
		||||
    {
 | 
			
		||||
      id: 'batch-test-1',
 | 
			
		||||
      id: toProcessId(1101),
 | 
			
		||||
      name: 'Batch Test 1',
 | 
			
		||||
      command: 'echo "Process 1"',
 | 
			
		||||
      projectDir: process.cwd(),
 | 
			
		||||
@@ -249,7 +248,7 @@ tap.test('Batch operations through daemon', async (tools) => {
 | 
			
		||||
      autorestart: false,
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      id: 'batch-test-2',
 | 
			
		||||
      id: toProcessId(1102),
 | 
			
		||||
      name: 'Batch Test 2',
 | 
			
		||||
      command: 'echo "Process 2"',
 | 
			
		||||
      projectDir: process.cwd(),
 | 
			
		||||
@@ -308,7 +307,7 @@ tap.test('Daemon error handling', async (tools) => {
 | 
			
		||||
 | 
			
		||||
  // Test 1: Try to stop non-existent process
 | 
			
		||||
  try {
 | 
			
		||||
    await tspmIpcClient.request('stop', { id: 'non-existent-process' });
 | 
			
		||||
    await tspmIpcClient.request('stop', { id: toProcessId(99999) });
 | 
			
		||||
    expect(false).toEqual(true); // Should not reach here
 | 
			
		||||
  } catch (error) {
 | 
			
		||||
    expect(error.message).toInclude('Failed to stop process');
 | 
			
		||||
@@ -316,7 +315,7 @@ tap.test('Daemon error handling', async (tools) => {
 | 
			
		||||
 | 
			
		||||
  // Test 2: Try to describe non-existent process
 | 
			
		||||
  try {
 | 
			
		||||
    await tspmIpcClient.request('describe', { id: 'non-existent-process' });
 | 
			
		||||
    await tspmIpcClient.request('describe', { id: toProcessId(99999) });
 | 
			
		||||
    expect(false).toEqual(true); // Should not reach here
 | 
			
		||||
  } catch (error) {
 | 
			
		||||
    expect(error.message).toInclude('not found');
 | 
			
		||||
@@ -324,7 +323,7 @@ tap.test('Daemon error handling', async (tools) => {
 | 
			
		||||
 | 
			
		||||
  // Test 3: Try to restart non-existent process
 | 
			
		||||
  try {
 | 
			
		||||
    await tspmIpcClient.request('restart', { id: 'non-existent-process' });
 | 
			
		||||
    await tspmIpcClient.request('restart', { id: toProcessId(99999) });
 | 
			
		||||
    expect(false).toEqual(true); // Should not reach here
 | 
			
		||||
  } catch (error) {
 | 
			
		||||
    expect(error.message).toInclude('Failed to restart process');
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,6 @@
 | 
			
		||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
 | 
			
		||||
import * as tspm from '../ts/index.js';
 | 
			
		||||
import { toProcessId } from '../ts/shared/protocol/id.js';
 | 
			
		||||
import { join } from 'path';
 | 
			
		||||
 | 
			
		||||
// Basic module import test
 | 
			
		||||
@@ -51,7 +52,7 @@ async function exampleUsingIpcClient() {
 | 
			
		||||
  // Start a process using the request method
 | 
			
		||||
  await client.request('start', {
 | 
			
		||||
    config: {
 | 
			
		||||
      id: 'web-server',
 | 
			
		||||
      id: toProcessId(2001),
 | 
			
		||||
      name: 'Web Server',
 | 
			
		||||
      projectDir: '/path/to/web/project',
 | 
			
		||||
      command: 'npm run serve',
 | 
			
		||||
@@ -65,7 +66,7 @@ async function exampleUsingIpcClient() {
 | 
			
		||||
  // Start another process
 | 
			
		||||
  await client.request('start', {
 | 
			
		||||
    config: {
 | 
			
		||||
      id: 'api-server',
 | 
			
		||||
      id: toProcessId(2002),
 | 
			
		||||
      name: 'API Server',
 | 
			
		||||
      projectDir: '/path/to/api/project',
 | 
			
		||||
      command: 'npm run api',
 | 
			
		||||
@@ -80,13 +81,13 @@ async function exampleUsingIpcClient() {
 | 
			
		||||
  
 | 
			
		||||
  // Get logs from a process
 | 
			
		||||
  const logs = await client.request('getLogs', {
 | 
			
		||||
    id: 'web-server',
 | 
			
		||||
    id: toProcessId(2001),
 | 
			
		||||
    lines: 20,
 | 
			
		||||
  });
 | 
			
		||||
  console.log('Web server logs:', logs.logs);
 | 
			
		||||
  
 | 
			
		||||
  // Stop a process
 | 
			
		||||
  await client.request('stop', { id: 'api-server' });
 | 
			
		||||
  await client.request('stop', { id: toProcessId(2002) });
 | 
			
		||||
  
 | 
			
		||||
  // Handle graceful shutdown
 | 
			
		||||
  process.on('SIGINT', async () => {
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,6 @@
 | 
			
		||||
 */
 | 
			
		||||
export const commitinfo = {
 | 
			
		||||
  name: '@git.zone/tspm',
 | 
			
		||||
  version: '5.3.1',
 | 
			
		||||
  version: '5.3.2',
 | 
			
		||||
  description: 'a no fuzz process manager'
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,6 +18,8 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
  private processId?: ProcessId;
 | 
			
		||||
  private currentLogMemorySize: number = 0;
 | 
			
		||||
  private readonly MAX_LOG_MEMORY_SIZE = 10 * 1024 * 1024; // 10MB
 | 
			
		||||
  // Track approximate size per log to avoid O(n) JSON stringify on every update
 | 
			
		||||
  private logSizeMap: WeakMap<IProcessLog, number> = new WeakMap();
 | 
			
		||||
  private restartTimer: NodeJS.Timeout | null = null;
 | 
			
		||||
  private lastRetryAt: number | null = null;
 | 
			
		||||
  private readonly MAX_RETRIES = 10;
 | 
			
		||||
@@ -39,7 +41,13 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
      const persistedLogs = await this.logPersistence.loadLogs(this.processId);
 | 
			
		||||
      if (persistedLogs.length > 0) {
 | 
			
		||||
        this.logs = persistedLogs;
 | 
			
		||||
        this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs);
 | 
			
		||||
        // Recalculate size once from scratch and seed the size map
 | 
			
		||||
        this.currentLogMemorySize = 0;
 | 
			
		||||
        for (const log of this.logs) {
 | 
			
		||||
          const size = this.estimateLogSize(log);
 | 
			
		||||
          this.logSizeMap.set(log, size);
 | 
			
		||||
          this.currentLogMemorySize += size;
 | 
			
		||||
        }
 | 
			
		||||
        this.logger.info(`Loaded ${persistedLogs.length} persisted logs from disk`);
 | 
			
		||||
        
 | 
			
		||||
        // Delete the persisted file after loading
 | 
			
		||||
@@ -87,18 +95,27 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
    this.processWrapper.on('log', (log: IProcessLog): void => {
 | 
			
		||||
      // Store the log in our buffer
 | 
			
		||||
      this.logs.push(log);
 | 
			
		||||
      console.error(`[ProcessMonitor:${this.config.name}] Received log (type=${log.type}): ${log.message}`);
 | 
			
		||||
      console.error(`[ProcessMonitor:${this.config.name}] Logs array now has ${this.logs.length} items`);
 | 
			
		||||
      if (process.env.TSPM_DEBUG) {
 | 
			
		||||
        console.error(
 | 
			
		||||
          `[ProcessMonitor:${this.config.name}] Received log (type=${log.type}): ${log.message}`,
 | 
			
		||||
        );
 | 
			
		||||
        console.error(
 | 
			
		||||
          `[ProcessMonitor:${this.config.name}] Logs array now has ${this.logs.length} items`,
 | 
			
		||||
        );
 | 
			
		||||
      }
 | 
			
		||||
      this.logger.debug(`ProcessMonitor received log: ${log.message}`);
 | 
			
		||||
      
 | 
			
		||||
      // Update memory size tracking
 | 
			
		||||
      this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs);
 | 
			
		||||
      // Update memory size tracking incrementally
 | 
			
		||||
      const approxSize = this.estimateLogSize(log);
 | 
			
		||||
      this.logSizeMap.set(log, approxSize);
 | 
			
		||||
      this.currentLogMemorySize += approxSize;
 | 
			
		||||
      
 | 
			
		||||
      // Trim logs if they exceed memory limit (10MB)
 | 
			
		||||
      while (this.currentLogMemorySize > this.MAX_LOG_MEMORY_SIZE && this.logs.length > 1) {
 | 
			
		||||
        // Remove oldest logs until we're under the memory limit
 | 
			
		||||
        this.logs.shift();
 | 
			
		||||
        this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs);
 | 
			
		||||
        const removed = this.logs.shift()!;
 | 
			
		||||
        const removedSize = this.logSizeMap.get(removed) ?? this.estimateLogSize(removed);
 | 
			
		||||
        this.currentLogMemorySize -= removedSize;
 | 
			
		||||
      }
 | 
			
		||||
      
 | 
			
		||||
      // Re-emit the log event for upstream handlers
 | 
			
		||||
@@ -241,12 +258,14 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
        `Memory usage for PID ${pid}: ${this.humanReadableBytes(memoryUsage)} (${memoryUsage} bytes)`,
 | 
			
		||||
      );
 | 
			
		||||
 | 
			
		||||
      // Only log to the process log at longer intervals to avoid spamming
 | 
			
		||||
      // Only log memory usage in debug mode to avoid spamming
 | 
			
		||||
      if (process.env.TSPM_DEBUG) {
 | 
			
		||||
        this.log(
 | 
			
		||||
          `Current memory usage for process group (PID ${pid}): ${this.humanReadableBytes(
 | 
			
		||||
            memoryUsage,
 | 
			
		||||
          )} (${memoryUsage} bytes)`,
 | 
			
		||||
        );
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (memoryUsage > memoryLimit) {
 | 
			
		||||
        const memoryLimitMsg = `Memory usage ${this.humanReadableBytes(
 | 
			
		||||
@@ -374,7 +393,11 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
   * Get the current logs from the process
 | 
			
		||||
   */
 | 
			
		||||
  public getLogs(limit?: number): IProcessLog[] {
 | 
			
		||||
    console.error(`[ProcessMonitor:${this.config.name}] getLogs called, logs.length=${this.logs.length}, limit=${limit}`);
 | 
			
		||||
    if (process.env.TSPM_DEBUG) {
 | 
			
		||||
      console.error(
 | 
			
		||||
        `[ProcessMonitor:${this.config.name}] getLogs called, logs.length=${this.logs.length}, limit=${limit}`,
 | 
			
		||||
      );
 | 
			
		||||
    }
 | 
			
		||||
    this.logger.debug(`Getting logs, total stored: ${this.logs.length}`);
 | 
			
		||||
    if (limit && limit > 0) {
 | 
			
		||||
      return this.logs.slice(-limit);
 | 
			
		||||
@@ -417,4 +440,17 @@ export class ProcessMonitor extends EventEmitter {
 | 
			
		||||
    const prefix = this.config.name ? `[${this.config.name}] ` : '';
 | 
			
		||||
    console.log(prefix + message);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Estimate approximate memory size in bytes for a log entry.
 | 
			
		||||
   * Keeps CPU low by avoiding JSON.stringify on the full array.
 | 
			
		||||
   */
 | 
			
		||||
  private estimateLogSize(log: IProcessLog): number {
 | 
			
		||||
    const messageBytes = Buffer.byteLength(log.message || '', 'utf8');
 | 
			
		||||
    const typeBytes = Buffer.byteLength(log.type || '', 'utf8');
 | 
			
		||||
    const runIdBytes = Buffer.byteLength((log as any).runId || '', 'utf8');
 | 
			
		||||
    // Rough overhead for object structure, keys, timestamp/seq values
 | 
			
		||||
    const overhead = 64;
 | 
			
		||||
    return messageBytes + typeBytes + runIdBytes + overhead;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -90,9 +90,19 @@ export class ProcessWrapper extends EventEmitter {
 | 
			
		||||
 | 
			
		||||
      // Capture stdout
 | 
			
		||||
      if (this.process.stdout) {
 | 
			
		||||
        console.error(`[ProcessWrapper] Setting up stdout listener for process ${this.process.pid}`);
 | 
			
		||||
        if (process.env.TSPM_DEBUG) {
 | 
			
		||||
          console.error(
 | 
			
		||||
            `[ProcessWrapper] Setting up stdout listener for process ${this.process.pid}`,
 | 
			
		||||
          );
 | 
			
		||||
        }
 | 
			
		||||
        this.process.stdout.on('data', (data) => {
 | 
			
		||||
          console.error(`[ProcessWrapper] Received stdout data from PID ${this.process?.pid}: ${data.toString().substring(0, 100)}`);
 | 
			
		||||
          if (process.env.TSPM_DEBUG) {
 | 
			
		||||
            console.error(
 | 
			
		||||
              `[ProcessWrapper] Received stdout data from PID ${this.process?.pid}: ${data
 | 
			
		||||
                .toString()
 | 
			
		||||
                .substring(0, 100)}`,
 | 
			
		||||
            );
 | 
			
		||||
          }
 | 
			
		||||
          // Add data to remainder buffer and split by newlines
 | 
			
		||||
          const text = this.stdoutRemainder + data.toString();
 | 
			
		||||
          const lines = text.split('\n');
 | 
			
		||||
@@ -102,7 +112,9 @@ export class ProcessWrapper extends EventEmitter {
 | 
			
		||||
          
 | 
			
		||||
          // Process complete lines
 | 
			
		||||
          for (const line of lines) {
 | 
			
		||||
            if (process.env.TSPM_DEBUG) {
 | 
			
		||||
              console.error(`[ProcessWrapper] Processing stdout line: ${line}`);
 | 
			
		||||
            }
 | 
			
		||||
            this.logger.debug(`Captured stdout: ${line}`);
 | 
			
		||||
            this.addLog('stdout', line);
 | 
			
		||||
          }
 | 
			
		||||
 
 | 
			
		||||
@@ -97,9 +97,25 @@ export class TspmDaemon {
 | 
			
		||||
    this.tspmInstance.on('process:log', ({ processId, log }) => {
 | 
			
		||||
      // Publish to topic for this process
 | 
			
		||||
      const topic = `logs.${processId}`;
 | 
			
		||||
      // Broadcast to all connected clients subscribed to this topic
 | 
			
		||||
      // Deliver only to subscribed clients
 | 
			
		||||
      if (this.ipcServer) {
 | 
			
		||||
        this.ipcServer.broadcast(`topic:${topic}`, log);
 | 
			
		||||
        try {
 | 
			
		||||
          const topicIndex = (this.ipcServer as any).topicIndex as Map<string, Set<string>> | undefined;
 | 
			
		||||
          const subscribers = topicIndex?.get(topic);
 | 
			
		||||
          if (subscribers && subscribers.size > 0) {
 | 
			
		||||
            // Send directly to subscribers for this topic
 | 
			
		||||
            for (const clientId of subscribers) {
 | 
			
		||||
              this.ipcServer
 | 
			
		||||
                .sendToClient(clientId, `topic:${topic}`, log)
 | 
			
		||||
                .catch((err: any) => {
 | 
			
		||||
                  // Surface but don't fail the loop
 | 
			
		||||
                  console.error('[IPC] sendToClient error:', err?.message || err);
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        } catch (err: any) {
 | 
			
		||||
          console.error('[IPC] Topic delivery error:', err?.message || err);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user