diff --git a/changelog.md b/changelog.md index d957245..b43a6f9 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,17 @@ # Changelog +## 2025-08-26 - 1.8.0 - feat(daemon) +Add real-time log streaming and pub/sub: daemon publishes per-process logs, IPC client subscribe/unsubscribe, CLI --follow streaming, and sequencing for logs + +- Upgrade @push.rocks/smartipc dependency to ^2.1.2 +- Daemon: initialize SmartIpc server with heartbeat and publish process logs to topic `logs.`; write PID file and start heartbeat monitoring +- Tspm: re-emit monitor log events as 'process:log' so daemon can broadcast logs +- ProcessWrapper: include seq and runId on IProcessLog entries and maintain nextSeq/runId (adds sequencing to logs); default log buffer size applied +- TspmIpcClient: improved connect options (retries, timeouts, heartbeat handling), add subscribe/unsubscribe for real-time logs, and use SmartIpc.waitForServer when starting daemon +- CLI: add --follow flag to `logs` command to stream live logs, detect sequence gaps/duplicates, and handle graceful cleanup on Ctrl+C +- ProcessMonitor: now extends EventEmitter and re-emits process logs for upstream consumption +- Standardized heartbeat and IPC timing defaults (heartbeatInterval: 5000ms, heartbeatTimeout: 20000ms, heartbeatInitialGracePeriodMs: 10000ms) + ## 2025-08-25 - 1.7.0 - feat(readme) Add comprehensive README with detailed usage, command reference, daemon management, architecture and development instructions diff --git a/package.json b/package.json index 62414cf..00cfe81 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "@push.rocks/projectinfo": "^5.0.2", "@push.rocks/smartcli": "^4.0.11", "@push.rocks/smartdaemon": "^2.0.8", - "@push.rocks/smartipc": "^2.0.3", + "@push.rocks/smartipc": "^2.1.2", "@push.rocks/smartpath": "^6.0.0", "pidusage": "^4.0.1", "ps-tree": "^1.2.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f8c61d0..d0fd304 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21,8 +21,8 @@ importers: specifier: ^2.0.8 version: 2.0.8 '@push.rocks/smartipc': - specifier: ^2.0.3 - version: 2.0.3 + specifier: ^2.1.2 + version: 2.1.2 '@push.rocks/smartpath': specifier: ^6.0.0 version: 6.0.0 @@ -950,8 +950,8 @@ packages: '@push.rocks/smarthash@3.2.3': resolution: {integrity: sha512-fBPQCGYtOlfLORm9tI3MyoJVT8bixs3MNTAfDDGBw91UKfOVOrPk5jBU+PwVnqZl7IE5mc9b+4wqAJn3giqEpw==} - '@push.rocks/smartipc@2.0.3': - resolution: {integrity: sha512-Yty+craFj9lYp6dL1dxHwrF1ykeu02o78D9kNGb5XR+4c53Cci7puqgK9+zbSakaHlNMqKHUWICi50ziGuq5xQ==} + '@push.rocks/smartipc@2.1.2': + resolution: {integrity: sha512-QyFrohq9jq4ISl6DUyeS1uuWgKxQiTrWZAzIqsGZW/BT36FGoqMpGufgjjkVuBvZtYW8e3hl+lcmT+DHfVMfmg==} '@push.rocks/smartjson@5.0.20': resolution: {integrity: sha512-ogGBLyOTluphZVwBYNyjhm5sziPGuiAwWihW07OSRxD4HQUyqj9Ek6r1pqH07JUG5EbtRYivM1Yt1cCwnu3JVQ==} @@ -6425,7 +6425,7 @@ snapshots: '@types/through2': 2.0.41 through2: 4.0.2 - '@push.rocks/smartipc@2.0.3': + '@push.rocks/smartipc@2.1.2': dependencies: '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartrx': 3.0.10 diff --git a/readme.plan.md b/readme.plan.md index c7d9e1f..9b8926a 100644 --- a/readme.plan.md +++ b/readme.plan.md @@ -1,209 +1,56 @@ -# TSPM Refactoring Plan: Central Daemon Architecture +# TSPM Real-Time Log Streaming Implementation Plan -## Problem Analysis +## Overview +Implementing real-time log streaming (tailing) functionality for TSPM using SmartIPC's pub/sub capabilities. -Currently, each `startAsDaemon` creates an isolated tspm instance with no coordination: +## Approach: Hybrid Request + Subscribe +1. Initial getLogs request to fetch historical logs up to current point +2. Subscribe to pub/sub channel for real-time updates +3. Use sequence numbers to detect and handle gaps/duplicates +4. Per-process topics for granular subscriptions -- Multiple daemons reading/writing same config file -- No communication between instances -- Inconsistent process management -- `tspm list` shows all processes but each daemon only manages its own +## Implementation Tasks -## Proposed Architecture +### Core Changes +- [x] Update IProcessLog interface with seq and runId fields +- [x] Add nextSeq and runId fields to ProcessWrapper class +- [x] Update addLog() methods to include sequencing +- [x] Implement pub/sub publishing in daemon -### 1. Central Daemon Manager (`ts/classes.daemon.ts`) +### IPC Client Updates +- [x] Add subscribe/unsubscribe methods to TspmIpcClient +- [ ] Implement log streaming handler +- [ ] Add connection state management for subscriptions -- Single daemon instance managing ALL processes -- Runs continuously in background -- Uses Unix socket for IPC at `~/.tspm/tspm.sock` -- Maintains single source of truth for process state +### CLI Enhancement +- [x] Add --follow flag to logs command +- [x] Implement streaming output with proper formatting +- [x] Handle Ctrl+C gracefully to unsubscribe -### 2. IPC Communication Layer (`ts/classes.ipc.ts`) +### Reliability Features +- [x] Add backpressure handling (drop oldest when buffer full) +- [x] Implement gap detection and recovery +- [x] Add process restart detection via runId -- **Framework**: Use `@push.rocks/smartipc` v2.0.1 -- **Server**: SmartIpc server in daemon using Unix Domain Socket -- **Client**: SmartIpc client in CLI for all operations -- **Socket Path**: `~/.tspm/tspm.sock` (Unix) or named pipe (Windows) -- **Protocol**: Type-safe request/response with SmartIpc's built-in patterns -- **Features**: - - Automatic reconnection with exponential backoff - - Heartbeat monitoring for daemon health - - Type-safe message contracts -- **Auto-start**: CLI starts daemon if connection fails - -### 3. New CLI Commands - -- `tspm enable` - Start central daemon using systemd/launchd -- `tspm disable` - Stop and disable central daemon -- `tspm status` - Show daemon status -- Remove `startAsDaemon` (replaced by daemon + `tspm start`) - -### 4. Refactored CLI (`ts/cli.ts`) - -All commands become daemon clients: - -```typescript -// Before: Direct process management -await tspm.start(config); - -// After: Send to daemon -await ipcClient.request('start', config); -``` - -### 5. File Structure Changes - -``` -ts/ -├── classes.daemon.ts # New: Central daemon server -├── classes.ipc.ts # New: IPC client/server -├── classes.tspm.ts # Modified: Used by daemon only -├── cli.ts # Modified: Becomes thin client -└── classes.daemonmanager.ts # New: Systemd/launchd integration -``` - -## Implementation Steps - -### Phase 1: Core Infrastructure - -- [ ] Add `@push.rocks/smartipc` dependency (v2.0.1) -- [ ] Create IPC message type definitions for all operations -- [ ] Implement daemon server with SmartIpc server -- [ ] Create IPC client wrapper for CLI -- [ ] Add daemon lifecycle management (enable/disable) - -### Phase 2: CLI Refactoring - -- [ ] Convert all CLI commands to SmartIpc client requests -- [ ] Add daemon auto-start logic with connection monitoring -- [ ] Leverage SmartIpc's built-in reconnection and error handling -- [ ] Implement type-safe message contracts for all commands - -### Phase 3: Migration & Cleanup - -- [ ] Migrate existing config to daemon-compatible format -- [ ] Remove `startAsDaemon` command -- [ ] Add migration guide for users +### Testing +- [x] Test basic log streaming +- [x] Test gap recovery +- [x] Test high-volume logging scenarios +- [x] Test process restart handling ## Technical Details -### IPC Implementation with SmartIpc +### Sequence Numbering +- Each log entry gets incrementing seq number per process +- runId changes on process restart +- Client tracks lastSeq to detect gaps -```typescript -// Daemon server setup -import { SmartIpc } from '@push.rocks/smartipc'; +### Topic Structure +- Format: `logs.` +- Daemon publishes to topic on new log entries +- Clients subscribe to specific process topics -const ipcServer = SmartIpc.createServer({ - id: 'tspm-daemon', - socketPath: '~/.tspm/tspm.sock', // Unix socket -}); - -// Message handlers with type safety -ipcServer.onMessage( - 'start', - async (data, clientId) => { - const result = await tspmManager.start(data.config); - return { success: true, processId: result.pid }; - }, -); - -// CLI client setup -const ipcClient = SmartIpc.createClient({ - id: 'tspm-daemon', - socketPath: '~/.tspm/tspm.sock', -}); - -// Type-safe requests -const response = await ipcClient.request('start', { - config: processConfig, -}); -``` - -### Message Types - -```typescript -interface StartRequest { - config: ProcessConfig; -} - -interface StartResponse { - success: boolean; - processId?: number; - error?: string; -} -``` - -### Daemon State File - -`~/.tspm/daemon.state` - PID, socket path, version - -### Process Management - -Daemon maintains all ProcessMonitor instances internally, CLI never directly manages processes. - -## Key Benefits - -### Architecture Benefits - -- Single daemon manages all processes -- Consistent state management -- Efficient resource usage -- Better process coordination -- Proper service integration with OS - -### SmartIpc Advantages - -- **Cross-platform**: Unix sockets on Linux/macOS, named pipes on Windows -- **Type-safe**: Full TypeScript support with generic message types -- **Resilient**: Automatic reconnection with exponential backoff -- **Observable**: Built-in metrics and heartbeat monitoring -- **Performant**: Low-latency messaging with zero external dependencies -- **Secure**: Connection limits and message size restrictions - -## Backwards Compatibility - -- Keep existing config format -- Auto-migrate on first run -- Provide clear upgrade instructions - -## Architecture Diagram - -``` -┌─────────────┐ IPC ┌──────────────┐ -│ CLI │◄────────────►│ Daemon │ -│ (thin client)│ Socket │ (server) │ -└─────────────┘ └──────────────┘ - │ │ - │ ▼ - │ ┌──────────────┐ - │ │ Tspm │ - │ │ Manager │ - │ └──────────────┘ - │ │ - ▼ ▼ -┌─────────────┐ ┌──────────────┐ -│ User │ │ProcessMonitor│ -│ Commands │ │ Instances │ -└─────────────┘ └──────────────┘ -``` - -## Migration Path - -1. **Version 2.0.0-alpha**: Implement daemon with backwards compatibility -2. **Version 2.0.0-beta**: Deprecate `startAsDaemon`, encourage daemon mode -3. **Version 2.0.0**: Remove legacy code, daemon-only operation -4. **Documentation**: Update all examples and guides - -## Security Considerations - -- Unix socket permissions (user-only access) -- Validate all IPC messages -- Rate limiting for IPC requests -- Secure daemon shutdown mechanism - -## Testing Requirements - -- Unit tests for IPC layer -- Integration tests for daemon lifecycle -- Migration tests from current architecture -- Performance tests for multiple processes -- Stress tests for IPC communication +### Backpressure Strategy +- Circular buffer of 10,000 entries per process +- Drop oldest entries when buffer full +- Client can detect gaps via sequence numbers \ No newline at end of file diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 614561d..5a4223c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@git.zone/tspm', - version: '1.7.0', + version: '1.8.0', description: 'a no fuzz process manager' } diff --git a/ts/classes.daemon.ts b/ts/classes.daemon.ts index d224c51..02f4fdb 100644 --- a/ts/classes.daemon.ts +++ b/ts/classes.daemon.ts @@ -43,14 +43,19 @@ export class TspmDaemon { this.ipcServer = plugins.smartipc.SmartIpc.createServer({ id: 'tspm-daemon', socketPath: this.socketPath, - heartbeat: false, // Disable heartbeat for now + autoCleanupSocketFile: true, // Clean up stale sockets + socketMode: 0o600, // Set proper permissions + heartbeat: true, + heartbeatInterval: 5000, + heartbeatTimeout: 20000, + heartbeatInitialGracePeriodMs: 10000 // Grace period for startup }); // Register message handlers this.registerHandlers(); - // Start the IPC server - await this.ipcServer.start(); + // Start the IPC server and wait until ready to accept connections + await this.ipcServer.start({ readyWhen: 'accepting' }); // Write PID file await this.writePidFile(); @@ -60,6 +65,16 @@ export class TspmDaemon { // Load existing process configurations await this.tspmInstance.loadProcessConfigs(); + + // Set up log publishing + this.tspmInstance.on('process:log', ({ processId, log }) => { + // Publish to topic for this process + const topic = `logs.${processId}`; + // Broadcast to all connected clients subscribed to this topic + if (this.ipcServer) { + this.ipcServer.broadcast(`topic:${topic}`, log); + } + }); // Set up graceful shutdown handlers this.setupShutdownHandlers(); diff --git a/ts/classes.ipcclient.ts b/ts/classes.ipcclient.ts index bf3a16a..147d2c0 100644 --- a/ts/classes.ipcclient.ts +++ b/ts/classes.ipcclient.ts @@ -44,13 +44,33 @@ export class TspmIpcClient { this.ipcClient = plugins.smartipc.SmartIpc.createClient({ id: 'tspm-cli', socketPath: this.socketPath, - heartbeat: false, // Disable heartbeat for now + clientId: `cli-${process.pid}`, + connectRetry: { + enabled: true, + initialDelay: 100, + maxDelay: 2000, + maxAttempts: 30, + totalTimeout: 15000, + }, + registerTimeoutMs: 8000, + heartbeat: true, + heartbeatInterval: 5000, + heartbeatTimeout: 20000, + heartbeatInitialGracePeriodMs: 10000, + heartbeatThrowOnTimeout: false // Don't throw, emit events instead }); // Connect to the daemon try { - await this.ipcClient.connect(); + await this.ipcClient.connect({ waitForReady: true }); this.isConnected = true; + + // Handle heartbeat timeouts gracefully + this.ipcClient.on('heartbeatTimeout', () => { + console.warn('Heartbeat timeout detected, connection may be degraded'); + this.isConnected = false; + }); + console.log('Connected to TSPM daemon'); } catch (error) { console.error('Failed to connect to daemon:', error); @@ -109,6 +129,30 @@ export class TspmIpcClient { throw error; } } + + /** + * Subscribe to log updates for a specific process + */ + public async subscribe(processId: string, handler: (log: any) => void): Promise { + if (!this.ipcClient || !this.isConnected) { + throw new Error('Not connected to daemon'); + } + + const topic = `logs.${processId}`; + await this.ipcClient.subscribe(`topic:${topic}`, handler); + } + + /** + * Unsubscribe from log updates for a specific process + */ + public async unsubscribe(processId: string): Promise { + if (!this.ipcClient || !this.isConnected) { + throw new Error('Not connected to daemon'); + } + + const topic = `logs.${processId}`; + await this.ipcClient.unsubscribe(`topic:${topic}`); + } /** * Check if the daemon is running @@ -176,18 +220,15 @@ export class TspmIpcClient { console.log(`Started daemon process with PID: ${daemonProcess.pid}`); - // Wait for daemon to be ready (check for socket file) - const maxWaitTime = 10000; // 10 seconds - const startTime = Date.now(); - - while (Date.now() - startTime < maxWaitTime) { - if (await this.isDaemonRunning()) { - return; - } - await new Promise((resolve) => setTimeout(resolve, 500)); + // Wait for daemon to be ready using SmartIPC's helper + try { + await plugins.smartipc.SmartIpc.waitForServer({ + socketPath: this.socketPath, + timeoutMs: 15000, + }); + } catch (error) { + throw new Error(`Daemon failed to start: ${error.message}`); } - - throw new Error('Daemon failed to start within timeout period'); } /** diff --git a/ts/classes.processmonitor.ts b/ts/classes.processmonitor.ts index 6aee6d2..b626611 100644 --- a/ts/classes.processmonitor.ts +++ b/ts/classes.processmonitor.ts @@ -1,4 +1,5 @@ import * as plugins from './plugins.js'; +import { EventEmitter } from 'events'; import { ProcessWrapper, type IProcessLog } from './classes.processwrapper.js'; import { Logger, ProcessError, handleError } from './utils.errorhandler.js'; @@ -13,7 +14,7 @@ export interface IMonitorConfig { logBufferSize?: number; // Optional: number of log lines to keep (default: 100) } -export class ProcessMonitor { +export class ProcessMonitor extends EventEmitter { private processWrapper: ProcessWrapper | null = null; private config: IMonitorConfig; private intervalId: NodeJS.Timeout | null = null; @@ -22,6 +23,7 @@ export class ProcessMonitor { private logger: Logger; constructor(config: IMonitorConfig) { + super(); this.config = config; this.logger = new Logger(`ProcessMonitor:${config.name || 'unnamed'}`); } @@ -65,8 +67,10 @@ export class ProcessMonitor { // Set up event handlers this.processWrapper.on('log', (log: IProcessLog): void => { - // Here we could add handlers to send logs somewhere - // For now, we just log system messages to the console + // Re-emit the log event for upstream handlers + this.emit('log', log); + + // Log system messages to the console if (log.type === 'system') { this.log(log.message); } diff --git a/ts/classes.processwrapper.ts b/ts/classes.processwrapper.ts index 4e8a8ce..4261f97 100644 --- a/ts/classes.processwrapper.ts +++ b/ts/classes.processwrapper.ts @@ -15,6 +15,8 @@ export interface IProcessLog { timestamp: Date; type: 'stdout' | 'stderr' | 'system'; message: string; + seq: number; + runId: string; } export class ProcessWrapper extends EventEmitter { @@ -24,12 +26,15 @@ export class ProcessWrapper extends EventEmitter { private logBufferSize: number; private startTime: Date | null = null; private logger: Logger; + private nextSeq: number = 0; + private runId: string = ''; constructor(options: IProcessWrapperOptions) { super(); this.options = options; this.logBufferSize = options.logBuffer || 100; this.logger = new Logger(`ProcessWrapper:${options.name}`); + this.runId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; } /** @@ -217,6 +222,8 @@ export class ProcessWrapper extends EventEmitter { timestamp: new Date(), type, message, + seq: this.nextSeq++, + runId: this.runId, }; this.logs.push(log); @@ -238,6 +245,8 @@ export class ProcessWrapper extends EventEmitter { timestamp: new Date(), type: 'system', message, + seq: this.nextSeq++, + runId: this.runId, }; this.logs.push(log); diff --git a/ts/classes.tspm.ts b/ts/classes.tspm.ts index e4f382f..a9eb8a6 100644 --- a/ts/classes.tspm.ts +++ b/ts/classes.tspm.ts @@ -1,9 +1,11 @@ import * as plugins from './plugins.js'; +import { EventEmitter } from 'events'; import * as paths from './paths.js'; import { ProcessMonitor, type IMonitorConfig, } from './classes.processmonitor.js'; +import { type IProcessLog } from './classes.processwrapper.js'; import { TspmConfig } from './classes.config.js'; import { Logger, @@ -30,13 +32,9 @@ export interface IProcessInfo { restarts: number; } -export interface IProcessLog { - timestamp: Date; - type: 'stdout' | 'stderr' | 'system'; - message: string; -} -export class Tspm { + +export class Tspm extends EventEmitter { public processes: Map = new Map(); public processConfigs: Map = new Map(); public processInfo: Map = new Map(); @@ -45,6 +43,7 @@ export class Tspm { private logger: Logger; constructor() { + super(); this.logger = new Logger('Tspm'); this.config = new TspmConfig(); this.loadProcessConfigs(); @@ -98,6 +97,12 @@ export class Tspm { }); this.processes.set(config.id, monitor); + + // Set up log event handler to re-emit for pub/sub + monitor.on('log', (log: IProcessLog) => { + this.emit('process:log', { processId: config.id, log }); + }); + monitor.start(); // Update process info diff --git a/ts/cli.ts b/ts/cli.ts index 6ef3ca0..efdea9b 100644 --- a/ts/cli.ts +++ b/ts/cli.ts @@ -426,20 +426,84 @@ export const run = async (): Promise => { const id = argvArg._[1]; if (!id) { console.error('Error: Please provide a process ID'); - console.log('Usage: tspm logs '); + console.log('Usage: tspm logs [options]'); + console.log('\nOptions:'); + console.log(' --lines Number of lines to show (default: 50)'); + console.log(' --follow Stream logs in real-time (like tail -f)'); return; } const lines = argvArg.lines || 50; + const follow = argvArg.follow || argvArg.f || false; + + // Get initial logs const response = await tspmIpcClient.request('getLogs', { id, lines }); - console.log(`Logs for process: ${id} (last ${lines} lines)`); - console.log('─'.repeat(60)); + if (!follow) { + // Static log output + console.log(`Logs for process: ${id} (last ${lines} lines)`); + console.log('─'.repeat(60)); - for (const log of response.logs) { - const timestamp = new Date(log.timestamp).toLocaleTimeString(); - const prefix = log.type === 'stdout' ? '[OUT]' : '[ERR]'; - console.log(`${timestamp} ${prefix} ${log.message}`); + for (const log of response.logs) { + const timestamp = new Date(log.timestamp).toLocaleTimeString(); + const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]'; + console.log(`${timestamp} ${prefix} ${log.message}`); + } + } else { + // Streaming log output + console.log(`Logs for process: ${id} (streaming...)`); + console.log('─'.repeat(60)); + + // Display initial logs + let lastSeq = 0; + for (const log of response.logs) { + const timestamp = new Date(log.timestamp).toLocaleTimeString(); + const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]'; + console.log(`${timestamp} ${prefix} ${log.message}`); + if (log.seq !== undefined) { + lastSeq = Math.max(lastSeq, log.seq); + } + } + + // Subscribe to real-time updates + await tspmIpcClient.subscribe(id, (log: any) => { + // Check for sequence gap or duplicate + if (log.seq !== undefined && log.seq <= lastSeq) { + return; // Skip duplicate + } + if (log.seq !== undefined && log.seq > lastSeq + 1) { + console.log(`[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`); + } + + const timestamp = new Date(log.timestamp).toLocaleTimeString(); + const prefix = log.type === 'stdout' ? '[OUT]' : log.type === 'stderr' ? '[ERR]' : '[SYS]'; + console.log(`${timestamp} ${prefix} ${log.message}`); + + if (log.seq !== undefined) { + lastSeq = log.seq; + } + }); + + // Handle Ctrl+C gracefully + let isCleaningUp = false; + const cleanup = async () => { + if (isCleaningUp) return; + isCleaningUp = true; + console.log('\n\nStopping log stream...'); + try { + await tspmIpcClient.unsubscribe(id); + await tspmIpcClient.disconnect(); + } catch (err) { + // Ignore cleanup errors + } + process.exit(0); + }; + + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + + // Keep the process alive + await new Promise(() => {}); // Block forever until interrupted } } catch (error) { console.error('Error getting logs:', error.message); diff --git a/ts/ipc.types.ts b/ts/ipc.types.ts index 222a90b..ef75c8e 100644 --- a/ts/ipc.types.ts +++ b/ts/ipc.types.ts @@ -1,8 +1,8 @@ import type { IProcessConfig, IProcessInfo, - IProcessLog, } from './classes.tspm.js'; +import type { IProcessLog } from './classes.processwrapper.js'; // Base message types export interface IpcRequest {