590 lines
20 KiB
TypeScript
590 lines
20 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import { EventEmitter } from 'events';
|
|
import { ProcessWrapper } from './processwrapper.js';
|
|
import { LogPersistence } from './logpersistence.js';
|
|
import { CrashLogManager } from './crashlogmanager.js';
|
|
import { Logger, ProcessError, handleError } from '../shared/common/utils.errorhandler.js';
|
|
import type { IMonitorConfig, IProcessLog } from '../shared/protocol/ipc.types.js';
|
|
import type { ProcessId } from '../shared/protocol/id.js';
|
|
|
|
/**
 * Supervises a single child process.
 *
 * Responsibilities:
 * - Spawns the process through a `ProcessWrapper`.
 * - Buffers its logs in memory, capped at roughly `MAX_LOG_MEMORY_SIZE` bytes.
 * - Persists logs, and crash logs, on exit, error and stop.
 * - Restarts the process after exits or errors, with an increasing delay.
 * - Periodically samples memory and CPU for the whole process group. When the
 *   group exceeds the configured memory limit, it stops the process so that the
 *   exit path restarts it.
 *
 * Events emitted:
 * - `'log'` (IProcessLog)
 * - `'start'` (pid)
 * - `'exit'` (code, signal)
 * - `'failed'` (once `MAX_RETRIES` restart attempts have been used)
 */
export class ProcessMonitor extends EventEmitter {
  private processWrapper: ProcessWrapper | null = null;
  private config: IMonitorConfig;
  private intervalId: NodeJS.Timeout | null = null;
  private stopped: boolean = true; // Initially stopped until start() is called
  private restartCount: number = 0;
  private logger: Logger;
  private logs: IProcessLog[] = [];
  private logPersistence: LogPersistence;
  private crashLogManager: CrashLogManager;
  private processId?: ProcessId;
  private currentLogMemorySize: number = 0;
  private readonly MAX_LOG_MEMORY_SIZE = 10 * 1024 * 1024; // 10MB
  // Track approximate size per log to avoid O(n) JSON stringify on every update
  private logSizeMap: WeakMap<IProcessLog, number> = new WeakMap();
  private restartTimer: NodeJS.Timeout | null = null;
  private lastRetryAt: number | null = null;
  private readonly MAX_RETRIES = 10;
  private readonly RESET_WINDOW_MS = 60 * 60 * 1000; // 1 hour
  private lastMemoryUsage: number = 0;
  private lastCpuUsage: number = 0;
  // True while a resource sample is running. Slow psTree/pidusage calls then
  // cannot stack up across interval ticks, or trigger repeated memory-limit stops.
  private monitorInFlight: boolean = false;

  // Store event listeners for cleanup
  private logHandler?: (log: IProcessLog) => void;
  private startHandler?: (pid: number) => void;
  private exitHandler?: (code: number | null, signal: string | null) => Promise<void>;
  private errorHandler?: (error: Error | ProcessError) => Promise<void>;

  constructor(config: IMonitorConfig & { id?: ProcessId }) {
    super();
    this.config = config;
    this.logger = new Logger(`ProcessMonitor:${config.name || 'unnamed'}`);
    this.logs = [];
    this.logPersistence = new LogPersistence();
    this.crashLogManager = new CrashLogManager();
    this.processId = config.id;
    this.currentLogMemorySize = 0;
  }

  /**
   * Start supervising the process.
   *
   * Restores logs persisted by a previous run (best-effort), spawns the process
   * and starts the periodic resource sampler.
   *
   * Calling start() while the monitor is already running does nothing except log
   * a warning. Spawning again would detach, and so orphan, the running child.
   */
  public async start(): Promise<void> {
    if (!this.stopped) {
      this.logger.warn('start() called while the monitor is already running; ignoring');
      return;
    }

    await this.restorePersistedLogs();

    // Reset the stopped flag so that new processes can spawn.
    this.stopped = false;
    this.log(`Starting process monitor.`);
    this.spawnProcess();

    // Replace any interval left over from an earlier run so it cannot leak.
    if (this.intervalId) {
      clearInterval(this.intervalId);
    }

    // Set the monitoring interval.
    const interval = this.config.monitorIntervalMs || 5000;
    this.intervalId = setInterval((): void => {
      const pid = this.processWrapper?.getPid();
      if (!pid || this.monitorInFlight) {
        return;
      }
      this.monitorInFlight = true;
      // monitorProcessGroup() catches its own errors, so this promise does not reject.
      void this.monitorProcessGroup(pid, this.config.memoryLimitBytes).finally(() => {
        this.monitorInFlight = false;
      });
    }, interval);
  }

  /**
   * Load logs persisted for this process id into the in-memory buffer, then
   * delete the persisted copy.
   *
   * Failures are logged and swallowed, matching the best-effort handling of the
   * other persistence calls in this class, so that start() can still proceed.
   */
  private async restorePersistedLogs(): Promise<void> {
    if (!this.processId) {
      return;
    }
    try {
      const persistedLogs = await this.logPersistence.loadLogs(this.processId);
      if (persistedLogs.length === 0) {
        return;
      }
      const loadedCount = persistedLogs.length;

      this.logs = persistedLogs;
      // Recalculate size once from scratch and seed the size map
      this.currentLogMemorySize = 0;
      for (const log of this.logs) {
        const size = this.estimateLogSize(log);
        this.logSizeMap.set(log, size);
        this.currentLogMemorySize += size;
      }
      // Persisted logs may exceed the in-memory cap; enforce it right away.
      this.trimLogsToMemoryLimit();
      this.logger.info(`Loaded ${loadedCount} persisted logs from disk`);

      // Delete the persisted file after loading
      await this.logPersistence.deleteLogs(this.processId);
    } catch (error) {
      this.logger.error(`Failed to restore persisted logs from disk: ${error}`);
    }
  }

  /**
   * Create a fresh ProcessWrapper, attach the monitor's listeners to it and start it.
   * Does nothing when the monitor is stopped.
   */
  private spawnProcess(): void {
    // Don't spawn if the monitor has been stopped.
    if (this.stopped) {
      this.logger.debug('Not spawning process because monitor is stopped');
      return;
    }

    this.logger.info(`Spawning process: ${this.config.command}`);

    // Clear any orphaned pidusage cache entries before spawning
    this.clearAllPidUsage();

    // Clean up previous listeners if any
    this.cleanupListeners();

    // Create a new process wrapper. The local reference lets the async handlers
    // detect when they belong to a process that has since been replaced.
    const wrapper = new ProcessWrapper({
      name: this.config.name || 'unnamed-process',
      command: this.config.command,
      args: this.config.args,
      cwd: this.config.projectDir,
      env: this.config.env,
      logBuffer: this.config.logBufferSize,
    });
    this.processWrapper = wrapper;

    // Set up event handlers
    this.logHandler = (log: IProcessLog): void => this.onProcessLog(log);
    wrapper.on('log', this.logHandler);

    // Re-emit start event with PID for upstream handlers
    this.startHandler = (pid: number): void => {
      this.emit('start', pid);
    };
    wrapper.on('start', this.startHandler);

    this.exitHandler = (code: number | null, signal: string | null): Promise<void> =>
      this.onProcessExit(wrapper, code, signal);
    wrapper.on('exit', this.exitHandler);

    this.errorHandler = (error: Error | ProcessError): Promise<void> =>
      this.onProcessError(wrapper, error);
    wrapper.on('error', this.errorHandler);

    // Start the process
    try {
      wrapper.start();
    } catch (error: unknown) {
      // The process wrapper will handle logging the error
      // Just prevent it from bubbling up further
      this.logger.error(
        `Failed to start process: ${error instanceof Error ? error.message : String(error)}`,
      );
    }
  }

  /**
   * Handle one log entry from the wrapper.
   *
   * Buffers the entry, updates the size accounting, trims the oldest entries
   * beyond the memory cap, re-emits the entry as 'log', and prints 'system'
   * entries to the console.
   */
  private onProcessLog(log: IProcessLog): void {
    // Store the log in our buffer
    this.logs.push(log);
    if (process.env.TSPM_DEBUG) {
      console.error(
        `[ProcessMonitor:${this.config.name}] Received log (type=${log.type}): ${log.message}`,
      );
      console.error(
        `[ProcessMonitor:${this.config.name}] Logs array now has ${this.logs.length} items`,
      );
    }
    this.logger.debug(`ProcessMonitor received log: ${log.message}`);

    // Update memory size tracking incrementally
    const approxSize = this.estimateLogSize(log);
    this.logSizeMap.set(log, approxSize);
    this.currentLogMemorySize += approxSize;

    // Trim logs if they exceed memory limit (10MB)
    this.trimLogsToMemoryLimit();

    // Re-emit the log event for upstream handlers
    this.emit('log', log);

    // Log system messages to the console
    if (log.type === 'system') {
      this.log(log.message);
    }
  }

  /**
   * Drop the oldest buffered logs until the tracked size is within
   * MAX_LOG_MEMORY_SIZE. The newest entry is always kept.
   *
   * The array is mutated in place with a single splice. This avoids the O(n)
   * cost of each repeated shift(), and references previously handed out by
   * getLogs() keep seeing the same array.
   */
  private trimLogsToMemoryLimit(): void {
    let dropCount = 0;
    while (
      this.currentLogMemorySize > this.MAX_LOG_MEMORY_SIZE &&
      this.logs.length - dropCount > 1
    ) {
      const removed = this.logs[dropCount];
      if (!removed) {
        break;
      }
      const removedSize = this.logSizeMap.get(removed) ?? this.estimateLogSize(removed);
      this.logSizeMap.delete(removed); // Clean up map entry to prevent memory leak
      this.currentLogMemorySize -= removedSize;
      dropCount++;
    }
    if (dropCount > 0) {
      this.logs.splice(0, dropCount);
    }
  }

  /**
   * Handle the exit of `wrapper`'s process.
   *
   * Saves a crash log for a non-zero code or a signal, flushes logs to disk,
   * re-emits 'exit', and schedules a restart unless the monitor is stopped or a
   * newer process has already been spawned.
   */
  private async onProcessExit(
    wrapper: ProcessWrapper,
    code: number | null,
    signal: string | null,
  ): Promise<void> {
    const exitMsg = `Process exited with code ${code}, signal ${signal}.`;
    this.logger.info(exitMsg);
    this.log(exitMsg);

    // Clear pidusage internal state for the PID that exited to prevent memory leaks
    this.clearPidUsage(wrapper.getPid());

    // Detect if this was a crash (non-zero exit code or killed by signal)
    const isCrash = (code !== null && code !== 0) || signal !== null;

    // Save crash log if this was a crash
    if (isCrash) {
      await this.persistCrashLog(code, signal, '');
    }

    // Flush logs to disk on exit
    await this.flushLogs('exit');

    // Re-emit exit event for upstream handlers
    this.emit('exit', code, signal);

    this.restartUnlessStoppedOrStale(wrapper, 'exit');
  }

  /**
   * Handle an error reported by `wrapper`.
   *
   * Logs the error, saves a crash log (with no exit code or signal), flushes
   * logs, and schedules a restart unless the monitor is stopped or a newer
   * process has already been spawned.
   */
  private async onProcessError(
    wrapper: ProcessWrapper,
    error: Error | ProcessError,
  ): Promise<void> {
    const errorMsg =
      error instanceof ProcessError
        ? `Process error: ${error.toString()}`
        : `Process error: ${error.message}`;

    this.logger.error(error);
    this.log(errorMsg);

    // Save crash log for errors (no exit code or signal is available)
    await this.persistCrashLog(null, null, ' due to error');

    // Flush logs to disk on error
    await this.flushLogs('error');

    this.restartUnlessStoppedOrStale(wrapper, 'error');
  }

  /**
   * Schedule a restart after an exit or error of `wrapper`'s process.
   *
   * Skipped when the monitor is stopped. Also skipped when another process was
   * spawned while the async handler was awaiting, because restarting then would
   * spawn an extra process and orphan the current one.
   */
  private restartUnlessStoppedOrStale(wrapper: ProcessWrapper, reason: 'exit' | 'error'): void {
    if (this.stopped) {
      this.logger.debug('Not restarting process because monitor is stopped');
      return;
    }
    if (wrapper !== this.processWrapper) {
      this.logger.debug('Not restarting process because a newer process has already been spawned');
      return;
    }
    this.scheduleRestart(reason);
  }

  /**
   * Save a crash log containing the buffered logs (best-effort).
   *
   * Skipped when there is no process id or no configured name.
   *
   * @param reasonSuffix Appended to the success message, e.g. `' due to error'`.
   */
  private async persistCrashLog(
    code: number | null,
    signal: string | null,
    reasonSuffix: string,
  ): Promise<void> {
    if (!this.processId || !this.config.name) {
      return;
    }
    try {
      await this.crashLogManager.saveCrashLog(
        this.processId,
        this.config.name,
        this.logs,
        code,
        signal,
        this.restartCount,
        this.lastMemoryUsage,
      );
      this.logger.info(`Saved crash log for process ${this.config.name}${reasonSuffix}`);
    } catch (error) {
      this.logger.error(`Failed to save crash log: ${error}`);
    }
  }

  /**
   * Persist the buffered logs to disk (best-effort).
   *
   * Skipped when there is no process id or nothing is buffered. Success is
   * logged at info level on 'stop' and at debug level otherwise.
   */
  private async flushLogs(context: 'exit' | 'error' | 'stop'): Promise<void> {
    if (!this.processId || this.logs.length === 0) {
      return;
    }
    try {
      await this.logPersistence.saveLogs(this.processId, this.logs);
      const flushedMsg = `Flushed ${this.logs.length} logs to disk on ${context}`;
      if (context === 'stop') {
        this.logger.info(flushedMsg);
      } else {
        this.logger.debug(flushedMsg);
      }
    } catch (error) {
      this.logger.error(`Failed to flush logs to disk on ${context}: ${error}`);
    }
  }

  /**
   * Best-effort removal of pidusage's cached state for one PID.
   * Does nothing for a missing PID; errors are ignored.
   */
  private clearPidUsage(pid: number | null | undefined): void {
    if (!pid) {
      return;
    }
    try {
      // `clear` may be absent from the plugin's typings, hence the loose access.
      (plugins.pidusage as any)?.clear?.(pid);
    } catch {
      // Cache cleanup is best-effort and must not affect supervision.
    }
  }

  /** Best-effort removal of all of pidusage's cached state; errors are ignored. */
  private clearAllPidUsage(): void {
    try {
      (plugins.pidusage as any)?.clearAll?.();
    } catch {
      // Cache cleanup is best-effort and must not affect supervision.
    }
  }

  /**
   * Clean up event listeners from process wrapper
   */
  private cleanupListeners(): void {
    if (this.processWrapper) {
      if (this.logHandler) {
        this.processWrapper.removeListener('log', this.logHandler);
      }
      if (this.startHandler) {
        this.processWrapper.removeListener('start', this.startHandler);
      }
      if (this.exitHandler) {
        this.processWrapper.removeListener('exit', this.exitHandler);
      }
      if (this.errorHandler) {
        this.processWrapper.removeListener('error', this.errorHandler);
      }
    }
    // Clear references
    this.logHandler = undefined;
    this.startHandler = undefined;
    this.exitHandler = undefined;
    this.errorHandler = undefined;
  }

  /** Cancel the resource sampler interval and any pending restart timer. */
  private clearTimers(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
      this.restartTimer = null;
    }
  }

  /**
   * Schedule a restart with an increasing delay and a failure cutoff.
   *
   * The delay grows by one second per attempt, capped at 10s. The attempt
   * counter resets when the previous retry was at least RESET_WINDOW_MS ago.
   * Once MAX_RETRIES is reached, the monitor stops, cancels its timers and
   * emits 'failed'.
   */
  private scheduleRestart(reason: 'exit' | 'error'): void {
    const now = Date.now();
    // Reset window: if last retry was more than 1 hour ago, reset counter
    if (this.lastRetryAt && now - this.lastRetryAt >= this.RESET_WINDOW_MS) {
      this.logger.info('Resetting retry counter after 1 hour window');
      this.restartCount = 0;
    }

    // Already at or above max retries?
    if (this.restartCount >= this.MAX_RETRIES) {
      const msg = 'Maximum restart attempts reached. Marking process as failed.';
      this.logger.warn(msg);
      this.log(msg);
      this.stopped = true;
      // Nothing is left to sample or restart; stop polling a dead PID.
      this.clearTimers();
      // Emit a specific event so manager can set status to errored
      this.emit('failed');
      return;
    }

    // Increment and compute delay (1..10 seconds)
    this.restartCount++;
    const delaySec = Math.min(this.restartCount, 10);
    const msg = `Restarting process in ${delaySec}s (attempt ${this.restartCount}/${this.MAX_RETRIES}) due to ${reason}...`;
    this.logger.info(msg);
    this.log(msg);

    // Clear existing timer if any, then schedule
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
    }
    this.lastRetryAt = now;
    this.restartTimer = setTimeout(() => {
      this.restartTimer = null;
      // If stopped in the meantime, do not spawn
      if (this.stopped) {
        return;
      }
      this.spawnProcess();
    }, delaySec * 1000);
  }

  /**
   * Sample memory and CPU for the process group rooted at `pid`.
   *
   * If the group's total memory exceeds `memoryLimit`, the process is stopped so
   * that the 'exit' handler can restart it. The stop is skipped when, by the time
   * the sample returns, the monitor has been stopped or `pid` is no longer the
   * current process, so a replacement is never killed on stale numbers.
   * Errors are logged, never thrown.
   */
  private async monitorProcessGroup(
    pid: number,
    memoryLimit: number,
  ): Promise<void> {
    try {
      const { memory: memoryUsage, cpu: cpuUsage } = await this.getProcessGroupStats(pid);

      this.logger.debug(
        `Memory usage for PID ${pid}: ${this.humanReadableBytes(memoryUsage)} (${memoryUsage} bytes)`,
      );

      // Store latest readings
      this.lastMemoryUsage = memoryUsage;
      this.lastCpuUsage = cpuUsage;

      // Only log memory usage in debug mode to avoid spamming
      if (process.env.TSPM_DEBUG) {
        this.log(
          `Current memory usage for process group (PID ${pid}): ${this.humanReadableBytes(
            memoryUsage,
          )} (${memoryUsage} bytes)`,
        );
      }

      if (memoryUsage > memoryLimit) {
        // The sample is stale if the monitor stopped or the process was replaced meanwhile.
        if (this.stopped || this.processWrapper?.getPid() !== pid) {
          return;
        }

        const memoryLimitMsg = `Memory usage ${this.humanReadableBytes(
          memoryUsage,
        )} exceeds limit of ${this.humanReadableBytes(memoryLimit)}. Restarting process.`;

        this.logger.warn(memoryLimitMsg);
        this.log(memoryLimitMsg);

        // Stop the process wrapper, which will trigger the exit handler and restart
        if (this.processWrapper) {
          await this.processWrapper.stop();
        }
      }
    } catch (error: unknown) {
      const processError = new ProcessError(
        error instanceof Error ? error.message : String(error),
        'ERR_MEMORY_MONITORING_FAILED',
        { pid },
      );

      this.logger.error(processError);
      this.log(`Error monitoring process group: ${processError.toString()}`);
    }
  }

  /**
   * Get the total memory usage (bytes) and summed CPU usage for the process
   * group: the main process plus the children that psTree reports.
   *
   * Rejects with a ProcessError when psTree or pidusage fails. Non-finite CPU
   * values are counted as 0.
   */
  private getProcessGroupStats(pid: number): Promise<{ memory: number; cpu: number }> {
    return new Promise((resolve, reject) => {
      this.logger.debug(
        `Getting memory usage for process group with PID ${pid}`,
      );

      plugins.psTree(
        pid,
        (err: any, children: ReadonlyArray<{ PID: string }>) => {
          if (err) {
            const processError = new ProcessError(
              `Failed to get process tree: ${err.message}`,
              'ERR_PSTREE_FAILED',
              { pid },
            );
            this.logger.debug(`psTree error: ${err.message}`);
            return reject(processError);
          }

          // Include the main process and its children.
          const pids: number[] = [
            pid,
            ...children.map((child) => Number(child.PID)),
          ];
          this.logger.debug(
            `Found ${pids.length} processes in group with parent PID ${pid}`,
          );

          plugins.pidusage(
            pids,
            (err: Error | null, stats: Record<string, { memory: number; cpu: number }>) => {
              if (err) {
                const processError = new ProcessError(
                  `Failed to get process usage stats: ${err.message}`,
                  'ERR_PIDUSAGE_FAILED',
                  { pids },
                );
                this.logger.debug(`pidusage error: ${err.message}`);
                return reject(processError);
              }

              let totalMemory = 0;
              let totalCpu = 0;
              for (const [key, stat] of Object.entries(stats)) {
                // Check if the entry exists and is not null (process may have exited)
                if (stat) {
                  totalMemory += stat.memory || 0;
                  totalCpu += Number.isFinite(stat.cpu) ? stat.cpu : 0;
                } else {
                  this.logger.debug(`Process ${key} stats are null (process may have exited)`);
                }
              }

              this.logger.debug(
                `Total memory for process group: ${this.humanReadableBytes(totalMemory)}`,
              );

              resolve({ memory: totalMemory, cpu: totalCpu });
            },
          );
        },
      );
    });
  }

  /**
   * Convert a number of bytes into a human-readable string (e.g. "1.23 MB").
   *
   * The unit index is clamped to the available units. Sub-byte values stay in
   * "Bytes", and sizes of 1024^9 or more stay in "YB". Negative values keep
   * their sign; non-finite values are printed as-is.
   */
  private humanReadableBytes(bytes: number, decimals: number = 2): string {
    if (bytes === 0) return '0 Bytes';
    if (!Number.isFinite(bytes)) return `${bytes} Bytes`;
    const k = 1024;
    const dm = decimals < 0 ? 0 : decimals;
    const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
    const rawIndex = Math.floor(Math.log(Math.abs(bytes)) / Math.log(k));
    const i = Math.min(Math.max(rawIndex, 0), sizes.length - 1);
    return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
  }

  /**
   * Stop the monitor and prevent any further respawns.
   *
   * Listeners are detached and timers cancelled before anything is awaited.
   * Buffered logs are then flushed and the current process is stopped.
   */
  public async stop(): Promise<void> {
    this.log('Stopping process monitor.');
    this.stopped = true;

    // Detach listeners; the exit emitted by the shutdown below is not handled.
    this.cleanupListeners();

    // Cancel the sampler and any pending restart before awaiting, so neither
    // can fire mid-shutdown.
    this.clearTimers();

    // Flush logs to disk before stopping
    await this.flushLogs('stop');

    if (this.processWrapper) {
      // Clear pidusage state for current PID before stopping to avoid leaks
      this.clearPidUsage(this.processWrapper.getPid());
      await this.processWrapper.stop();
    }
  }

  /**
   * Get the current logs from the process.
   *
   * With a positive `limit`, returns a copy of the newest `limit` entries.
   * Otherwise returns the internal buffer itself.
   */
  public getLogs(limit?: number): IProcessLog[] {
    if (process.env.TSPM_DEBUG) {
      console.error(
        `[ProcessMonitor:${this.config.name}] getLogs called, logs.length=${this.logs.length}, limit=${limit}`,
      );
    }
    this.logger.debug(`Getting logs, total stored: ${this.logs.length}`);
    if (limit && limit > 0) {
      return this.logs.slice(-limit);
    }
    return this.logs;
  }

  /**
   * Get the number of times the process has been restarted
   */
  public getRestartCount(): number {
    return this.restartCount;
  }

  /**
   * Get the process ID if running
   */
  public getPid(): number | null {
    return this.processWrapper?.getPid() || null;
  }

  /**
   * Get process uptime in milliseconds
   */
  public getUptime(): number {
    return this.processWrapper?.getUptime() || 0;
  }

  /**
   * Check if the process is currently running
   */
  public isRunning(): boolean {
    return this.processWrapper?.isRunning() || false;
  }

  /**
   * Get last measured memory usage for the process group (bytes)
   */
  public getLastMemoryUsage(): number {
    return this.lastMemoryUsage;
  }

  /**
   * Get last measured CPU usage for the process group (sum of group, percent)
   */
  public getLastCpuUsage(): number {
    return this.lastCpuUsage;
  }

  /**
   * Helper method for logging messages with the instance name.
   */
  private log(message: string): void {
    const prefix = this.config.name ? `[${this.config.name}] ` : '';
    console.log(prefix + message);
  }

  /**
   * Estimate the approximate memory size in bytes of a log entry.
   *
   * Counts the UTF-8 bytes of message, type and runId, plus a fixed 64-byte
   * allowance for object overhead. This keeps CPU low by avoiding
   * JSON.stringify on the full array.
   */
  private estimateLogSize(log: IProcessLog): number {
    const messageBytes = Buffer.byteLength(log.message || '', 'utf8');
    const typeBytes = Buffer.byteLength(log.type || '', 'utf8');
    const runIdBytes = Buffer.byteLength((log as any).runId || '', 'utf8');
    // Rough overhead for object structure, keys, timestamp/seq values
    const overhead = 64;
    return messageBytes + typeBytes + runIdBytes + overhead;
  }
}
|