diff --git a/changelog.md b/changelog.md index 9f97164..be42dbb 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2025-08-30 - 5.5.0 - feat(logs) +Improve logs streaming and backlog delivery; add CLI filters and ndjson output + +- CLI: add new logs options: --since, --stderr-only, --stdout-only and --ndjson; enhance streaming output and gap detection +- CLI: fetch backlog conditionally (honoring --since) and print filtered results before live streaming +- Client: add TspmIpcClient.requestLogsBacklogStream, onStream and onBacklogTopic helpers to receive backlog chunks and streams +- Daemon: add logs:subscribe IPC handler to stream backlog entries to requesting client in small batches +- Protocol: extend IPC types with LogsSubscribeRequest/Response and register 'logs:subscribe' method +- Dependency: bump @push.rocks/smartipc to ^2.3.0 to support the streaming/IPC changes + ## 2025-08-30 - 5.4.2 - fix(cli/process/logs) Reset log sequence on process restart to avoid false log gap warnings diff --git a/package.json b/package.json index 6739ab2..95d7978 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "@push.rocks/smartdaemon": "^2.0.9", "@push.rocks/smartfile": "^11.2.7", "@push.rocks/smartinteract": "^2.0.16", - "@push.rocks/smartipc": "^2.2.2", + "@push.rocks/smartipc": "^2.3.0", "@push.rocks/smartpath": "^6.0.0", "@types/pidusage": "^2.0.5", "@types/ps-tree": "^1.1.6", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 42e59bf..9b5c98b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -27,8 +27,8 @@ importers: specifier: ^2.0.16 version: 2.0.16 '@push.rocks/smartipc': - specifier: ^2.2.2 - version: 2.2.2 + specifier: ^2.3.0 + version: 2.3.0 '@push.rocks/smartpath': specifier: ^6.0.0 version: 6.0.0 @@ -877,8 +877,8 @@ packages: '@push.rocks/smartinteract@2.0.16': resolution: {integrity: sha512-eltvVRRUKBKd77DSFA4DPY2g4V4teZLNe8A93CDy/WglglYcUjxMoLY/b0DFTWCWKYT+yjk6Fe6p0FRrvX9Yvg==} - '@push.rocks/smartipc@2.2.2': - resolution: {integrity: sha512-pkWqp2nQH7p5zD9Efh5KNX2O0+gFWL6bxbdd6SdDh4gP8Gb0b3Sn87Tpedghpc/d+LCVql+1pUf6OlvMQpD5Yw==} + '@push.rocks/smartipc@2.3.0': + resolution: {integrity: sha512-/btC/DHf+2PWF6Qiq0oHHP7XHzacgYfHAShIts2ZXS+nhpvSyjucNzB2ErNUPHLMITNXGUSu5Wpt7sfvIQzxJQ==} '@push.rocks/smartjson@5.0.20': resolution: {integrity: sha512-ogGBLyOTluphZVwBYNyjhm5sziPGuiAwWihW07OSRxD4HQUyqj9Ek6r1pqH07JUG5EbtRYivM1Yt1cCwnu3JVQ==} @@ -6360,7 +6360,7 @@ snapshots: '@push.rocks/smartpromise': 4.2.3 inquirer: 11.1.0 - '@push.rocks/smartipc@2.2.2': + '@push.rocks/smartipc@2.3.0': dependencies: '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartrx': 3.0.10 diff --git a/readme.md b/readme.md index 8059ff4..2977247 100644 --- a/readme.md +++ b/readme.md @@ -177,11 +177,15 @@ Watch: disabled #### `tspm logs [options]` -View process logs (stdout and stderr combined). +View and stream process logs (stdout, stderr, and system messages). **Options:** -- `--lines ` - Number of lines to display (default: 50) -- `--follow` - Stream logs in real-time (like `tail -f`) +- `--lines ` Number of lines to show (default: 50) +- `--since ` Only show logs since duration (e.g., `10m`, `2h`, `1d`; units: `ms|s|m|h|d`) +- `--stderr-only` Only show stderr logs +- `--stdout-only` Only show stdout logs +- `--ndjson` Output each log as JSON line (timestamp in ms) +- `--follow` Stream logs in real-time (like `tail -f`) ```bash # View last 50 lines @@ -190,10 +194,20 @@ tspm logs name:my-server # View last 100 lines tspm logs name:my-server --lines 100 -# Follow logs in real-time +# Only stderr for the last 10 minutes (as NDJSON) +tspm logs name:my-server --since 10m --stderr-only --ndjson + +# Follow logs in real time (prints recent lines, then streams backlog incrementally and live logs) tspm logs name:my-server --follow + +# Follow only stdout since 2h ago +tspm logs name:my-server --follow --since 2h --stdout-only ``` +Notes: +- Follow mode prints a small recent backlog, then streams older entries incrementally (to avoid large payloads) and continues with live logs. +- Log sequences are restart-aware; TSPM detects run changes and keeps output consistent across restarts. + ### Batch Operations #### `tspm start-all` @@ -285,6 +299,18 @@ Processes: 5 Socket: /home/user/.tspm/tspm.sock ``` +#### Version check and service refresh + +Check CLI vs daemon versions and refresh the systemd service if they differ: + +```bash +tspm -v +# tspm CLI: 5.x.y +# Daemon: running v5.x.z (pid 1234) +# Version mismatch detected → optionally refresh the systemd service (equivalent to `tspm disable && tspm enable`). +``` +This is helpful after upgrades where the system service still references an older CLI path. + ### System Service Management Run TSPM as a system service (systemd) for production deployments. diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 18a8544..3ac853c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@git.zone/tspm', - version: '5.4.2', + version: '5.5.0', description: 'a no fuzz process manager' } diff --git a/ts/cli/commands/process/logs.ts b/ts/cli/commands/process/logs.ts index a13c440..4cb78d6 100644 --- a/ts/cli/commands/process/logs.ts +++ b/ts/cli/commands/process/logs.ts @@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js'; import { tspmIpcClient } from '../../../client/tspm.ipcclient.js'; import type { CliArguments } from '../../types.js'; import { registerIpcCommand } from '../../registration/index.js'; -import { getBool, getNumber } from '../../helpers/argv.js'; +import { getBool, getNumber, getString } from '../../helpers/argv.js'; import { formatLog } from '../../helpers/formatting.js'; import { withStreamingLifecycle } from '../../helpers/lifecycle.js'; @@ -16,23 +16,92 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) { console.error('Error: Please provide a process target'); console.log('Usage: tspm logs [options]'); console.log('\nOptions:'); - console.log(' --lines Number of lines to show (default: 50)'); - console.log(' --follow Stream logs in real-time (like tail -f)'); + console.log(' --lines Number of lines to show (default: 50)'); + console.log(' --since Only show logs since duration (e.g., 10m, 2h, 1d)'); + console.log(' --stderr-only Only show stderr logs'); + console.log(' --stdout-only Only show stdout logs'); + console.log(' --ndjson Output each log as JSON line'); + console.log(' --follow Stream logs in real-time (like tail -f)'); return; } const lines = getNumber(argvArg, 'lines', 50); const follow = getBool(argvArg, 'follow', 'f'); + const sinceSpec = getString(argvArg, 'since'); + const stderrOnly = getBool(argvArg, 'stderr-only'); + const stdoutOnly = getBool(argvArg, 'stdout-only'); + const ndjson = getBool(argvArg, 'ndjson'); + + const parseDuration = (spec?: string): number | undefined => { + if (!spec) return undefined; + const m = spec.trim().match(/^(\d+)(ms|s|m|h|d)?$/i); + if (!m) return undefined; + const val = Number(m[1]); + const unit = (m[2] || 'm').toLowerCase(); + const mult = unit === 'ms' ? 1 : unit === 's' ? 1000 : unit === 'm' ? 60000 : unit === 'h' ? 3600000 : 86400000; + return Date.now() - val * mult; + }; + const sinceTime = parseDuration(sinceSpec); + const typesFilter: Array<'stdout' | 'stderr' | 'system'> | undefined = + stderrOnly && !stdoutOnly + ? ['stderr'] + : stdoutOnly && !stderrOnly + ? ['stdout'] + : undefined; // all const resolved = await tspmIpcClient.request('resolveTarget', { target: String(target) }); const id = resolved.id; - const response = await tspmIpcClient.request('getLogs', { id, lines }); + const response = await tspmIpcClient.request('getLogs', { id, lines: sinceTime ? 0 : lines }); if (!follow) { // One-shot mode - auto-disconnect handled by registerIpcCommand - console.log(`Logs for process: ${id} (last ${lines} lines)`); + const filtered = response.logs.filter((l) => { + if (typesFilter && !typesFilter.includes(l.type)) return false; + if (sinceTime && new Date(l.timestamp).getTime() < sinceTime) return false; + return true; + }); + console.log(`Logs for process: ${id} (${sinceTime ? 'since ' + new Date(sinceTime).toLocaleString() : 'last ' + lines + ' lines'})`); console.log('─'.repeat(60)); - for (const log of response.logs) { + for (const log of filtered) { + if (ndjson) { + console.log( + JSON.stringify({ + ...log, + timestamp: new Date(log.timestamp).getTime(), + }), + ); + } else { + const timestamp = new Date(log.timestamp).toLocaleTimeString(); + const prefix = + log.type === 'stdout' + ? '[OUT]' + : log.type === 'stderr' + ? '[ERR]' + : '[SYS]'; + console.log(`${timestamp} ${prefix} ${log.message}`); + } + } + return; + } + + // Streaming mode + console.log(`Logs for process: ${resolved.name || id} (streaming...)`); + console.log('─'.repeat(60)); + + // Prepare backlog printing state and stream handler + let lastSeq = 0; + let lastRunId: string | undefined = undefined; + const printLog = (log: any) => { + if (typesFilter && !typesFilter.includes(log.type)) return; + if (sinceTime && new Date(log.timestamp).getTime() < sinceTime) return; + if (ndjson) { + console.log( + JSON.stringify({ + ...log, + timestamp: new Date(log.timestamp).getTime(), + }), + ); + } else { const timestamp = new Date(log.timestamp).toLocaleTimeString(); const prefix = log.type === 'stdout' @@ -42,28 +111,37 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) { : '[SYS]'; console.log(`${timestamp} ${prefix} ${log.message}`); } - return; - } + }; - // Streaming mode - console.log(`Logs for process: ${resolved.name || id} (streaming...)`); - console.log('─'.repeat(60)); - - let lastSeq = 0; - let lastRunId: string | undefined = undefined; + // Print initial backlog (already fetched via getLogs) for (const log of response.logs) { - const timestamp = new Date(log.timestamp).toLocaleTimeString(); - const prefix = - log.type === 'stdout' - ? '[OUT]' - : log.type === 'stderr' - ? '[ERR]' - : '[SYS]'; - console.log(`${timestamp} ${prefix} ${log.message}`); + printLog(log); if (log.seq !== undefined) lastSeq = Math.max(lastSeq, log.seq); if ((log as any).runId) lastRunId = (log as any).runId; } + // Request additional backlog delivered as incremental messages to avoid large payloads + try { + const disposeBacklog = tspmIpcClient.onBacklogTopic(id, (log: any) => { + if (log.runId && log.runId !== lastRunId) { + console.log(`[INFO] Detected process restart (runId changed).`); + lastSeq = -1; + lastRunId = log.runId; + } + if (log.seq !== undefined && log.seq <= lastSeq) return; + if (log.seq !== undefined && log.seq > lastSeq + 1) { + console.log( + `[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`, + ); + } + printLog({ ...log, timestamp: new Date(log.timestamp) }); + if (log.seq !== undefined) lastSeq = log.seq; + }); + await tspmIpcClient.requestLogsBacklogStream(id, { lines: sinceTime ? undefined : lines, sinceTime, types: typesFilter }); + // Dispose backlog handler after a short grace (backlog is finite) + setTimeout(() => disposeBacklog(), 10000); + } catch {} + await withStreamingLifecycle( async () => { await tspmIpcClient.subscribe(id, (log: any) => { @@ -79,14 +157,7 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) { `[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`, ); } - const timestamp = new Date(log.timestamp).toLocaleTimeString(); - const prefix = - log.type === 'stdout' - ? '[OUT]' - : log.type === 'stderr' - ? '[ERR]' - : '[SYS]'; - console.log(`${timestamp} ${prefix} ${log.message}`); + printLog(log); if (log.seq !== undefined) lastSeq = log.seq; }); }, diff --git a/ts/client/tspm.ipcclient.ts b/ts/client/tspm.ipcclient.ts index 0d440bf..abedb9d 100644 --- a/ts/client/tspm.ipcclient.ts +++ b/ts/client/tspm.ipcclient.ts @@ -160,6 +160,55 @@ export class TspmIpcClient { await this.ipcClient.subscribe(topic, handler); } + /** + * Request backlog logs as a stream from the daemon. + * The actual stream will be delivered via the 'stream' event. + */ + public async requestLogsBacklogStream( + processId: ProcessId | number | string, + opts: { lines?: number; sinceTime?: number; types?: Array<'stdout' | 'stderr' | 'system'> } = {}, + ): Promise { + if (!this.ipcClient || !this.isConnected) { + throw new Error('Not connected to daemon'); + } + const id = toProcessId(processId); + await this.request('logs:subscribe' as any, { + id, + lines: opts.lines, + sinceTime: opts.sinceTime, + types: opts.types, + } as any); + } + + /** + * Register a handler for incoming streams (e.g., backlog logs) + */ + public onStream( + handler: (info: any, readable: NodeJS.ReadableStream) => void, + ): void { + if (!this.ipcClient) throw new Error('Not connected to daemon'); + // smartipc emits 'stream' with (info, readable) + (this.ipcClient as any).on('stream', handler); + } + + /** + * Register a temporary handler for backlog topic messages for a specific process + */ + public onBacklogTopic( + processId: ProcessId | number | string, + handler: (log: any) => void, + ): () => void { + if (!this.ipcClient) throw new Error('Not connected to daemon'); + const id = toProcessId(processId); + const topicType = `topic:logs.backlog.${id}`; + (this.ipcClient as any).onMessage(topicType, handler); + return () => { + try { + (this.ipcClient as any).messageHandlers?.delete?.(topicType); + } catch {} + }; + } + /** * Unsubscribe from log updates for a specific process */ diff --git a/ts/daemon/tspm.daemon.ts b/ts/daemon/tspm.daemon.ts index c8d72c3..8b1db41 100644 --- a/ts/daemon/tspm.daemon.ts +++ b/ts/daemon/tspm.daemon.ts @@ -298,6 +298,54 @@ export class TspmDaemon { }, ); + // Stream backlog logs and let client subscribe to live topic separately + this.ipcServer.onMessage( + 'logs:subscribe', + async ( + request: RequestForMethod<'logs:subscribe'>, + clientId: string, + ) => { + const id = toProcessId(request.id); + // Determine backlog set + const allLogs = await this.tspmInstance.getLogs(id); + let filtered = allLogs; + if (request.types && request.types.length) { + filtered = filtered.filter((l) => request.types!.includes(l.type)); + } + if (request.sinceTime && request.sinceTime > 0) { + filtered = filtered.filter( + (l) => new Date(l.timestamp).getTime() >= request.sinceTime!, + ); + } + const lines = request.lines && request.lines > 0 ? request.lines : 0; + if (lines > 0 && filtered.length > lines) { + filtered = filtered.slice(-lines); + } + + // Send backlog entries directly to the requesting client as topic messages + // in small batches to avoid overwhelming the transport or client. + const chunkSize = 200; + for (let i = 0; i < filtered.length; i += chunkSize) { + const chunk = filtered.slice(i, i + chunkSize); + await Promise.allSettled( + chunk.map((entry) => + this.ipcServer.sendToClient( + clientId, + `topic:logs.backlog.${id}`, + { + ...entry, + timestamp: new Date(entry.timestamp).getTime(), + }, + ), + ), + ); + // Yield a bit between chunks + await new Promise((r) => setTimeout(r, 5)); + } + return { ok: true } as any; + }, + ); + // Resolve target (id:n | name:foo | numeric string) to ProcessId this.ipcServer.onMessage( 'resolveTarget', diff --git a/ts/shared/protocol/ipc.types.ts b/ts/shared/protocol/ipc.types.ts index 962f457..841e594 100644 --- a/ts/shared/protocol/ipc.types.ts +++ b/ts/shared/protocol/ipc.types.ts @@ -139,6 +139,18 @@ export interface GetLogsResponse { logs: IProcessLog[]; } +// Subscribe and stream backlog logs +export interface LogsSubscribeRequest { + id: ProcessId; + lines?: number; // number of backlog lines + sinceTime?: number; // ms epoch + types?: Array; +} + +export interface LogsSubscribeResponse { + ok: boolean; +} + // Start all command export interface StartAllRequest { // No parameters needed @@ -274,6 +286,7 @@ export type IpcMethodMap = { list: { request: ListRequest; response: ListResponse }; describe: { request: DescribeRequest; response: DescribeResponse }; getLogs: { request: GetLogsRequest; response: GetLogsResponse }; + 'logs:subscribe': { request: LogsSubscribeRequest; response: LogsSubscribeResponse }; startAll: { request: StartAllRequest; response: StartAllResponse }; stopAll: { request: StopAllRequest; response: StopAllResponse }; restartAll: { request: RestartAllRequest; response: RestartAllResponse };