feat(daemon): Add real-time log streaming and pub/sub: daemon publishes per-process logs, IPC client subscribe/unsubscribe, CLI --follow streaming, and sequencing for logs

This commit is contained in:
2025-08-26 15:00:15 +00:00
parent 4e0944034b
commit 50c5fdb0ea
12 changed files with 233 additions and 236 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@git.zone/tspm',
version: '1.7.0',
version: '1.8.0',
description: 'a no fuzz process manager'
}

View File

@@ -43,14 +43,19 @@ export class TspmDaemon {
this.ipcServer = plugins.smartipc.SmartIpc.createServer({
id: 'tspm-daemon',
socketPath: this.socketPath,
heartbeat: false, // Disable heartbeat for now
autoCleanupSocketFile: true, // Clean up stale sockets
socketMode: 0o600, // Set proper permissions
heartbeat: true,
heartbeatInterval: 5000,
heartbeatTimeout: 20000,
heartbeatInitialGracePeriodMs: 10000 // Grace period for startup
});
// Register message handlers
this.registerHandlers();
// Start the IPC server
await this.ipcServer.start();
// Start the IPC server and wait until ready to accept connections
await this.ipcServer.start({ readyWhen: 'accepting' });
// Write PID file
await this.writePidFile();
@@ -60,6 +65,16 @@ export class TspmDaemon {
// Load existing process configurations
await this.tspmInstance.loadProcessConfigs();
// Set up log publishing
this.tspmInstance.on('process:log', ({ processId, log }) => {
// Publish to topic for this process
const topic = `logs.${processId}`;
// Broadcast to all connected clients subscribed to this topic
if (this.ipcServer) {
this.ipcServer.broadcast(`topic:${topic}`, log);
}
});
// Set up graceful shutdown handlers
this.setupShutdownHandlers();

View File

@@ -44,13 +44,33 @@ export class TspmIpcClient {
this.ipcClient = plugins.smartipc.SmartIpc.createClient({
id: 'tspm-cli',
socketPath: this.socketPath,
heartbeat: false, // Disable heartbeat for now
clientId: `cli-${process.pid}`,
connectRetry: {
enabled: true,
initialDelay: 100,
maxDelay: 2000,
maxAttempts: 30,
totalTimeout: 15000,
},
registerTimeoutMs: 8000,
heartbeat: true,
heartbeatInterval: 5000,
heartbeatTimeout: 20000,
heartbeatInitialGracePeriodMs: 10000,
heartbeatThrowOnTimeout: false // Don't throw, emit events instead
});
// Connect to the daemon
try {
await this.ipcClient.connect();
await this.ipcClient.connect({ waitForReady: true });
this.isConnected = true;
// Handle heartbeat timeouts gracefully
this.ipcClient.on('heartbeatTimeout', () => {
console.warn('Heartbeat timeout detected, connection may be degraded');
this.isConnected = false;
});
console.log('Connected to TSPM daemon');
} catch (error) {
console.error('Failed to connect to daemon:', error);
@@ -109,6 +129,30 @@ export class TspmIpcClient {
throw error;
}
}
/**
* Subscribe to log updates for a specific process
*/
public async subscribe(processId: string, handler: (log: any) => void): Promise<void> {
if (!this.ipcClient || !this.isConnected) {
throw new Error('Not connected to daemon');
}
const topic = `logs.${processId}`;
await this.ipcClient.subscribe(`topic:${topic}`, handler);
}
/**
* Unsubscribe from log updates for a specific process
*/
public async unsubscribe(processId: string): Promise<void> {
if (!this.ipcClient || !this.isConnected) {
throw new Error('Not connected to daemon');
}
const topic = `logs.${processId}`;
await this.ipcClient.unsubscribe(`topic:${topic}`);
}
/**
* Check if the daemon is running
@@ -176,18 +220,15 @@ export class TspmIpcClient {
console.log(`Started daemon process with PID: ${daemonProcess.pid}`);
// Wait for daemon to be ready (check for socket file)
const maxWaitTime = 10000; // 10 seconds
const startTime = Date.now();
while (Date.now() - startTime < maxWaitTime) {
if (await this.isDaemonRunning()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 500));
// Wait for daemon to be ready using SmartIPC's helper
try {
await plugins.smartipc.SmartIpc.waitForServer({
socketPath: this.socketPath,
timeoutMs: 15000,
});
} catch (error) {
throw new Error(`Daemon failed to start: ${error.message}`);
}
throw new Error('Daemon failed to start within timeout period');
}
/**

View File

@@ -1,4 +1,5 @@
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
import { ProcessWrapper, type IProcessLog } from './classes.processwrapper.js';
import { Logger, ProcessError, handleError } from './utils.errorhandler.js';
@@ -13,7 +14,7 @@ export interface IMonitorConfig {
logBufferSize?: number; // Optional: number of log lines to keep (default: 100)
}
export class ProcessMonitor {
export class ProcessMonitor extends EventEmitter {
private processWrapper: ProcessWrapper | null = null;
private config: IMonitorConfig;
private intervalId: NodeJS.Timeout | null = null;
@@ -22,6 +23,7 @@ export class ProcessMonitor {
private logger: Logger;
constructor(config: IMonitorConfig) {
super();
this.config = config;
this.logger = new Logger(`ProcessMonitor:${config.name || 'unnamed'}`);
}
@@ -65,8 +67,10 @@ export class ProcessMonitor {
// Set up event handlers
this.processWrapper.on('log', (log: IProcessLog): void => {
// Here we could add handlers to send logs somewhere
// For now, we just log system messages to the console
// Re-emit the log event for upstream handlers
this.emit('log', log);
// Log system messages to the console
if (log.type === 'system') {
this.log(log.message);
}

View File

@@ -15,6 +15,8 @@ export interface IProcessLog {
timestamp: Date;
type: 'stdout' | 'stderr' | 'system';
message: string;
seq: number;
runId: string;
}
export class ProcessWrapper extends EventEmitter {
@@ -24,12 +26,15 @@ export class ProcessWrapper extends EventEmitter {
private logBufferSize: number;
private startTime: Date | null = null;
private logger: Logger;
private nextSeq: number = 0;
private runId: string = '';
constructor(options: IProcessWrapperOptions) {
super();
this.options = options;
this.logBufferSize = options.logBuffer || 100;
this.logger = new Logger(`ProcessWrapper:${options.name}`);
this.runId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
/**
@@ -217,6 +222,8 @@ export class ProcessWrapper extends EventEmitter {
timestamp: new Date(),
type,
message,
seq: this.nextSeq++,
runId: this.runId,
};
this.logs.push(log);
@@ -238,6 +245,8 @@ export class ProcessWrapper extends EventEmitter {
timestamp: new Date(),
type: 'system',
message,
seq: this.nextSeq++,
runId: this.runId,
};
this.logs.push(log);

View File

@@ -1,9 +1,11 @@
import * as plugins from './plugins.js';
import { EventEmitter } from 'events';
import * as paths from './paths.js';
import {
ProcessMonitor,
type IMonitorConfig,
} from './classes.processmonitor.js';
import { type IProcessLog } from './classes.processwrapper.js';
import { TspmConfig } from './classes.config.js';
import {
Logger,
@@ -30,13 +32,9 @@ export interface IProcessInfo {
restarts: number;
}
export interface IProcessLog {
timestamp: Date;
type: 'stdout' | 'stderr' | 'system';
message: string;
}
export class Tspm {
export class Tspm extends EventEmitter {
public processes: Map<string, ProcessMonitor> = new Map();
public processConfigs: Map<string, IProcessConfig> = new Map();
public processInfo: Map<string, IProcessInfo> = new Map();
@@ -45,6 +43,7 @@ export class Tspm {
private logger: Logger;
constructor() {
super();
this.logger = new Logger('Tspm');
this.config = new TspmConfig();
this.loadProcessConfigs();
@@ -98,6 +97,12 @@ export class Tspm {
});
this.processes.set(config.id, monitor);
// Set up log event handler to re-emit for pub/sub
monitor.on('log', (log: IProcessLog) => {
this.emit('process:log', { processId: config.id, log });
});
monitor.start();
// Update process info

View File

@@ -426,20 +426,84 @@ export const run = async (): Promise<void> => {
const id = argvArg._[1];
if (!id) {
console.error('Error: Please provide a process ID');
console.log('Usage: tspm logs <id>');
console.log('Usage: tspm logs <id> [options]');
console.log('\nOptions:');
console.log(' --lines <n> Number of lines to show (default: 50)');
console.log(' --follow Stream logs in real-time (like tail -f)');
return;
}
const lines = argvArg.lines || 50;
const follow = argvArg.follow || argvArg.f || false;
// Get initial logs
const response = await tspmIpcClient.request('getLogs', { id, lines });
console.log(`Logs for process: ${id} (last ${lines} lines)`);
console.log('─'.repeat(60));
if (!follow) {
// Static log output
console.log(`Logs for process: ${id} (last ${lines} lines)`);
console.log('─'.repeat(60));
for (const log of response.logs) {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix = log.type === 'stdout' ? '[OUT]' : '[ERR]';
console.log(`${timestamp} ${prefix} ${log.message}`);
for (const log of response.logs) {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
}
} else {
// Streaming log output
console.log(`Logs for process: ${id} (streaming...)`);
console.log('─'.repeat(60));
// Display initial logs
let lastSeq = 0;
for (const log of response.logs) {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
if (log.seq !== undefined) {
lastSeq = Math.max(lastSeq, log.seq);
}
}
// Subscribe to real-time updates
await tspmIpcClient.subscribe(id, (log: any) => {
// Check for sequence gap or duplicate
if (log.seq !== undefined && log.seq <= lastSeq) {
return; // Skip duplicate
}
if (log.seq !== undefined && log.seq > lastSeq + 1) {
console.log(`[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`);
}
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
if (log.seq !== undefined) {
lastSeq = log.seq;
}
});
// Handle Ctrl+C gracefully
let isCleaningUp = false;
const cleanup = async () => {
if (isCleaningUp) return;
isCleaningUp = true;
console.log('\n\nStopping log stream...');
try {
await tspmIpcClient.unsubscribe(id);
await tspmIpcClient.disconnect();
} catch (err) {
// Ignore cleanup errors
}
process.exit(0);
};
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);
// Keep the process alive
await new Promise(() => {}); // Block forever until interrupted
}
} catch (error) {
console.error('Error getting logs:', error.message);

View File

@@ -1,8 +1,8 @@
import type {
IProcessConfig,
IProcessInfo,
IProcessLog,
} from './classes.tspm.js';
import type { IProcessLog } from './classes.processwrapper.js';
// Base message types
export interface IpcRequest<T = any> {