// File: tspm/ts/daemon/processmanager.ts
// (787 lines, 23 KiB, TypeScript)

import * as plugins from '../plugins.js';
import { EventEmitter } from 'events';
import * as paths from '../paths.js';
import { ProcessMonitor } from './processmonitor.js';
import { LogPersistence } from './logpersistence.js';
import { TspmConfig } from './tspm.config.js';
import {
Logger,
ProcessError,
ConfigError,
ValidationError,
handleError,
} from '../shared/common/utils.errorhandler.js';
import type {
IProcessConfig,
IProcessInfo,
IProcessLog,
IMonitorConfig
} from '../shared/protocol/ipc.types.js';
import { toProcessId, getNextProcessId } from '../shared/protocol/id.js';
import type { ProcessId } from '../shared/protocol/id.js';
export class ProcessManager extends EventEmitter {
/** Monitors keyed by process id; start() adds an entry and restart() replaces it. */
public processes: Map<ProcessId, ProcessMonitor> = new Map();
/** Known process configurations (registered via add()/start() or loaded from storage). */
public processConfigs: Map<ProcessId, IProcessConfig> = new Map();
/** Last recorded info snapshot (status, pid, memory, restarts, ...) per process id. */
public processInfo: Map<ProcessId, IProcessInfo> = new Map();
/** In-memory log buffer per process, filled from monitor 'log' events and trimmed to the buffer size. */
private processLogs: Map<ProcessId, IProcessLog[]> = new Map();
/** Key/value storage used to persist process configs and desired states. */
private config: TspmConfig;
/** Storage key under which the list of process configs is written as JSON. */
private configStorageKey = 'processes';
/** Storage key under which the desired-state map is written as JSON. */
private desiredStateStorageKey = 'desiredStates';
/** Desired status per process id; persisted by saveDesiredStates(). */
private desiredStates: Map<ProcessId, IProcessInfo['status']> = new Map();
/** Logger created with the 'Tspm' label in the constructor. */
private logger: Logger;
/**
 * Create the manager and kick off loading of persisted state.
 *
 * A constructor cannot await, so both loaders run in the background. Each
 * promise gets an explicit rejection handler: loadProcessConfigs() rethrows
 * ConfigError on corrupt data, which would otherwise surface as an unhandled
 * promise rejection. Callers that need the loaded state can still await the
 * public loadProcessConfigs() / loadDesiredStates() methods themselves.
 */
constructor() {
  super();
  this.logger = new Logger('Tspm');
  this.config = new TspmConfig();

  const describe = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  void this.loadProcessConfigs().catch((error: unknown) => {
    this.logger.warn(
      `Initial load of process configurations failed: ${describe(error)}`,
    );
  });
  void this.loadDesiredStates().catch((error: unknown) => {
    this.logger.warn(
      `Initial load of desired states failed: ${describe(error)}`,
    );
  });
}
/**
 * Register a process configuration without launching the process.
 *
 * The id is always freshly assigned via getNextSequentialId(); an `id` supplied
 * in `configInput` is not used. The new config is stored with a 'stopped' info
 * record, persisted, and its desired state is set to 'stopped'.
 *
 * @param configInput - Process settings; `name` falls back to `process-<id>`,
 *   `memoryLimitBytes` to 512 MiB and `autorestart` to `true`.
 * @returns The id assigned to the new process.
 */
public async add(configInput: Omit<IProcessConfig, 'id'> & { id?: ProcessId }): Promise<ProcessId> {
  const id = this.getNextSequentialId();
  const defaultMemoryLimit = 512 * 1024 * 1024;

  const {
    name,
    command,
    args,
    projectDir,
    memoryLimitBytes,
    monitorIntervalMs,
    env,
    logBufferSize,
    autorestart,
    watch,
    watchPaths,
  } = configInput;

  // Property order is kept stable because this object is serialized to storage.
  const config: IProcessConfig = {
    id,
    name: name || `process-${id}`,
    command,
    args,
    projectDir,
    memoryLimitBytes: memoryLimitBytes || defaultMemoryLimit,
    monitorIntervalMs,
    env,
    logBufferSize,
    autorestart: autorestart ?? true,
    watch,
    watchPaths,
  };

  this.processConfigs.set(id, config);
  this.processInfo.set(id, { id, status: 'stopped', memory: 0, restarts: 0 });

  await this.saveProcessConfigs();
  await this.setDesiredState(id, 'stopped');
  return id;
}
/**
 * Start a process from the given configuration.
 *
 * Registers the config and a fresh 'stopped' info record, creates a
 * ProcessMonitor, wires its 'log', 'start', 'exit' and 'failed' events and
 * starts it. After a short grace period the process is marked 'online', unless
 * the monitor reported a failure in the meantime (the 'errored' status is kept).
 *
 * On any error the monitor is stopped (best effort) and the config/info that
 * existed before the call are restored, so a failed start never erases a
 * process that was already registered via add() or loaded from storage.
 *
 * @param config - Process configuration; `id`, `command` and `projectDir` are required.
 * @throws ValidationError if required fields are missing or a monitor already exists for the id.
 * @throws ProcessError if starting the process or persisting the configs fails.
 */
public async start(config: IProcessConfig): Promise<void> {
  this.logger.info(`Starting process with id '${config.id}'`);

  if (!config.id || !config.command || !config.projectDir) {
    throw new ValidationError(
      'Invalid process configuration: missing required fields',
      'ERR_INVALID_CONFIG',
      { config },
    );
  }

  if (this.processes.has(config.id)) {
    throw new ValidationError(
      `Process with id '${config.id}' already exists`,
      'ERR_DUPLICATE_PROCESS',
    );
  }

  // Snapshot pre-existing state so a failed start can roll back instead of deleting it.
  const previousConfig = this.processConfigs.get(config.id);
  const previousInfo = this.processInfo.get(config.id);

  let monitor: ProcessMonitor | undefined;
  let failedDuringStartup = false;

  try {
    this.processConfigs.set(config.id, config);
    this.processInfo.set(config.id, {
      id: config.id,
      status: 'stopped',
      memory: 0,
      restarts: 0,
    });

    monitor = new ProcessMonitor({
      id: config.id, // Pass the ProcessId for log persistence
      name: config.name || String(config.id),
      projectDir: config.projectDir,
      command: config.command,
      args: config.args,
      memoryLimitBytes: config.memoryLimitBytes,
      monitorIntervalMs: config.monitorIntervalMs,
      env: config.env,
      logBufferSize: config.logBufferSize,
    });
    this.processes.set(config.id, monitor);

    // Buffer logs in memory and re-emit them for pub/sub subscribers.
    monitor.on('log', (log: IProcessLog) => {
      if (!this.processLogs.has(config.id)) {
        this.processLogs.set(config.id, []);
      }
      const logs = this.processLogs.get(config.id)!;
      logs.push(log);

      // Trim to the configured buffer size (default 1000)
      const bufferSize = config.logBufferSize || 1000;
      if (logs.length > bufferSize) {
        this.processLogs.set(config.id, logs.slice(-bufferSize));
      }

      this.emit('process:log', { processId: config.id, log });
    });

    // Track the PID across the child's lifetime.
    monitor.on('start', (pid: number) => {
      this.updateProcessInfo(config.id, { pid });
    });
    monitor.on('exit', () => {
      this.updateProcessInfo(config.id, { pid: undefined });
    });

    // Remember failures so the 'online' update below cannot overwrite 'errored'.
    monitor.on('failed', () => {
      failedDuringStartup = true;
      this.updateProcessInfo(config.id, { status: 'errored', pid: undefined });
    });

    await monitor.start();

    // Wait a moment for the process to spawn and get its PID
    await new Promise(resolve => setTimeout(resolve, 100));

    if (!failedDuringStartup) {
      const pid = monitor.getPid();
      this.updateProcessInfo(config.id, {
        status: 'online',
        pid: pid || undefined,
      });
    }

    await this.saveProcessConfigs();

    if (failedDuringStartup) {
      this.logger.warn(
        `Process with id '${config.id}' reported a failure during startup`,
      );
    } else {
      this.logger.info(`Successfully started process with id '${config.id}'`);
    }
  } catch (error: unknown) {
    // Do not leave a child running that the manager no longer tracks.
    if (monitor) {
      try {
        await monitor.stop();
      } catch {
        // Best effort: the original error below is the one worth reporting.
      }
    }
    this.processes.delete(config.id);

    // Roll back to the state from before this call.
    if (previousConfig) {
      this.processConfigs.set(config.id, previousConfig);
    } else {
      this.processConfigs.delete(config.id);
    }
    if (previousInfo) {
      this.processInfo.set(config.id, previousInfo);
    } else {
      this.processInfo.delete(config.id);
    }

    if (error instanceof Error) {
      this.logger.error(error);
      throw new ProcessError(
        `Failed to start process: ${error.message}`,
        'ERR_PROCESS_START_FAILED',
        { id: config.id, command: config.command },
      );
    } else {
      const genericError = new ProcessError(
        `Failed to start process: ${String(error)}`,
        'ERR_PROCESS_START_FAILED',
        { id: config.id },
      );
      this.logger.error(genericError);
      throw genericError;
    }
  }
}
/**
 * Update an existing process configuration and persist the result.
 *
 * The updates are shallow-merged over the stored config. The id is pinned to
 * the stored value after the merge, so a stray `id` in an untyped runtime
 * payload cannot desynchronize the config from its map key.
 *
 * @param id - Id of the process whose configuration is updated.
 * @param updates - Fields to overwrite.
 * @returns The merged configuration.
 * @throws ValidationError if no configuration exists for `id`.
 * @throws ConfigError if persisting the configurations fails.
 */
public async update(
  id: ProcessId,
  updates: Partial<Omit<IProcessConfig, 'id'>>,
): Promise<IProcessConfig> {
  const existing = this.processConfigs.get(id);
  if (!existing) {
    throw new ValidationError(
      `Process with id '${id}' does not exist`,
      'ERR_PROCESS_NOT_FOUND',
    );
  }

  // Shallow merge; `id` comes last so it always wins over anything in `updates`.
  const merged = {
    ...existing,
    ...updates,
    id: existing.id,
  } as IProcessConfig;

  this.processConfigs.set(id, merged);
  await this.saveProcessConfigs();
  return merged;
}
/**
 * Stop the process with the given id.
 *
 * The monitor, config and info entries stay registered; only the status is
 * set to 'stopped', so the process can be restarted later.
 *
 * @param id - Id of the process to stop.
 * @throws ValidationError if no monitor is registered for `id`.
 * @throws ProcessError if the monitor fails to stop.
 */
public async stop(id: ProcessId): Promise<void> {
  this.logger.info(`Stopping process with id '${id}'`);

  const target = this.processes.get(id);
  if (!target) {
    const notFound = new ValidationError(
      `Process with id '${id}' not found`,
      'ERR_PROCESS_NOT_FOUND',
    );
    this.logger.error(notFound);
    throw notFound;
  }

  try {
    await target.stop();
    this.updateProcessInfo(id, { status: 'stopped' });
    this.logger.info(`Successfully stopped process with id '${id}'`);
  } catch (cause: unknown) {
    const reason = cause instanceof Error ? cause.message : String(cause);
    const stopFailure = new ProcessError(
      `Failed to stop process: ${reason}`,
      'ERR_PROCESS_STOP_FAILED',
      { id },
    );
    this.logger.error(stopFailure);
    throw stopFailure;
  }
}
/**
 * Restart the process with the given id.
 *
 * Stops the current monitor, creates a fresh ProcessMonitor from the stored
 * config and starts it. All event handlers ('log', 'start', 'exit', 'failed')
 * are attached BEFORE the new monitor is started, so failures and PID changes
 * during startup are not missed, and an 'errored' status reported during the
 * grace period is not overwritten with 'online'.
 *
 * @param id - Id of the process to restart.
 * @throws ValidationError if no monitor or config is registered for `id`.
 * @throws ProcessError if stopping the old monitor or starting the new one fails.
 */
public async restart(id: ProcessId): Promise<void> {
  this.logger.info(`Restarting process with id '${id}'`);

  const monitor = this.processes.get(id);
  const config = this.processConfigs.get(id);
  if (!monitor || !config) {
    const error = new ValidationError(
      `Process with id '${id}' not found`,
      'ERR_PROCESS_NOT_FOUND',
    );
    this.logger.error(error);
    throw error;
  }

  try {
    await monitor.stop();

    const newMonitor = new ProcessMonitor({
      id: config.id, // Pass the ProcessId for log persistence
      name: config.name || String(config.id),
      projectDir: config.projectDir,
      command: config.command,
      args: config.args,
      memoryLimitBytes: config.memoryLimitBytes,
      monitorIntervalMs: config.monitorIntervalMs,
      env: config.env,
      logBufferSize: config.logBufferSize,
    });

    // Buffer logs in memory and re-emit them for pub/sub subscribers.
    newMonitor.on('log', (log: IProcessLog) => {
      if (!this.processLogs.has(id)) {
        this.processLogs.set(id, []);
      }
      const logs = this.processLogs.get(id)!;
      logs.push(log);

      // Trim to the configured buffer size (default 1000)
      const bufferSize = config.logBufferSize || 1000;
      if (logs.length > bufferSize) {
        this.processLogs.set(id, logs.slice(-bufferSize));
      }

      this.emit('process:log', { processId: id, log });
    });

    // Same PID tracking as start(), so info stays accurate after a restart.
    newMonitor.on('start', (pid: number) => {
      this.updateProcessInfo(id, { pid });
    });
    newMonitor.on('exit', () => {
      this.updateProcessInfo(id, { pid: undefined });
    });

    // Attached before start() so a failure during startup is recorded.
    let failedDuringStartup = false;
    newMonitor.on('failed', () => {
      failedDuringStartup = true;
      this.updateProcessInfo(id, { status: 'errored', pid: undefined });
    });

    this.processes.set(id, newMonitor);
    await newMonitor.start();

    // Wait a moment for the process to spawn and get its PID
    await new Promise(resolve => setTimeout(resolve, 100));

    const info = this.processInfo.get(id);
    if (info) {
      if (failedDuringStartup) {
        // Keep the 'errored' status; still count the restart attempt.
        this.updateProcessInfo(id, { restarts: info.restarts + 1 });
      } else {
        const pid = newMonitor.getPid();
        this.updateProcessInfo(id, {
          status: 'online',
          pid: pid || undefined,
          restarts: info.restarts + 1,
        });
      }
    }

    if (failedDuringStartup) {
      this.logger.warn(
        `Process with id '${id}' reported a failure during restart`,
      );
    } else {
      this.logger.info(`Successfully restarted process with id '${id}'`);
    }
  } catch (error: unknown) {
    const processError = new ProcessError(
      `Failed to restart process: ${error instanceof Error ? error.message : String(error)}`,
      'ERR_PROCESS_RESTART_FAILED',
      { id },
    );
    this.logger.error(processError);
    throw processError;
  }
}
/**
 * Delete a process: stop it if a monitor exists, then remove its config, info,
 * buffered logs, persisted logs and desired state.
 *
 * If the first attempt fails at any point (including the stop), the cleanup is
 * run a second time so the configuration is removed regardless.
 *
 * @param id - Id of the process to delete.
 * @throws ValidationError if no configuration exists for `id`.
 * @throws ConfigError if the second cleanup attempt fails as well.
 */
public async delete(id: ProcessId): Promise<void> {
  this.logger.info(`Deleting process with id '${id}'`);

  if (!this.processConfigs.has(id)) {
    const notFound = new ValidationError(
      `Process with id '${id}' not found`,
      'ERR_PROCESS_NOT_FOUND',
    );
    this.logger.error(notFound);
    throw notFound;
  }

  // Shared cleanup: in-memory maps first, then persisted logs, configs and desired state.
  const purge = async (): Promise<void> => {
    this.processes.delete(id);
    this.processConfigs.delete(id);
    this.processInfo.delete(id);
    this.processLogs.delete(id);

    const logPersistence = new LogPersistence();
    await logPersistence.deleteLogs(id);

    await this.saveProcessConfigs();
    await this.removeDesiredState(id);
  };

  try {
    if (this.processes.has(id)) {
      await this.stop(id);
    }
    await purge();
    this.logger.info(`Successfully deleted process with id '${id}'`);
  } catch {
    // Even if stop fails, we should still try to delete the configuration
    try {
      await purge();
      this.logger.info(
        `Successfully deleted process with id '${id}' after stopping failure`,
      );
    } catch (deleteError: unknown) {
      const configError = new ConfigError(
        `Failed to delete process configuration: ${deleteError instanceof Error ? deleteError.message : String(deleteError)}`,
        'ERR_CONFIG_DELETE_FAILED',
        { id },
      );
      this.logger.error(configError);
      throw configError;
    }
  }
}
/**
 * Return the info records of all known processes, refreshed with live
 * readings from their monitors where a monitor exists.
 *
 * Note: the returned objects are the ones stored in `processInfo`; the
 * refresh writes into them in place.
 */
public list(): IProcessInfo[] {
  const entries = [...this.processInfo.values()];

  entries.forEach((entry) => {
    const monitor = this.processes.get(entry.id);
    if (!monitor) {
      return;
    }

    // Only overwrite pid/uptime when the monitor actually has a value.
    const livePid = monitor.getPid();
    if (livePid) {
      entry.pid = livePid;
    }

    const liveUptime = monitor.getUptime();
    if (liveUptime !== null) {
      entry.uptime = liveUptime;
    }

    entry.memory = monitor.getLastMemoryUsage();

    const cpuReading = monitor.getLastCpuUsage();
    if (Number.isFinite(cpuReading)) {
      entry.cpu = cpuReading;
    }

    entry.restarts = monitor.getRestartCount();

    // Status is only ever promoted to 'online' here, never demoted.
    if (monitor.isRunning()) {
      entry.status = 'online';
    }
  });

  return entries;
}
/**
 * Look up the stored configuration and info record of one process.
 *
 * @param id - Id of the process to describe.
 * @returns Both records, or `null` when either of them is missing.
 */
public describe(
  id: ProcessId,
): { config: IProcessConfig; info: IProcessInfo } | null {
  const config = this.processConfigs.get(id);
  const info = this.processInfo.get(id);
  return config && info ? { config, info } : null;
}
/**
 * Return logs for a process.
 *
 * When a monitor exists its own log buffer is used; otherwise the manager's
 * in-memory buffer is used as a fallback.
 *
 * @param id - Id of the process.
 * @param limit - Optional maximum number of most recent entries; in the
 *   fallback path only a positive value limits the result.
 */
public getLogs(id: ProcessId, limit?: number): IProcessLog[] {
  const monitor = this.processes.get(id);
  if (monitor) {
    return monitor.getLogs(limit);
  }

  const buffered = this.processLogs.get(id) || [];
  return limit && limit > 0 ? buffered.slice(-limit) : buffered;
}
/**
 * Start every configured process that has no monitor yet, one after another.
 * The first start() error aborts the loop and propagates to the caller.
 */
public async startAll(): Promise<void> {
  for (const [processId, processConfig] of this.processConfigs) {
    if (this.processes.has(processId)) {
      continue;
    }
    await this.start(processConfig);
  }
}
/**
 * Stop every process that has a monitor, sequentially.
 * The first stop() error aborts the loop and propagates to the caller.
 */
public async stopAll(): Promise<void> {
  for (const processId of this.processes.keys()) {
    await this.stop(processId);
  }
}
/**
 * Restart every process that has a monitor, sequentially.
 * The first restart() error aborts the loop and propagates to the caller.
 */
public async restartAll(): Promise<void> {
  for (const processId of this.processes.keys()) {
    await this.restart(processId);
  }
}
/**
 * Merge a partial update into the stored info record of a process.
 * Does nothing when no info record exists for `id`.
 */
private updateProcessInfo(id: ProcessId, update: Partial<IProcessInfo>): void {
  const current = this.processInfo.get(id);
  if (!current) {
    return;
  }
  // Store a new object rather than mutating the existing record.
  const next = { ...current, ...update };
  this.processInfo.set(id, next);
}
/*
 * Stranded comment: "Compute next sequential numeric id based on existing configs"
 * describes getNextSequentialId(), which is defined after syncProcessStats().
 */
/**
 * Sync process stats from monitors to processInfo.
 *
 * For every monitor with an info record this copies PID and uptime (when
 * available), the restart count, and the running state. A process that is not
 * running is marked 'stopped' unless it is currently 'errored': that status is
 * set by the monitor 'failed' handlers and must survive periodic syncs,
 * otherwise a failed process would be indistinguishable from a stopped one.
 */
public syncProcessStats(): void {
  for (const [id, monitor] of this.processes.entries()) {
    const info = this.processInfo.get(id);
    if (!info) {
      continue;
    }

    const updates: Partial<IProcessInfo> = {};

    // Update PID if available
    const pid = monitor.getPid();
    if (pid) {
      updates.pid = pid;
    }

    // Update uptime if available
    const uptime = monitor.getUptime();
    if (uptime !== null) {
      updates.uptime = uptime;
    }

    updates.restarts = monitor.getRestartCount();

    if (monitor.isRunning()) {
      updates.status = 'online';
    } else if (info.status !== 'errored') {
      updates.status = 'stopped';
    }

    this.updateProcessInfo(id, updates);
  }
}
/**
 * Compute the next process id from the ids of the currently known configs
 * (delegates to getNextProcessId).
 */
private getNextSequentialId(): ProcessId {
  const idsInUse = this.processConfigs.keys();
  return getNextProcessId(idsInUse);
}
/**
 * Persist all known process configurations as a JSON array under the
 * config storage key.
 *
 * @throws ConfigError if serializing or writing fails (the error is logged first).
 */
private async saveProcessConfigs(): Promise<void> {
  this.logger.debug('Saving process configurations to storage');

  try {
    const snapshot = [...this.processConfigs.values()];
    const payload = JSON.stringify(snapshot);
    await this.config.writeKey(this.configStorageKey, payload);
    this.logger.debug(`Saved ${snapshot.length} process configurations`);
  } catch (cause: unknown) {
    const reason = cause instanceof Error ? cause.message : String(cause);
    const saveFailure = new ConfigError(
      `Failed to save process configurations: ${reason}`,
      'ERR_CONFIG_SAVE_FAILED',
    );
    this.logger.error(saveFailure);
    throw saveFailure;
  }
}
// === Desired state persistence ===
/**
 * Persist the desired-state map as a JSON object keyed by stringified process id.
 * Failures are logged as warnings and never thrown.
 */
private async saveDesiredStates(): Promise<void> {
  try {
    const serializable: Record<string, IProcessInfo['status']> = Object.fromEntries(
      Array.from(this.desiredStates, ([id, state]) => [String(id), state]),
    );
    const payload = JSON.stringify(serializable);
    await this.config.writeKey(this.desiredStateStorageKey, payload);
  } catch (error: any) {
    this.logger.warn(
      `Failed to save desired states: ${error?.message || String(error)}`,
    );
  }
}
/**
 * Load the persisted desired-state map from storage, replacing the in-memory map.
 *
 * Nothing changes when no value is stored. Any read, parse or id-conversion
 * error is logged as a warning and leaves the current map untouched.
 */
public async loadDesiredStates(): Promise<void> {
  try {
    const raw = await this.config.readKey(this.desiredStateStorageKey);
    if (!raw) {
      return;
    }

    const stored = JSON.parse(raw) as Record<string, IProcessInfo['status']>;

    // Build the new map completely before swapping it in, so a bad key cannot
    // leave a half-populated map behind.
    const restored = new Map<ProcessId, IProcessInfo['status']>();
    for (const [key, state] of Object.entries(stored)) {
      restored.set(toProcessId(key), state);
    }
    this.desiredStates = restored;

    this.logger.debug(
      `Loaded desired states for ${this.desiredStates.size} processes`,
    );
  } catch (error: any) {
    this.logger.warn(
      `Failed to load desired states: ${error?.message || String(error)}`,
    );
  }
}
/**
 * Record the desired state for one process and persist the whole map.
 *
 * @param id - Id of the process.
 * @param state - Status the process should have.
 */
public async setDesiredState(
  id: ProcessId,
  state: IProcessInfo['status'],
): Promise<void> {
  const states = this.desiredStates;
  states.set(id, state);
  await this.saveDesiredStates();
}
/**
 * Forget the desired state of one process and persist the remaining map.
 *
 * @param id - Id of the process whose desired state is dropped.
 */
public async removeDesiredState(id: ProcessId): Promise<void> {
  const states = this.desiredStates;
  states.delete(id);
  await this.saveDesiredStates();
}
/**
 * Set the same desired state for every configured process, then persist once.
 *
 * @param state - Status every configured process should have.
 */
public async setDesiredStateForAll(
  state: IProcessInfo['status'],
): Promise<void> {
  this.processConfigs.forEach((_config, id) => {
    this.desiredStates.set(id, state);
  });
  await this.saveDesiredStates();
}
/**
 * Start every configured process whose desired state is 'online' and that has
 * no monitor yet. A failing start is logged as a warning and does not stop the
 * remaining processes from being started.
 */
public async startDesired(): Promise<void> {
  for (const [id, config] of this.processConfigs) {
    const wantsOnline = this.desiredStates.get(id) === 'online';
    if (!wantsOnline || this.processes.has(id)) {
      continue;
    }

    try {
      await this.start(config);
    } catch (e) {
      this.logger.warn(
        `Failed to start desired process ${id}: ${
          (e as Error)?.message || String(e)
        }`,
      );
    }
  }
}
/**
 * Load process configurations from config storage.
 *
 * Each stored entry has its id converted with toProcessId(); entries whose id
 * cannot be converted, or that lack `command`/`projectDir`, are skipped with a
 * warning. Every accepted config is registered together with a fresh 'stopped'
 * info record. A missing value or a read error is treated as "nothing saved".
 *
 * @throws ConfigError if the stored value cannot be parsed or processed.
 */
public async loadProcessConfigs(): Promise<void> {
  this.logger.debug('Loading process configurations from storage');

  let configsJson;
  try {
    configsJson = await this.config.readKey(this.configStorageKey);
  } catch {
    // If no configs found or error reading, just continue with empty configs
    this.logger.info(
      'No saved process configurations found or error reading them',
    );
    return;
  }

  if (!configsJson) {
    this.logger.info('No saved process configurations found');
    return;
  }

  try {
    const parsed = JSON.parse(configsJson) as Array<any>;
    this.logger.debug(`Loaded ${parsed.length} process configurations`);

    for (const raw of parsed) {
      // Convert legacy string IDs to ProcessId
      let id: ProcessId;
      try {
        id = toProcessId(raw.id);
      } catch {
        this.logger.warn(
          `Skipping invalid process config with non-numeric id '${raw.id || 'unknown'}'`,
        );
        continue;
      }

      const isComplete = id && raw.command && raw.projectDir;
      if (!isComplete) {
        this.logger.warn(
          `Skipping invalid process config for id '${id || 'unknown'}'`,
        );
        continue;
      }

      const config: IProcessConfig = { ...raw, id };
      this.processConfigs.set(id, config);
      this.processInfo.set(id, {
        id: id,
        status: 'stopped',
        memory: 0,
        restarts: 0,
      });
    }
  } catch (parseError: unknown) {
    const reason =
      parseError instanceof Error ? parseError.message : String(parseError);
    const configError = new ConfigError(
      `Failed to parse process configurations: ${reason}`,
      'ERR_CONFIG_PARSE_FAILED',
    );
    this.logger.error(configError);
    throw configError;
  }
}
/**
 * Reset: stop all running processes and clear all saved configurations.
 *
 * Stop failures are collected per id instead of aborting the reset. All
 * in-memory state is cleared afterwards, including the buffered logs, and
 * persisted logs of the removed processes are deleted on a best-effort basis.
 * Process ids are derived from the existing configs, so they get reused after
 * a reset; stale logs would otherwise be attributed to a new process.
 *
 * Persisted configs and desired states are removed independently of each
 * other; if a key cannot be deleted, an empty value is written instead.
 *
 * @returns `stopped`: ids whose monitor was stopped; `removed`: ids of all
 *   configs that were cleared; `failed`: ids whose stop failed, with the message.
 */
public async reset(): Promise<{
  stopped: ProcessId[];
  removed: ProcessId[];
  failed: Array<{ id: ProcessId; error: string }>;
}> {
  this.logger.info('Resetting TSPM: stopping all processes and clearing configs');

  const removed = Array.from(this.processConfigs.keys());
  const stopped: ProcessId[] = [];
  const failed: Array<{ id: ProcessId; error: string }> = [];

  // Attempt to stop all currently running processes with per-id error collection
  for (const id of Array.from(this.processes.keys())) {
    try {
      await this.stop(id);
      stopped.push(id);
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : '';
      failed.push({ id, error: message || String(error) });
    }
  }

  // Clear in-memory maps regardless of stop outcomes
  this.processes.clear();
  this.processInfo.clear();
  this.processConfigs.clear();
  this.desiredStates.clear();
  this.processLogs.clear();

  // Best effort: drop persisted logs of the removed processes, as delete() does.
  const logPersistence = new LogPersistence();
  for (const id of removed) {
    try {
      await logPersistence.deleteLogs(id);
    } catch (error: unknown) {
      this.logger.warn(
        `Failed to delete persisted logs for process ${id}: ${
          error instanceof Error ? error.message : String(error)
        }`,
      );
    }
  }

  // Remove persisted configs
  try {
    await this.config.deleteKey(this.configStorageKey);
    this.logger.debug('Cleared persisted process configurations');
  } catch {
    // Fallback: write empty list if deleteKey fails for any reason.
    // saveProcessConfigs() logs its own failure before rejecting.
    this.logger.warn('deleteKey failed, writing empty process list instead');
    await this.saveProcessConfigs().catch(() => {});
  }

  // Remove persisted desired states, independent of the outcome above.
  try {
    await this.config.deleteKey(this.desiredStateStorageKey);
  } catch {
    // Fallback: overwrite with the (now empty) map; saveDesiredStates() never throws.
    await this.saveDesiredStates();
  }

  this.logger.info('TSPM reset complete');
  return { stopped, removed, failed };
}
}