import * as plugins from '../plugins.js';
import { EventEmitter } from 'events';
import { Logger, ProcessError, handleError } from '../shared/common/utils.errorhandler.js';
import type { IProcessLog } from '../shared/protocol/ipc.types.js';

/**
 * Options accepted by {@link ProcessWrapper}.
 */
export interface IProcessWrapperOptions {
  /** Executable to run, or a full shell command line when `args` is empty/omitted. */
  command: string;
  /** Arguments for `command`; when non-empty the command is spawned without a shell. */
  args?: string[];
  /** Working directory of the child process. */
  cwd: string;
  /** Extra environment variables, merged over the current `process.env`. */
  env?: NodeJS.ProcessEnv;
  /** Name used to label this wrapper's logger. */
  name: string;
  logBuffer?: number; // Number of log lines to keep in memory (default: 100)
}

/**
 * Spawns a child process, captures its stdout/stderr line by line into a
 * bounded in-memory log buffer and re-emits lifecycle information as events.
 *
 * Events emitted:
 * - `start` (pid) after the child has been spawned
 * - `log` (IProcessLog) for every captured or system log entry
 * - `exit` (code, signal) when the child exits
 * - `error` (ProcessError) when spawning or running the child fails
 */
export class ProcessWrapper extends EventEmitter {
  /** How long to wait after SIGTERM before escalating to SIGKILL. */
  private static readonly gracefulStopTimeoutMs = 5000;
  /** How long to keep waiting for the exit event after SIGKILL was sent. */
  private static readonly postKillGraceMs = 500;
  /** Upper bound for the whole wait in `stop()`. */
  private static readonly stopSafetyCapMs = 10000;
  /** Log buffer size used when `logBuffer` is missing or not a positive number. */
  private static readonly defaultLogBufferSize = 100;

  private process: plugins.childProcess.ChildProcess | null = null;
  private options: IProcessWrapperOptions;
  private logs: IProcessLog[] = [];
  private logBufferSize: number;
  private startTime: Date | null = null;
  private logger: Logger;
  private nextSeq: number = 0;
  private runId: string = '';

  constructor(options: IProcessWrapperOptions) {
    super();
    this.options = options;

    // Missing, zero, negative or NaN sizes fall back to the default; a
    // non-positive size would otherwise break the trimming logic.
    const requestedSize = options.logBuffer;
    this.logBufferSize =
      requestedSize && requestedSize > 0
        ? Math.max(1, Math.floor(requestedSize))
        : ProcessWrapper.defaultLogBufferSize;

    this.logger = new Logger(`ProcessWrapper:${options.name}`);
    this.runId = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Start the wrapped process
   *
   * @throws ProcessError when the child cannot be spawned (the same error is
   * also emitted as an `error` event).
   */
  public start(): void {
    this.addSystemLog('Starting process...');

    try {
      this.logger.debug(`Starting process: ${this.options.command}`);

      const spawnOptions: plugins.childProcess.SpawnOptions = {
        cwd: this.options.cwd,
        env: { ...process.env, ...(this.options.env || {}) },
        stdio: ['ignore', 'pipe', 'pipe'], // We need to pipe stdout and stderr
      };

      const args = this.options.args;
      const child =
        args && args.length > 0
          ? plugins.childProcess.spawn(this.options.command, args, spawnOptions)
          : // Use shell mode to allow a full command string
            plugins.childProcess.spawn(this.options.command, { ...spawnOptions, shell: true });

      this.process = child;
      this.startTime = new Date();

      // Handle process exit. Partial output lines are NOT discarded here:
      // 'exit' may fire while the stdio streams are still open, so each
      // stream flushes its own remainder when it ends.
      child.on('exit', (code, signal) => {
        const exitMessage = `Process exited with code ${code}, signal ${signal}`;
        this.logger.info(exitMessage);
        this.addSystemLog(exitMessage);
        this.emit('exit', code, signal);
      });

      // Handle errors
      child.on('error', (error) => {
        const processError = new ProcessError(error.message, 'ERR_PROCESS_EXECUTION', {
          command: this.options.command,
          pid: child.pid,
        });
        this.logger.error(processError);
        this.addSystemLog(`Process error: ${processError.toString()}`);
        this.emit('error', processError);
      });

      // Capture stdout
      if (child.stdout) {
        if (process.env.TSPM_DEBUG) {
          console.error(
            `[ProcessWrapper] Setting up stdout listener for process ${child.pid}`,
          );
        }
        this.captureLines(child.stdout, 'stdout', child.pid);
      } else {
        this.logger.warn('Process stdout is null');
      }

      // Capture stderr
      if (child.stderr) {
        this.captureLines(child.stderr, 'stderr', child.pid);
      }

      this.addSystemLog(`Process started with PID ${child.pid}`);
      this.logger.info(`Process started with PID ${child.pid}`);
      this.emit('start', child.pid);
    } catch (error: unknown) {
      const processError =
        error instanceof ProcessError
          ? error
          : new ProcessError(
              error instanceof Error ? error.message : String(error),
              'ERR_PROCESS_START_FAILED',
              { command: this.options.command },
            );
      this.logger.error(processError);
      this.addSystemLog(`Failed to start process: ${processError.toString()}`);
      this.emit('error', processError);
      throw processError;
    }
  }

  /**
   * Stop the wrapped process
   *
   * Sends SIGTERM to the child and its descendants, waits for the child to
   * exit and escalates to SIGKILL if it has not exited in time. Resolves once
   * the child exited or the wait timed out; failures are logged, not thrown.
   */
  public async stop(): Promise<void> {
    const child = this.process;
    if (!child) {
      this.logger.debug('Stop called but no process is running');
      this.addSystemLog('No process running');
      return;
    }

    this.logger.info('Stopping process...');
    this.addSystemLog('Stopping process...');

    // Without a PID the child was never spawned; there is nothing to signal.
    if (!child.pid) return;

    try {
      // Never signal a PID whose process is already gone: the OS may have
      // handed that PID to an unrelated process.
      if (ProcessWrapper.hasExited(child)) {
        this.logger.debug('Process already exited, no need to wait');
        return;
      }

      // First try SIGTERM for graceful shutdown
      this.logger.debug(`Sending SIGTERM to process tree rooted at ${child.pid}`);
      await this.killProcessTree('SIGTERM', child);

      // If the process already exited, return immediately
      if (ProcessWrapper.hasExited(child)) {
        this.logger.debug('Process already exited, no need to wait');
        return;
      }

      // Wait for exit or escalate
      await this.waitForExitOrEscalate(child);
    } catch (error: unknown) {
      const processError = new ProcessError(
        error instanceof Error ? error.message : String(error),
        'ERR_PROCESS_STOP_FAILED',
        { pid: child.pid },
      );
      this.logger.error(processError);
      this.addSystemLog(`Error stopping process: ${processError.toString()}`);
    }
  }

  /**
   * Get the PID of the most recently spawned child, or null when no child
   * with a PID exists. The PID is still reported after the child has exited.
   */
  public getPid(): number | null {
    return this.process?.pid || null;
  }

  /**
   * Get the current logs
   *
   * @param limit Maximum number of most recent entries to return (defaults to
   * the buffer size). NOTE: a limit of 0 returns the whole buffer, because
   * `slice(-0)` is `slice(0)`.
   */
  public getLogs(limit: number = this.logBufferSize): IProcessLog[] {
    // Return the most recent logs up to the limit
    return this.logs.slice(-limit);
  }

  /**
   * Get uptime in milliseconds (time since the last spawn; 0 if never started)
   */
  public getUptime(): number {
    if (!this.startTime) return 0;
    return Date.now() - this.startTime.getTime();
  }

  /**
   * Check if the process is currently running
   */
  public isRunning(): boolean {
    const child = this.process;
    if (child === null) return false;
    // A child without a PID failed to spawn; a child with an exit code or a
    // terminating signal has finished.
    return typeof child.pid === 'number' && !ProcessWrapper.hasExited(child);
  }

  /**
   * True once the child has terminated, either with an exit code or because
   * of a signal (in which case `exitCode` stays null and `signalCode` is set).
   */
  private static hasExited(child: plugins.childProcess.ChildProcess): boolean {
    return child.exitCode !== null || child.signalCode !== null;
  }

  // Helper: send a signal to the process and all its children (best-effort)
  private async killProcessTree(
    signal: NodeJS.Signals,
    child: plugins.childProcess.ChildProcess | null = this.process,
  ): Promise<void> {
    const rootPid = child?.pid;
    if (!child || !rootPid) return;

    await new Promise<void>((resolve) => {
      plugins.psTree(
        rootPid,
        (err: unknown, children: ReadonlyArray<{ PID: string }> | undefined) => {
          if (err) {
            this.logger.debug(`Could not list child processes of ${rootPid}: ${String(err)}`);
          }

          const childPids = (children ?? [])
            .map((entry) => Number(entry.PID))
            .filter((pid) => Number.isFinite(pid));

          // Skip the root if it exited while the process list was being
          // collected; its descendants may still be alive and are signalled.
          const pids = ProcessWrapper.hasExited(child) ? childPids : [rootPid, ...childPids];

          for (const pid of pids) {
            try {
              // Always signal individual PIDs to avoid accidentally targeting unrelated groups
              process.kill(pid, signal);
            } catch {
              // ignore ESRCH/EPERM
            }
          }
          resolve();
        },
      );
    });
  }

  /**
   * Resolve when `child` exits. If it has not exited after the graceful
   * timeout, SIGKILL the process tree and wait a short grace period. All
   * timers and the exit listener are released as soon as the wait settles, so
   * nothing fires (or signals a stale PID) after the child is gone.
   */
  private waitForExitOrEscalate(child: plugins.childProcess.ChildProcess): Promise<void> {
    return new Promise<void>((resolve) => {
      let settled = false;
      let killTimer: ReturnType<typeof setTimeout> | undefined;
      let graceTimer: ReturnType<typeof setTimeout> | undefined;
      let safetyTimer: ReturnType<typeof setTimeout> | undefined;

      const settle = (): void => {
        if (settled) return;
        settled = true;
        clearTimeout(killTimer);
        clearTimeout(graceTimer);
        clearTimeout(safetyTimer);
        child.removeListener('exit', settle);
        resolve();
      };

      const escalate = async (): Promise<void> => {
        if (ProcessWrapper.hasExited(child)) {
          settle();
          return;
        }
        this.logger.warn(
          `Process ${child.pid} did not exit gracefully, force killing tree...`,
        );
        this.addSystemLog('Process did not exit gracefully, force killing...');
        try {
          await this.killProcessTree('SIGKILL', child);
        } catch {
          // best-effort: the wait below still bounds how long stop() blocks
        }
        // Give a short grace period after SIGKILL
        if (!settled) {
          graceTimer = setTimeout(settle, ProcessWrapper.postKillGraceMs);
        }
      };

      child.once('exit', settle);
      killTimer = setTimeout(() => void escalate(), ProcessWrapper.gracefulStopTimeoutMs);
      // Safety cap in case neither exit nor timer fires (shouldn't happen)
      safetyTimer = setTimeout(settle, ProcessWrapper.stopSafetyCapMs);
    });
  }

  /**
   * Split a child output stream into lines and record each line as a log
   * entry of the given type. A trailing partial line is kept until more data
   * arrives and is flushed when the stream ends.
   */
  private captureLines(
    stream: NodeJS.ReadableStream,
    type: 'stdout' | 'stderr',
    pid: number | undefined,
  ): void {
    let remainder = '';

    // Decode at the stream level so multi-byte characters that straddle two
    // chunks are not corrupted.
    stream.setEncoding('utf8');

    stream.on('data', (chunk: string | Buffer) => {
      const received = typeof chunk === 'string' ? chunk : chunk.toString();
      if (process.env.TSPM_DEBUG) {
        console.error(
          `[ProcessWrapper] Received ${type} data from PID ${pid}: ${received.substring(0, 100)}`,
        );
      }

      // Add data to remainder buffer and split by newlines
      const lines = (remainder + received).split('\n');
      // The last element might be a partial line
      remainder = lines.pop() || '';

      // Process complete lines
      for (const line of lines) {
        if (process.env.TSPM_DEBUG) {
          console.error(`[ProcessWrapper] Processing ${type} line: ${line}`);
        }
        this.logger.debug(`Captured ${type}: ${line}`);
        this.addLog(type, line);
      }
    });

    // Flush remainder on stream end
    stream.on('end', () => {
      if (remainder) {
        this.logger.debug(`Flushing ${type} remainder: ${remainder}`);
        this.addLog(type, remainder);
        remainder = '';
      }
    });
  }

  /**
   * Add a log entry from stdout or stderr
   */
  private addLog(type: 'stdout' | 'stderr', message: string): void {
    this.recordLog(type, message);
  }

  /**
   * Add a system log entry (not from the process itself)
   */
  private addSystemLog(message: string): void {
    this.recordLog('system', message);
  }

  /**
   * Append an entry to the bounded log buffer, dropping the oldest entries
   * when the buffer is full, and emit it as a `log` event.
   */
  private recordLog(type: 'stdout' | 'stderr' | 'system', message: string): void {
    const log: IProcessLog = {
      timestamp: new Date(),
      type,
      message,
      seq: this.nextSeq++,
      runId: this.runId,
    };

    this.logs.push(log);

    // Trim logs if they exceed buffer size
    const overflow = this.logs.length - this.logBufferSize;
    if (overflow > 0) {
      this.logs.splice(0, overflow);
    }

    // Emit log event for potential handlers
    this.emit('log', log);
  }
}