import * as plugins from '../plugins.js';
import { EventEmitter } from 'events';
import { ProcessWrapper } from './processwrapper.js';
import { LogPersistence } from './logpersistence.js';
import { Logger, ProcessError, handleError } from '../shared/common/utils.errorhandler.js';
import type { IMonitorConfig, IProcessLog } from '../shared/protocol/ipc.types.js';
import type { ProcessId } from '../shared/protocol/id.js';

/**
 * Supervises a single child process.
 *
 * - Spawns it through a {@link ProcessWrapper} and re-emits its `log`,
 *   `start` and `exit` events.
 * - Keeps received logs in an in-memory buffer capped at MAX_LOG_MEMORY_SIZE
 *   (oldest entries are dropped first).
 * - Persists the buffer via {@link LogPersistence} on exit, error and stop,
 *   and reloads/deletes persisted logs on start when an id is configured.
 * - Every `monitorIntervalMs` (default 5000) checks the process group's total
 *   memory and stops the process when it exceeds `memoryLimitBytes`.
 * - Restarts the process after exit/error with a 1..10 s incremental delay,
 *   giving up after MAX_RETRIES attempts (emits `failed`). The retry counter
 *   resets when the previous retry was more than RESET_WINDOW_MS ago.
 */
export class ProcessMonitor extends EventEmitter {
  private processWrapper: ProcessWrapper | null = null;
  private config: IMonitorConfig;
  private intervalId: NodeJS.Timeout | null = null;
  private stopped: boolean = true; // Initially stopped until start() is called
  private restartCount: number = 0;
  private logger: Logger;
  private logs: IProcessLog[] = [];
  private logPersistence: LogPersistence;
  private processId?: ProcessId;
  private currentLogMemorySize: number = 0;
  private readonly MAX_LOG_MEMORY_SIZE = 10 * 1024 * 1024; // 10MB
  private restartTimer: NodeJS.Timeout | null = null;
  private lastRetryAt: number | null = null;
  private readonly MAX_RETRIES = 10;
  private readonly RESET_WINDOW_MS = 60 * 60 * 1000; // 1 hour

  constructor(config: IMonitorConfig & { id?: ProcessId }) {
    super();
    this.config = config;
    this.logger = new Logger(`ProcessMonitor:${config.name || 'unnamed'}`);
    this.logs = [];
    this.logPersistence = new LogPersistence();
    this.processId = config.id;
    this.currentLogMemorySize = 0;
  }

  /**
   * Start monitoring: restore persisted logs (when an id is configured),
   * spawn the process and begin the periodic memory check.
   *
   * Any interval or restart timer left over from a previous run is cleared
   * first, so calling start() again does not leak or duplicate timers.
   */
  public async start(): Promise<void> {
    // Load previously persisted logs if available
    if (this.processId) {
      const persistedLogs = await this.logPersistence.loadLogs(this.processId);
      if (persistedLogs.length > 0) {
        this.logs = persistedLogs;
        this.currentLogMemorySize = LogPersistence.calculateLogMemorySize(this.logs);
        this.logger.info(`Loaded ${persistedLogs.length} persisted logs from disk`);
        // Delete the persisted file after loading so it is not re-imported later.
        await this.logPersistence.deleteLogs(this.processId);
      }
    }

    // Drop timers from a previous run before installing new ones.
    this.clearTimers();

    // Reset the stopped flag so that new processes can spawn.
    this.stopped = false;
    this.log(`Starting process monitor.`);
    this.spawnProcess();

    // Set the monitoring interval.
    const interval = this.config.monitorIntervalMs || 5000;
    this.intervalId = setInterval((): void => {
      const pid = this.processWrapper?.getPid();
      if (pid) {
        // monitorProcessGroup catches its own errors; `void` marks fire-and-forget.
        void this.monitorProcessGroup(pid, this.config.memoryLimitBytes);
      }
    }, interval);
  }

  /**
   * Create a new ProcessWrapper, wire up its events and start it.
   * Does nothing when the monitor has been stopped.
   */
  private spawnProcess(): void {
    // Don't spawn if the monitor has been stopped.
    if (this.stopped) {
      this.logger.debug('Not spawning process because monitor is stopped');
      return;
    }

    this.logger.info(`Spawning process: ${this.config.command}`);

    const wrapper = new ProcessWrapper({
      name: this.config.name || 'unnamed-process',
      command: this.config.command,
      args: this.config.args,
      cwd: this.config.projectDir,
      env: this.config.env,
      logBuffer: this.config.logBufferSize,
    });
    this.processWrapper = wrapper;

    wrapper.on('log', (log: IProcessLog): void => {
      this.handleLog(log);
    });

    // Re-emit start event with PID for upstream handlers
    wrapper.on('start', (pid: number): void => {
      this.emit('start', pid);
    });

    wrapper.on(
      'exit',
      async (code: number | null, signal: string | null): Promise<void> => {
        const exitMsg = `Process exited with code ${code}, signal ${signal}.`;
        this.logger.info(exitMsg);
        this.log(exitMsg);

        await this.flushLogs('exit');

        // Re-emit exit event for upstream handlers
        this.emit('exit', code, signal);

        this.restartUnlessStopped('exit');
      },
    );

    wrapper.on('error', async (error: Error | ProcessError): Promise<void> => {
      const errorMsg =
        error instanceof ProcessError
          ? `Process error: ${error.toString()}`
          : `Process error: ${error.message}`;

      this.logger.error(error);
      this.log(errorMsg);

      await this.flushLogs('error');

      this.restartUnlessStopped('error');
    });

    // Start the process
    try {
      wrapper.start();
    } catch (error: unknown) {
      // The process wrapper will handle logging the error;
      // just prevent it from bubbling up further.
      this.logger.error(
        `Failed to start process: ${error instanceof Error ? error.message : String(error)}`,
      );
    }
  }

  /**
   * Buffer one log entry, enforce the memory cap, re-emit it upstream and
   * echo `system` messages to the console.
   */
  private handleLog(log: IProcessLog): void {
    this.logs.push(log);
    this.logger.debug(`ProcessMonitor received log: ${log.message}`);

    // Track size incrementally instead of re-measuring the whole buffer on
    // every line (which made each append O(n) and trimming O(n^2)).
    // NOTE(review): assumes LogPersistence.calculateLogMemorySize is additive
    // over entries — verify against its implementation.
    this.currentLogMemorySize += LogPersistence.calculateLogMemorySize([log]);
    this.trimLogsToMemoryLimit();

    // Re-emit the log event for upstream handlers
    this.emit('log', log);

    // Log system messages to the console
    if (log.type === 'system') {
      this.log(log.message);
    }
  }

  /** Drop the oldest entries until the buffer fits MAX_LOG_MEMORY_SIZE (always keeps at least one). */
  private trimLogsToMemoryLimit(): void {
    while (this.currentLogMemorySize > this.MAX_LOG_MEMORY_SIZE && this.logs.length > 1) {
      const oldest = this.logs.shift();
      if (oldest) {
        this.currentLogMemorySize -= LogPersistence.calculateLogMemorySize([oldest]);
      }
    }
  }

  /**
   * Persist the current log buffer if an id is configured and there is
   * something to save. Failures are logged, not thrown, so exit/error/stop
   * handling continues.
   */
  private async flushLogs(reason: 'exit' | 'error' | 'stop'): Promise<void> {
    if (!this.processId || this.logs.length === 0) {
      return;
    }
    try {
      await this.logPersistence.saveLogs(this.processId, this.logs);
      const flushedMsg = `Flushed ${this.logs.length} logs to disk on ${reason}`;
      if (reason === 'stop') {
        this.logger.info(flushedMsg);
      } else {
        this.logger.debug(flushedMsg);
      }
    } catch (error: unknown) {
      this.logger.error(`Failed to flush logs to disk on ${reason}: ${error}`);
    }
  }

  /** Schedule a restart unless the monitor has been stopped. */
  private restartUnlessStopped(reason: 'exit' | 'error'): void {
    if (this.stopped) {
      this.logger.debug('Not restarting process because monitor is stopped');
      return;
    }
    this.scheduleRestart(reason);
  }

  /** Cancel the memory-check interval and any pending restart timer. */
  private clearTimers(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
      this.restartTimer = null;
    }
  }

  /**
   * Schedule a restart with incremental debounce and failure cutoff.
   *
   * The delay is `restartCount` seconds (capped at 10). After MAX_RETRIES
   * attempts the monitor marks itself stopped and emits `failed`. If the
   * previous retry was at least RESET_WINDOW_MS ago the counter starts over.
   */
  private scheduleRestart(reason: 'exit' | 'error'): void {
    const now = Date.now();

    // Reset window: if last retry was more than 1 hour ago, reset counter
    if (this.lastRetryAt && now - this.lastRetryAt >= this.RESET_WINDOW_MS) {
      this.logger.info('Resetting retry counter after 1 hour window');
      this.restartCount = 0;
    }

    // Already at or above max retries?
    if (this.restartCount >= this.MAX_RETRIES) {
      const msg = 'Maximum restart attempts reached. Marking process as failed.';
      this.logger.warn(msg);
      this.log(msg);
      this.stopped = true;
      // Emit a specific event so manager can set status to errored
      this.emit('failed');
      return;
    }

    // Increment and compute delay (1..10 seconds)
    this.restartCount++;
    const delaySec = Math.min(this.restartCount, 10);
    const msg = `Restarting process in ${delaySec}s (attempt ${this.restartCount}/${this.MAX_RETRIES}) due to ${reason}...`;
    this.logger.info(msg);
    this.log(msg);

    // Clear existing timer if any (e.g. both 'error' and 'exit' fired), then schedule
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
    }

    this.lastRetryAt = now;
    this.restartTimer = setTimeout(() => {
      this.restartTimer = null;
      // If stopped in the meantime, do not spawn
      if (this.stopped) {
        return;
      }
      this.spawnProcess();
    }, delaySec * 1000);
  }

  /**
   * Monitor the process group's memory usage. If the total memory exceeds the limit,
   * stop the process so that the 'exit' handler can restart it.
   * Errors are logged and never rethrown.
   */
  private async monitorProcessGroup(pid: number, memoryLimit: number): Promise<void> {
    try {
      const memoryUsage = await this.getProcessGroupMemory(pid);
      this.logger.debug(
        `Memory usage for PID ${pid}: ${this.humanReadableBytes(memoryUsage)} (${memoryUsage} bytes)`,
      );

      // Note: this is written to the console on every monitoring tick.
      this.log(
        `Current memory usage for process group (PID ${pid}): ${this.humanReadableBytes(
          memoryUsage,
        )} (${memoryUsage} bytes)`,
      );

      if (memoryUsage > memoryLimit) {
        const memoryLimitMsg = `Memory usage ${this.humanReadableBytes(
          memoryUsage,
        )} exceeds limit of ${this.humanReadableBytes(memoryLimit)}. Restarting process.`;
        this.logger.warn(memoryLimitMsg);
        this.log(memoryLimitMsg);
        // Stop the process wrapper, which will trigger the exit handler and restart
        if (this.processWrapper) {
          this.processWrapper.stop();
        }
      }
    } catch (error: unknown) {
      const processError = new ProcessError(
        error instanceof Error ? error.message : String(error),
        'ERR_MEMORY_MONITORING_FAILED',
        { pid },
      );
      this.logger.error(processError);
      this.log(`Error monitoring process group: ${processError.toString()}`);
    }
  }

  /**
   * Get the total memory usage (in bytes) for the process group
   * (the main process and its children), via psTree + pidusage.
   * Rejects with a ProcessError if either lookup fails.
   */
  private getProcessGroupMemory(pid: number): Promise<number> {
    return new Promise<number>((resolve, reject) => {
      this.logger.debug(`Getting memory usage for process group with PID ${pid}`);
      plugins.psTree(pid, (err: Error | null, children: Array<{ PID: string }>) => {
        if (err) {
          const processError = new ProcessError(
            `Failed to get process tree: ${err.message}`,
            'ERR_PSTREE_FAILED',
            { pid },
          );
          this.logger.debug(`psTree error: ${err.message}`);
          return reject(processError);
        }

        // Include the main process and its children.
        const pids: number[] = [pid, ...children.map((child) => Number(child.PID))];
        this.logger.debug(`Found ${pids.length} processes in group with parent PID ${pid}`);

        plugins.pidusage(
          pids,
          (usageErr: Error | null, stats: Record<string, { memory: number }>) => {
            if (usageErr) {
              const processError = new ProcessError(
                `Failed to get process usage stats: ${usageErr.message}`,
                'ERR_PIDUSAGE_FAILED',
                { pids },
              );
              this.logger.debug(`pidusage error: ${usageErr.message}`);
              return reject(processError);
            }

            let totalMemory = 0;
            for (const stat of Object.values(stats)) {
              totalMemory += stat.memory;
            }
            this.logger.debug(
              `Total memory for process group: ${this.humanReadableBytes(totalMemory)}`,
            );
            resolve(totalMemory);
          },
        );
      });
    });
  }

  /**
   * Convert a number of bytes into a human-readable string (e.g. "1.23 MB").
   * Values below 1 KB stay in "Bytes"; values beyond the largest unit are
   * expressed in YB rather than producing an undefined unit.
   */
  private humanReadableBytes(bytes: number, decimals: number = 2): string {
    if (bytes === 0) return '0 Bytes';
    if (!Number.isFinite(bytes)) return `${bytes} Bytes`;
    const k = 1024;
    const dm = decimals < 0 ? 0 : decimals;
    const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
    // Clamp the unit index: |bytes| < 1 would give -1, huge values would overflow the table.
    const rawIndex = Math.floor(Math.log(Math.abs(bytes)) / Math.log(k));
    const i = Math.min(Math.max(rawIndex, 0), sizes.length - 1);
    return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
  }

  /**
   * Stop the monitor and prevent any further respawns.
   * Timers are cancelled before the (async) log flush so no memory check or
   * pending restart can run while stopping.
   */
  public async stop(): Promise<void> {
    this.log('Stopping process monitor.');
    this.stopped = true;
    this.clearTimers();

    // Flush logs to disk before stopping
    await this.flushLogs('stop');

    if (this.processWrapper) {
      this.processWrapper.stop();
    }
  }

  /**
   * Get the current logs from the process.
   * @param limit When positive, only the most recent `limit` entries are returned.
   * @returns A copy of the buffered logs (mutating it does not affect the monitor).
   */
  public getLogs(limit?: number): IProcessLog[] {
    this.logger.debug(`Getting logs, total stored: ${this.logs.length}`);
    if (limit && limit > 0) {
      return this.logs.slice(-limit);
    }
    return this.logs.slice();
  }

  /**
   * Get the number of times the process has been restarted
   */
  public getRestartCount(): number {
    return this.restartCount;
  }

  /**
   * Get the process ID if running
   */
  public getPid(): number | null {
    return this.processWrapper?.getPid() || null;
  }

  /**
   * Get process uptime in milliseconds
   */
  public getUptime(): number {
    return this.processWrapper?.getUptime() || 0;
  }

  /**
   * Check if the process is currently running
   */
  public isRunning(): boolean {
    return this.processWrapper?.isRunning() || false;
  }

  /**
   * Helper method for logging messages with the instance name.
   */
  private log(message: string): void {
    const prefix = this.config.name ? `[${this.config.name}] ` : '';
    console.log(prefix + message);
  }
}