BREAKING CHANGE(daemon): Introduce persistent log storage, numeric ProcessId type, and improved process monitoring / IPC handling

This commit is contained in:
2025-08-30 13:47:14 +00:00
parent e507b75c40
commit 538f282b62
16 changed files with 589 additions and 167 deletions

View File

@@ -2,6 +2,7 @@ import * as plugins from '../plugins.js';
import { EventEmitter } from 'events';
import * as paths from '../paths.js';
import { ProcessMonitor } from './processmonitor.js';
import { LogPersistence } from './logpersistence.js';
import { TspmConfig } from './tspm.config.js';
import {
Logger,
@@ -16,17 +17,20 @@ import type {
IProcessLog,
IMonitorConfig
} from '../shared/protocol/ipc.types.js';
import { toProcessId, getNextProcessId } from '../shared/protocol/id.js';
import type { ProcessId } from '../shared/protocol/id.js';
export class ProcessManager extends EventEmitter {
public processes: Map<string, ProcessMonitor> = new Map();
public processConfigs: Map<string, IProcessConfig> = new Map();
public processInfo: Map<string, IProcessInfo> = new Map();
public processes: Map<ProcessId, ProcessMonitor> = new Map();
public processConfigs: Map<ProcessId, IProcessConfig> = new Map();
public processInfo: Map<ProcessId, IProcessInfo> = new Map();
private processLogs: Map<ProcessId, IProcessLog[]> = new Map();
private config: TspmConfig;
private configStorageKey = 'processes';
private desiredStateStorageKey = 'desiredStates';
private desiredStates: Map<string, IProcessInfo['status']> = new Map();
private desiredStates: Map<ProcessId, IProcessInfo['status']> = new Map();
private logger: Logger;
constructor() {
@@ -39,14 +43,14 @@ export class ProcessManager extends EventEmitter {
/**
* Add a process configuration without starting it.
* Returns the assigned numeric sequential id as string.
* Returns the assigned numeric sequential id.
*/
public async add(configInput: Omit<IProcessConfig, 'id'> & { id?: string }): Promise<string> {
public async add(configInput: Omit<IProcessConfig, 'id'> & { id?: ProcessId }): Promise<ProcessId> {
// Determine next numeric id
const nextId = this.getNextSequentialId();
const config: IProcessConfig = {
id: String(nextId),
id: nextId,
name: configInput.name || `process-${nextId}`,
command: configInput.command,
args: configInput.args,
@@ -111,7 +115,8 @@ export class ProcessManager extends EventEmitter {
// Create and start process monitor
const monitor = new ProcessMonitor({
name: config.name || config.id,
id: config.id, // Pass the ProcessId for log persistence
name: config.name || String(config.id),
projectDir: config.projectDir,
command: config.command,
args: config.args,
@@ -125,13 +130,43 @@ export class ProcessManager extends EventEmitter {
// Set up log event handler to re-emit for pub/sub
monitor.on('log', (log: IProcessLog) => {
// Store log in our persistent storage
if (!this.processLogs.has(config.id)) {
this.processLogs.set(config.id, []);
}
const logs = this.processLogs.get(config.id)!;
logs.push(log);
// Trim logs if they exceed buffer size (default 1000)
const bufferSize = config.logBufferSize || 1000;
if (logs.length > bufferSize) {
this.processLogs.set(config.id, logs.slice(-bufferSize));
}
this.emit('process:log', { processId: config.id, log });
});
// Set up event handler to track PID when process starts
monitor.on('start', (pid: number) => {
this.updateProcessInfo(config.id, { pid });
});
// Set up event handler to clear PID when process exits
monitor.on('exit', () => {
this.updateProcessInfo(config.id, { pid: undefined });
});
monitor.start();
await monitor.start();
// Update process info
this.updateProcessInfo(config.id, { status: 'online' });
// Wait a moment for the process to spawn and get its PID
await new Promise(resolve => setTimeout(resolve, 100));
// Update process info with PID
const pid = monitor.getPid();
this.updateProcessInfo(config.id, {
status: 'online',
pid: pid || undefined
});
// Save updated configs
await this.saveProcessConfigs();
@@ -165,7 +200,7 @@ export class ProcessManager extends EventEmitter {
/**
* Stop a process by id
*/
public async stop(id: string): Promise<void> {
public async stop(id: ProcessId): Promise<void> {
this.logger.info(`Stopping process with id '${id}'`);
const monitor = this.processes.get(id);
@@ -179,7 +214,7 @@ export class ProcessManager extends EventEmitter {
}
try {
monitor.stop();
await monitor.stop();
this.updateProcessInfo(id, { status: 'stopped' });
this.logger.info(`Successfully stopped process with id '${id}'`);
} catch (error: Error | unknown) {
@@ -199,7 +234,7 @@ export class ProcessManager extends EventEmitter {
/**
* Restart a process by id
*/
public async restart(id: string): Promise<void> {
public async restart(id: ProcessId): Promise<void> {
this.logger.info(`Restarting process with id '${id}'`);
const monitor = this.processes.get(id);
@@ -216,11 +251,12 @@ export class ProcessManager extends EventEmitter {
try {
// Stop and then start the process
monitor.stop();
await monitor.stop();
// Create a new monitor instance
const newMonitor = new ProcessMonitor({
name: config.name || config.id,
id: config.id, // Pass the ProcessId for log persistence
name: config.name || String(config.id),
projectDir: config.projectDir,
command: config.command,
args: config.args,
@@ -230,14 +266,37 @@ export class ProcessManager extends EventEmitter {
logBufferSize: config.logBufferSize,
});
// Set up log event handler for the new monitor
newMonitor.on('log', (log: IProcessLog) => {
// Store log in our persistent storage
if (!this.processLogs.has(id)) {
this.processLogs.set(id, []);
}
const logs = this.processLogs.get(id)!;
logs.push(log);
// Trim logs if they exceed buffer size (default 1000)
const bufferSize = config.logBufferSize || 1000;
if (logs.length > bufferSize) {
this.processLogs.set(id, logs.slice(-bufferSize));
}
this.emit('process:log', { processId: id, log });
});
this.processes.set(id, newMonitor);
newMonitor.start();
await newMonitor.start();
// Update restart count
// Wait a moment for the process to spawn and get its PID
await new Promise(resolve => setTimeout(resolve, 100));
// Update restart count and PID
const info = this.processInfo.get(id);
if (info) {
const pid = newMonitor.getPid();
this.updateProcessInfo(id, {
status: 'online',
pid: pid || undefined,
restarts: info.restarts + 1,
});
}
@@ -257,7 +316,7 @@ export class ProcessManager extends EventEmitter {
/**
* Delete a process by id
*/
public async delete(id: string): Promise<void> {
public async delete(id: ProcessId): Promise<void> {
this.logger.info(`Deleting process with id '${id}'`);
// Check if process exists
@@ -280,6 +339,11 @@ export class ProcessManager extends EventEmitter {
this.processes.delete(id);
this.processConfigs.delete(id);
this.processInfo.delete(id);
this.processLogs.delete(id);
// Delete persisted logs from disk
const logPersistence = new LogPersistence();
await logPersistence.deleteLogs(id);
// Save updated configs
await this.saveProcessConfigs();
@@ -292,6 +356,12 @@ export class ProcessManager extends EventEmitter {
this.processes.delete(id);
this.processConfigs.delete(id);
this.processInfo.delete(id);
this.processLogs.delete(id);
// Delete persisted logs from disk even if stop failed
const logPersistence = new LogPersistence();
await logPersistence.deleteLogs(id);
await this.saveProcessConfigs();
await this.removeDesiredState(id);
@@ -314,14 +384,42 @@ export class ProcessManager extends EventEmitter {
* Get a list of all process infos
*/
public list(): IProcessInfo[] {
return Array.from(this.processInfo.values());
const infos = Array.from(this.processInfo.values());
// Enrich with live data from monitors
for (const info of infos) {
const monitor = this.processes.get(info.id);
if (monitor) {
// Update with current PID if the monitor is running
const pid = monitor.getPid();
if (pid) {
info.pid = pid;
}
// Update uptime if available
const uptime = monitor.getUptime();
if (uptime !== null) {
info.uptime = uptime;
}
// Update restart count
info.restarts = monitor.getRestartCount();
// Update status based on actual running state
if (monitor.isRunning()) {
info.status = 'online';
}
}
}
return infos;
}
/**
* Get detailed info for a specific process
*/
public describe(
id: string,
id: ProcessId,
): { config: IProcessConfig; info: IProcessInfo } | null {
const config = this.processConfigs.get(id);
const info = this.processInfo.get(id);
@@ -336,13 +434,21 @@ export class ProcessManager extends EventEmitter {
/**
* Get process logs
*/
public getLogs(id: string, limit?: number): IProcessLog[] {
public getLogs(id: ProcessId, limit?: number): IProcessLog[] {
// Get logs from the ProcessMonitor instance
const monitor = this.processes.get(id);
if (!monitor) {
return [];
if (monitor) {
const logs = monitor.getLogs(limit);
return logs;
}
return monitor.getLogs(limit);
// Fallback to stored logs if monitor doesn't exist
const logs = this.processLogs.get(id) || [];
if (limit && limit > 0) {
return logs.slice(-limit);
}
return logs;
}
/**
@@ -377,7 +483,7 @@ export class ProcessManager extends EventEmitter {
/**
* Update the info for a process
*/
private updateProcessInfo(id: string, update: Partial<IProcessInfo>): void {
private updateProcessInfo(id: ProcessId, update: Partial<IProcessInfo>): void {
const info = this.processInfo.get(id);
if (info) {
this.processInfo.set(id, { ...info, ...update });
@@ -387,15 +493,40 @@ export class ProcessManager extends EventEmitter {
/**
* Compute next sequential numeric id based on existing configs
*/
private getNextSequentialId(): number {
let maxId = 0;
for (const id of this.processConfigs.keys()) {
const n = parseInt(id, 10);
if (!isNaN(n)) {
maxId = Math.max(maxId, n);
/**
* Sync process stats from monitors to processInfo
*/
public syncProcessStats(): void {
for (const [id, monitor] of this.processes.entries()) {
const info = this.processInfo.get(id);
if (info) {
const pid = monitor.getPid();
const updates: Partial<IProcessInfo> = {};
// Update PID if available
if (pid) {
updates.pid = pid;
}
// Update uptime if available
const uptime = monitor.getUptime();
if (uptime !== null) {
updates.uptime = uptime;
}
// Update restart count
updates.restarts = monitor.getRestartCount();
// Update status based on actual running state
updates.status = monitor.isRunning() ? 'online' : 'stopped';
this.updateProcessInfo(id, updates);
}
}
return maxId + 1;
}
private getNextSequentialId(): ProcessId {
return getNextProcessId(this.processConfigs.keys());
}
/**
@@ -426,7 +557,7 @@ export class ProcessManager extends EventEmitter {
try {
const obj: Record<string, IProcessInfo['status']> = {};
for (const [id, state] of this.desiredStates.entries()) {
obj[id] = state;
obj[String(id)] = state;
}
await this.config.writeKey(
this.desiredStateStorageKey,
@@ -444,7 +575,9 @@ export class ProcessManager extends EventEmitter {
const raw = await this.config.readKey(this.desiredStateStorageKey);
if (raw) {
const obj = JSON.parse(raw) as Record<string, IProcessInfo['status']>;
this.desiredStates = new Map(Object.entries(obj));
this.desiredStates = new Map(
Object.entries(obj).map(([k, v]) => [toProcessId(k), v] as const)
);
this.logger.debug(
`Loaded desired states for ${this.desiredStates.size} processes`,
);
@@ -457,14 +590,14 @@ export class ProcessManager extends EventEmitter {
}
public async setDesiredState(
id: string,
id: ProcessId,
state: IProcessInfo['status'],
): Promise<void> {
this.desiredStates.set(id, state);
await this.saveDesiredStates();
}
public async removeDesiredState(id: string): Promise<void> {
public async removeDesiredState(id: ProcessId): Promise<void> {
this.desiredStates.delete(id);
await this.saveDesiredStates();
}
@@ -505,23 +638,35 @@ export class ProcessManager extends EventEmitter {
const configsJson = await this.config.readKey(this.configStorageKey);
if (configsJson) {
try {
const configs = JSON.parse(configsJson) as IProcessConfig[];
this.logger.debug(`Loaded ${configs.length} process configurations`);
const parsed = JSON.parse(configsJson) as Array<any>;
this.logger.debug(`Loaded ${parsed.length} process configurations`);
for (const config of configs) {
// Validate config
if (!config.id || !config.command || !config.projectDir) {
for (const raw of parsed) {
// Convert legacy string IDs to ProcessId
let id: ProcessId;
try {
id = toProcessId(raw.id);
} catch {
this.logger.warn(
`Skipping invalid process config for id '${config.id || 'unknown'}'`,
`Skipping invalid process config with non-numeric id '${raw.id || 'unknown'}'`,
);
continue;
}
this.processConfigs.set(config.id, config);
// Validate config
if (!id || !raw.command || !raw.projectDir) {
this.logger.warn(
`Skipping invalid process config for id '${id || 'unknown'}'`,
);
continue;
}
const config: IProcessConfig = { ...raw, id };
this.processConfigs.set(id, config);
// Initialize process info
this.processInfo.set(config.id, {
id: config.id,
this.processInfo.set(id, {
id: id,
status: 'stopped',
memory: 0,
restarts: 0,
@@ -555,15 +700,15 @@ export class ProcessManager extends EventEmitter {
* Reset: stop all running processes and clear all saved configurations
*/
public async reset(): Promise<{
stopped: string[];
removed: string[];
failed: Array<{ id: string; error: string }>;
stopped: ProcessId[];
removed: ProcessId[];
failed: Array<{ id: ProcessId; error: string }>;
}> {
this.logger.info('Resetting TSPM: stopping all processes and clearing configs');
const removed = Array.from(this.processConfigs.keys());
const stopped: string[] = [];
const failed: Array<{ id: string; error: string }> = [];
const stopped: ProcessId[] = [];
const failed: Array<{ id: ProcessId; error: string }> = [];
// Attempt to stop all currently running processes with per-id error collection
for (const id of Array.from(this.processes.keys())) {