fix(daemon): Ensure robust process shutdown and improve logs/subscriber diagnostics

This commit is contained in:
2025-08-31 00:01:50 +00:00
parent 4160b3f031
commit b3087831e2
7 changed files with 98 additions and 17 deletions

View File

@@ -1,5 +1,15 @@
# Changelog
## 2025-08-31 - 5.6.1 - fix(daemon)
Ensure robust process shutdown and improve logs/subscriber diagnostics
- Make ProcessWrapper.stop asynchronous and awaitable to avoid race conditions when stopping processes
- Signal entire process groups on POSIX (kill by negative PID) and fall back to per-PID signalling; escalate to SIGKILL after a timeout
- Await processWrapper.stop() from ProcessMonitor when enforcing memory limits or handling exits/errors to ensure child processes are cleaned up
- Add logs:subscribers IPC endpoint and corresponding types to inspect current subscribers for a process log topic
- Add optional CLI debug output in logs command (enabled via TSPM_DEBUG=true) to print subscriber counts and details
- Support passing request.lines to getLogs handler in daemon to limit returned log entries
## 2025-08-30 - 5.6.0 - feat(processmonitor)
Add CPU monitoring and display CPU in process list

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@git.zone/tspm',
version: '5.6.0',
version: '5.6.1',
description: 'a no fuzz process manager'
}

View File

@@ -144,6 +144,13 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) {
await withStreamingLifecycle(
async () => {
// Optional: debug subscribers if requested via env (hidden)
if (process.env.TSPM_DEBUG === 'true') {
try {
const subInfo = await tspmIpcClient.request('logs:subscribers' as any, { id });
console.log(`[DEBUG] Subscribers for logs.${id}: ${subInfo.count} (${(subInfo.subscribers||[]).join(',')})`);
} catch {}
}
await tspmIpcClient.subscribe(id, (log: any) => {
// Reset sequence if runId changed (e.g., process restarted)
if (log.runId && log.runId !== lastRunId) {

View File

@@ -291,7 +291,7 @@ export class ProcessMonitor extends EventEmitter {
// Stop the process wrapper, which will trigger the exit handler and restart
if (this.processWrapper) {
this.processWrapper.stop();
await this.processWrapper.stop();
}
}
} catch (error: Error | unknown) {
@@ -408,7 +408,7 @@ export class ProcessMonitor extends EventEmitter {
(plugins.pidusage as any)?.clear?.(pidToClear);
}
} catch {}
this.processWrapper.stop();
await this.processWrapper.stop();
}
}

View File

@@ -180,7 +180,7 @@ export class ProcessWrapper extends EventEmitter {
/**
* Stop the wrapped process
*/
public stop(): void {
public async stop(): Promise<void> {
if (!this.process) {
this.logger.debug('Stop called but no process is running');
this.addSystemLog('No process running');
@@ -194,11 +194,32 @@ export class ProcessWrapper extends EventEmitter {
if (this.process.pid) {
try {
this.logger.debug(`Sending SIGTERM to process ${this.process.pid}`);
process.kill(this.process.pid, 'SIGTERM');
try {
// Try to signal the whole process group on POSIX to ensure children get the signal too
if (process.platform !== 'win32') {
process.kill(-Math.abs(this.process.pid), 'SIGTERM');
} else {
process.kill(this.process.pid, 'SIGTERM');
}
} catch {
// Fallback to direct process kill if group kill fails
process.kill(this.process.pid, 'SIGTERM');
}
// Give it 5 seconds to shut down gracefully
setTimeout((): void => {
if (this.process && this.process.pid) {
// Wait for exit or escalate
await new Promise<void>((resolve) => {
let settled = false;
const cleanup = () => {
if (settled) return;
settled = true;
resolve();
};
const onExit = () => cleanup();
this.process!.once('exit', onExit);
const killTimer = setTimeout(() => {
if (!this.process || !this.process.pid) return cleanup();
this.logger.warn(
`Process ${this.process.pid} did not exit gracefully, force killing...`,
);
@@ -206,17 +227,27 @@ export class ProcessWrapper extends EventEmitter {
'Process did not exit gracefully, force killing...',
);
try {
process.kill(this.process.pid, 'SIGKILL');
} catch (error: Error | unknown) {
// Process might have exited between checks
if (process.platform !== 'win32') {
process.kill(-Math.abs(this.process.pid), 'SIGKILL');
} else {
process.kill(this.process.pid, 'SIGKILL');
}
} catch (error: any) {
this.logger.debug(
`Failed to send SIGKILL, process probably already exited: ${
error instanceof Error ? error.message : String(error)
}`,
`Failed to send SIGKILL, process probably already exited: ${error?.message || String(error)}`,
);
}
}
}, 5000);
// Give a short grace period after SIGKILL
setTimeout(() => cleanup(), 500);
}, 5000);
// Safety cap in case neither exit nor timer fires (shouldn't happen)
setTimeout(() => {
clearTimeout(killTimer);
cleanup();
}, 10000);
});
} catch (error: Error | unknown) {
const processError = new ProcessError(
error instanceof Error ? error.message : String(error),

View File

@@ -293,7 +293,8 @@ export class TspmDaemon {
this.ipcServer.onMessage(
'getLogs',
async (request: RequestForMethod<'getLogs'>) => {
const logs = await this.tspmInstance.getLogs(toProcessId(request.id));
const id = toProcessId(request.id);
const logs = await this.tspmInstance.getLogs(id, request.lines);
return { logs };
},
);
@@ -346,6 +347,26 @@ export class TspmDaemon {
},
);
// Inspect subscribers for a process log topic
this.ipcServer.onMessage(
'logs:subscribers',
async (
request: RequestForMethod<'logs:subscribers'>,
clientId: string,
) => {
const id = toProcessId(request.id);
const topic = `logs.${id}`;
try {
const topicIndex = (this.ipcServer as any).topicIndex as Map<string, Set<string>> | undefined;
const subs = Array.from(topicIndex?.get(topic) || []);
// Also include the requesting clientId if it has a local handler without subscription
return { topic, subscribers: subs, count: subs.length } as any;
} catch (err: any) {
return { topic, subscribers: [], count: 0 } as any;
}
},
);
// Resolve target (id:n | name:foo | numeric string) to ProcessId
this.ipcServer.onMessage(
'resolveTarget',

View File

@@ -151,6 +151,17 @@ export interface LogsSubscribeResponse {
ok: boolean;
}
// Inspect current subscribers for a process log topic
export interface LogsSubscribersRequest {
id: ProcessId;
}
export interface LogsSubscribersResponse {
topic: string;
subscribers: string[];
count: number;
}
// Start all command
export interface StartAllRequest {
// No parameters needed
@@ -287,6 +298,7 @@ export type IpcMethodMap = {
describe: { request: DescribeRequest; response: DescribeResponse };
getLogs: { request: GetLogsRequest; response: GetLogsResponse };
'logs:subscribe': { request: LogsSubscribeRequest; response: LogsSubscribeResponse };
'logs:subscribers': { request: LogsSubscribersRequest; response: LogsSubscribersResponse };
startAll: { request: StartAllRequest; response: StartAllResponse };
stopAll: { request: StopAllRequest; response: StopAllResponse };
restartAll: { request: RestartAllRequest; response: RestartAllResponse };