diff --git a/changelog.md b/changelog.md index 7a94490..c7059a4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2025-08-31 - 5.6.1 - fix(daemon) +Ensure robust process shutdown and improve logs/subscriber diagnostics + +- Make ProcessWrapper.stop asynchronous and awaitable to avoid race conditions when stopping processes +- Signal entire process groups on POSIX (kill by negative PID) and fall back to per-PID signalling; escalate to SIGKILL after a timeout +- Await processWrapper.stop() from ProcessMonitor when enforcing memory limits or handling exits/errors to ensure child processes are cleaned up +- Add logs:subscribers IPC endpoint and corresponding types to inspect current subscribers for a process log topic +- Add optional CLI debug output in logs command (enabled via TSPM_DEBUG=true) to print subscriber counts and details +- Support passing request.lines to getLogs handler in daemon to limit returned log entries + ## 2025-08-30 - 5.6.0 - feat(processmonitor) Add CPU monitoring and display CPU in process list diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index ed7a4a4..9914e82 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@git.zone/tspm', - version: '5.6.0', + version: '5.6.1', description: 'a no fuzz process manager' } diff --git a/ts/cli/commands/process/logs.ts b/ts/cli/commands/process/logs.ts index 4cb78d6..e054d0a 100644 --- a/ts/cli/commands/process/logs.ts +++ b/ts/cli/commands/process/logs.ts @@ -144,6 +144,13 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) { await withStreamingLifecycle( async () => { + // Optional: debug subscribers if requested via env (hidden) + if (process.env.TSPM_DEBUG === 'true') { + try { + const subInfo = await tspmIpcClient.request('logs:subscribers' as any, { id }); + console.log(`[DEBUG] Subscribers for logs.${id}: ${subInfo.count} (${(subInfo.subscribers||[]).join(',')})`); + } catch {} + } await tspmIpcClient.subscribe(id, (log: any) => { // Reset sequence if runId changed (e.g., process restarted) if (log.runId && log.runId !== lastRunId) { diff --git a/ts/daemon/processmonitor.ts b/ts/daemon/processmonitor.ts index aa2735c..8c40515 100644 --- a/ts/daemon/processmonitor.ts +++ b/ts/daemon/processmonitor.ts @@ -291,7 +291,7 @@ export class ProcessMonitor extends EventEmitter { // Stop the process wrapper, which will trigger the exit handler and restart if (this.processWrapper) { - this.processWrapper.stop(); + await this.processWrapper.stop(); } } } catch (error: Error | unknown) { @@ -408,7 +408,7 @@ export class ProcessMonitor extends EventEmitter { (plugins.pidusage as any)?.clear?.(pidToClear); } } catch {} - this.processWrapper.stop(); + await this.processWrapper.stop(); } } diff --git a/ts/daemon/processwrapper.ts b/ts/daemon/processwrapper.ts index d375a7f..fd0e83b 100644 --- a/ts/daemon/processwrapper.ts +++ b/ts/daemon/processwrapper.ts @@ -180,7 +180,7 @@ export class ProcessWrapper extends EventEmitter { /** * Stop the wrapped process */ - public stop(): void { + public async stop(): Promise { if (!this.process) { this.logger.debug('Stop called but no process is running'); this.addSystemLog('No process running'); @@ -194,11 +194,32 @@ export class ProcessWrapper extends EventEmitter { if (this.process.pid) { try { this.logger.debug(`Sending SIGTERM to process ${this.process.pid}`); - process.kill(this.process.pid, 'SIGTERM'); + try { + // Try to signal the whole process group on POSIX to ensure children get the signal too + if (process.platform !== 'win32') { + process.kill(-Math.abs(this.process.pid), 'SIGTERM'); + } else { + process.kill(this.process.pid, 'SIGTERM'); + } + } catch { + // Fallback to direct process kill if group kill fails + process.kill(this.process.pid, 'SIGTERM'); + } - // Give it 5 seconds to shut down gracefully - setTimeout((): void => { - if (this.process && this.process.pid) { + // Wait for exit or escalate + await new Promise((resolve) => { + let settled = false; + const cleanup = () => { + if (settled) return; + settled = true; + resolve(); + }; + + const onExit = () => cleanup(); + this.process!.once('exit', onExit); + + const killTimer = setTimeout(() => { + if (!this.process || !this.process.pid) return cleanup(); this.logger.warn( `Process ${this.process.pid} did not exit gracefully, force killing...`, ); @@ -206,17 +227,27 @@ export class ProcessWrapper extends EventEmitter { 'Process did not exit gracefully, force killing...', ); try { - process.kill(this.process.pid, 'SIGKILL'); - } catch (error: Error | unknown) { - // Process might have exited between checks + if (process.platform !== 'win32') { + process.kill(-Math.abs(this.process.pid), 'SIGKILL'); + } else { + process.kill(this.process.pid, 'SIGKILL'); + } + } catch (error: any) { this.logger.debug( - `Failed to send SIGKILL, process probably already exited: ${ - error instanceof Error ? error.message : String(error) - }`, + `Failed to send SIGKILL, process probably already exited: ${error?.message || String(error)}`, ); } - } - }, 5000); + + // Give a short grace period after SIGKILL + setTimeout(() => cleanup(), 500); + }, 5000); + + // Safety cap in case neither exit nor timer fires (shouldn't happen) + setTimeout(() => { + clearTimeout(killTimer); + cleanup(); + }, 10000); + }); } catch (error: Error | unknown) { const processError = new ProcessError( error instanceof Error ? error.message : String(error), diff --git a/ts/daemon/tspm.daemon.ts b/ts/daemon/tspm.daemon.ts index 8b1db41..d6b4a4d 100644 --- a/ts/daemon/tspm.daemon.ts +++ b/ts/daemon/tspm.daemon.ts @@ -293,7 +293,8 @@ export class TspmDaemon { this.ipcServer.onMessage( 'getLogs', async (request: RequestForMethod<'getLogs'>) => { - const logs = await this.tspmInstance.getLogs(toProcessId(request.id)); + const id = toProcessId(request.id); + const logs = await this.tspmInstance.getLogs(id, request.lines); return { logs }; }, ); @@ -346,6 +347,26 @@ export class TspmDaemon { }, ); + // Inspect subscribers for a process log topic + this.ipcServer.onMessage( + 'logs:subscribers', + async ( + request: RequestForMethod<'logs:subscribers'>, + clientId: string, + ) => { + const id = toProcessId(request.id); + const topic = `logs.${id}`; + try { + const topicIndex = (this.ipcServer as any).topicIndex as Map> | undefined; + const subs = Array.from(topicIndex?.get(topic) || []); + // Also include the requesting clientId if it has a local handler without subscription + return { topic, subscribers: subs, count: subs.length } as any; + } catch (err: any) { + return { topic, subscribers: [], count: 0 } as any; + } + }, + ); + // Resolve target (id:n | name:foo | numeric string) to ProcessId this.ipcServer.onMessage( 'resolveTarget', diff --git a/ts/shared/protocol/ipc.types.ts b/ts/shared/protocol/ipc.types.ts index 841e594..918d171 100644 --- a/ts/shared/protocol/ipc.types.ts +++ b/ts/shared/protocol/ipc.types.ts @@ -151,6 +151,17 @@ export interface LogsSubscribeResponse { ok: boolean; } +// Inspect current subscribers for a process log topic +export interface LogsSubscribersRequest { + id: ProcessId; +} + +export interface LogsSubscribersResponse { + topic: string; + subscribers: string[]; + count: number; +} + // Start all command export interface StartAllRequest { // No parameters needed @@ -287,6 +298,7 @@ export type IpcMethodMap = { describe: { request: DescribeRequest; response: DescribeResponse }; getLogs: { request: GetLogsRequest; response: GetLogsResponse }; 'logs:subscribe': { request: LogsSubscribeRequest; response: LogsSubscribeResponse }; + 'logs:subscribers': { request: LogsSubscribersRequest; response: LogsSubscribersResponse }; startAll: { request: StartAllRequest; response: StartAllResponse }; stopAll: { request: StopAllRequest; response: StopAllResponse }; restartAll: { request: RestartAllRequest; response: RestartAllResponse };