feat(logs): Improve logs streaming and backlog delivery; add CLI filters and ndjson output

This commit is contained in:
2025-08-30 23:26:59 +00:00
parent d8709d8b94
commit b210efde2a
9 changed files with 258 additions and 41 deletions

View File

@@ -1,5 +1,15 @@
# Changelog
## 2025-08-30 - 5.5.0 - feat(logs)
Improve logs streaming and backlog delivery; add CLI filters and ndjson output
- CLI: add new logs options: --since, --stderr-only, --stdout-only and --ndjson; enhance streaming output and gap detection
- CLI: fetch backlog conditionally (honoring --since) and print filtered results before live streaming
- Client: add TspmIpcClient.requestLogsBacklogStream, onStream and onBacklogTopic helpers to receive backlog chunks and streams
- Daemon: add logs:subscribe IPC handler to stream backlog entries to requesting client in small batches
- Protocol: extend IPC types with LogsSubscribeRequest/Response and register 'logs:subscribe' method
- Dependency: bump @push.rocks/smartipc to ^2.3.0 to support the streaming/IPC changes
## 2025-08-30 - 5.4.2 - fix(cli/process/logs)
Reset log sequence on process restart to avoid false log gap warnings

View File

@@ -38,7 +38,7 @@
"@push.rocks/smartdaemon": "^2.0.9",
"@push.rocks/smartfile": "^11.2.7",
"@push.rocks/smartinteract": "^2.0.16",
"@push.rocks/smartipc": "^2.2.2",
"@push.rocks/smartipc": "^2.3.0",
"@push.rocks/smartpath": "^6.0.0",
"@types/pidusage": "^2.0.5",
"@types/ps-tree": "^1.1.6",

10
pnpm-lock.yaml generated
View File

@@ -27,8 +27,8 @@ importers:
specifier: ^2.0.16
version: 2.0.16
'@push.rocks/smartipc':
specifier: ^2.2.2
version: 2.2.2
specifier: ^2.3.0
version: 2.3.0
'@push.rocks/smartpath':
specifier: ^6.0.0
version: 6.0.0
@@ -877,8 +877,8 @@ packages:
'@push.rocks/smartinteract@2.0.16':
resolution: {integrity: sha512-eltvVRRUKBKd77DSFA4DPY2g4V4teZLNe8A93CDy/WglglYcUjxMoLY/b0DFTWCWKYT+yjk6Fe6p0FRrvX9Yvg==}
'@push.rocks/smartipc@2.2.2':
resolution: {integrity: sha512-pkWqp2nQH7p5zD9Efh5KNX2O0+gFWL6bxbdd6SdDh4gP8Gb0b3Sn87Tpedghpc/d+LCVql+1pUf6OlvMQpD5Yw==}
'@push.rocks/smartipc@2.3.0':
resolution: {integrity: sha512-/btC/DHf+2PWF6Qiq0oHHP7XHzacgYfHAShIts2ZXS+nhpvSyjucNzB2ErNUPHLMITNXGUSu5Wpt7sfvIQzxJQ==}
'@push.rocks/smartjson@5.0.20':
resolution: {integrity: sha512-ogGBLyOTluphZVwBYNyjhm5sziPGuiAwWihW07OSRxD4HQUyqj9Ek6r1pqH07JUG5EbtRYivM1Yt1cCwnu3JVQ==}
@@ -6360,7 +6360,7 @@ snapshots:
'@push.rocks/smartpromise': 4.2.3
inquirer: 11.1.0
'@push.rocks/smartipc@2.2.2':
'@push.rocks/smartipc@2.3.0':
dependencies:
'@push.rocks/smartdelay': 3.0.5
'@push.rocks/smartrx': 3.0.10

View File

@@ -177,11 +177,15 @@ Watch: disabled
#### `tspm logs <id|id:N|name:LABEL> [options]`
View process logs (stdout and stderr combined).
View and stream process logs (stdout, stderr, and system messages).
**Options:**
- `--lines <n>` - Number of lines to display (default: 50)
- `--follow` - Stream logs in real-time (like `tail -f`)
- `--lines <n>` Number of lines to show (default: 50)
- `--since <dur>` Only show logs since duration (e.g., `10m`, `2h`, `1d`; units: `ms|s|m|h|d`)
- `--stderr-only` Only show stderr logs
- `--stdout-only` Only show stdout logs
- `--ndjson` Output each log as JSON line (timestamp in ms)
- `--follow` Stream logs in real-time (like `tail -f`)
```bash
# View last 50 lines
@@ -190,10 +194,20 @@ tspm logs name:my-server
# View last 100 lines
tspm logs name:my-server --lines 100
# Follow logs in real-time
# Only stderr for the last 10 minutes (as NDJSON)
tspm logs name:my-server --since 10m --stderr-only --ndjson
# Follow logs in real time (prints recent lines, then streams backlog incrementally and live logs)
tspm logs name:my-server --follow
# Follow only stdout since 2h ago
tspm logs name:my-server --follow --since 2h --stdout-only
```
Notes:
- Follow mode prints a small recent backlog, then streams older entries incrementally (to avoid large payloads) and continues with live logs.
- Log sequences are restart-aware; TSPM detects run changes and keeps output consistent across restarts.
### Batch Operations
#### `tspm start-all`
@@ -285,6 +299,18 @@ Processes: 5
Socket: /home/user/.tspm/tspm.sock
```
#### Version check and service refresh
Check CLI vs daemon versions and refresh the systemd service if they differ:
```bash
tspm -v
# tspm CLI: 5.x.y
# Daemon: running v5.x.z (pid 1234)
# Version mismatch detected → optionally refresh the systemd service (equivalent to `tspm disable && tspm enable`).
```
This is helpful after upgrades where the system service still references an older CLI path.
### System Service Management
Run TSPM as a system service (systemd) for production deployments.

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@git.zone/tspm',
version: '5.4.2',
version: '5.5.0',
description: 'a no fuzz process manager'
}

View File

@@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js';
import { tspmIpcClient } from '../../../client/tspm.ipcclient.js';
import type { CliArguments } from '../../types.js';
import { registerIpcCommand } from '../../registration/index.js';
import { getBool, getNumber } from '../../helpers/argv.js';
import { getBool, getNumber, getString } from '../../helpers/argv.js';
import { formatLog } from '../../helpers/formatting.js';
import { withStreamingLifecycle } from '../../helpers/lifecycle.js';
@@ -16,23 +16,92 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) {
console.error('Error: Please provide a process target');
console.log('Usage: tspm logs <id | id:N | name:LABEL> [options]');
console.log('\nOptions:');
console.log(' --lines <n> Number of lines to show (default: 50)');
console.log(' --follow Stream logs in real-time (like tail -f)');
console.log(' --lines <n> Number of lines to show (default: 50)');
console.log(' --since <dur> Only show logs since duration (e.g., 10m, 2h, 1d)');
console.log(' --stderr-only Only show stderr logs');
console.log(' --stdout-only Only show stdout logs');
console.log(' --ndjson Output each log as JSON line');
console.log(' --follow Stream logs in real-time (like tail -f)');
return;
}
const lines = getNumber(argvArg, 'lines', 50);
const follow = getBool(argvArg, 'follow', 'f');
const sinceSpec = getString(argvArg, 'since');
const stderrOnly = getBool(argvArg, 'stderr-only');
const stdoutOnly = getBool(argvArg, 'stdout-only');
const ndjson = getBool(argvArg, 'ndjson');
const parseDuration = (spec?: string): number | undefined => {
if (!spec) return undefined;
const m = spec.trim().match(/^(\d+)(ms|s|m|h|d)?$/i);
if (!m) return undefined;
const val = Number(m[1]);
const unit = (m[2] || 'm').toLowerCase();
const mult = unit === 'ms' ? 1 : unit === 's' ? 1000 : unit === 'm' ? 60000 : unit === 'h' ? 3600000 : 86400000;
return Date.now() - val * mult;
};
const sinceTime = parseDuration(sinceSpec);
const typesFilter: Array<'stdout' | 'stderr' | 'system'> | undefined =
stderrOnly && !stdoutOnly
? ['stderr']
: stdoutOnly && !stderrOnly
? ['stdout']
: undefined; // all
const resolved = await tspmIpcClient.request('resolveTarget', { target: String(target) });
const id = resolved.id;
const response = await tspmIpcClient.request('getLogs', { id, lines });
const response = await tspmIpcClient.request('getLogs', { id, lines: sinceTime ? 0 : lines });
if (!follow) {
// One-shot mode - auto-disconnect handled by registerIpcCommand
console.log(`Logs for process: ${id} (last ${lines} lines)`);
const filtered = response.logs.filter((l) => {
if (typesFilter && !typesFilter.includes(l.type)) return false;
if (sinceTime && new Date(l.timestamp).getTime() < sinceTime) return false;
return true;
});
console.log(`Logs for process: ${id} (${sinceTime ? 'since ' + new Date(sinceTime).toLocaleString() : 'last ' + lines + ' lines'})`);
console.log('─'.repeat(60));
for (const log of response.logs) {
for (const log of filtered) {
if (ndjson) {
console.log(
JSON.stringify({
...log,
timestamp: new Date(log.timestamp).getTime(),
}),
);
} else {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix =
log.type === 'stdout'
? '[OUT]'
: log.type === 'stderr'
? '[ERR]'
: '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
}
}
return;
}
// Streaming mode
console.log(`Logs for process: ${resolved.name || id} (streaming...)`);
console.log('─'.repeat(60));
// Prepare backlog printing state and stream handler
let lastSeq = 0;
let lastRunId: string | undefined = undefined;
const printLog = (log: any) => {
if (typesFilter && !typesFilter.includes(log.type)) return;
if (sinceTime && new Date(log.timestamp).getTime() < sinceTime) return;
if (ndjson) {
console.log(
JSON.stringify({
...log,
timestamp: new Date(log.timestamp).getTime(),
}),
);
} else {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix =
log.type === 'stdout'
@@ -42,28 +111,37 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) {
: '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
}
return;
}
};
// Streaming mode
console.log(`Logs for process: ${resolved.name || id} (streaming...)`);
console.log('─'.repeat(60));
let lastSeq = 0;
let lastRunId: string | undefined = undefined;
// Print initial backlog (already fetched via getLogs)
for (const log of response.logs) {
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix =
log.type === 'stdout'
? '[OUT]'
: log.type === 'stderr'
? '[ERR]'
: '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
printLog(log);
if (log.seq !== undefined) lastSeq = Math.max(lastSeq, log.seq);
if ((log as any).runId) lastRunId = (log as any).runId;
}
// Request additional backlog delivered as incremental messages to avoid large payloads
try {
const disposeBacklog = tspmIpcClient.onBacklogTopic(id, (log: any) => {
if (log.runId && log.runId !== lastRunId) {
console.log(`[INFO] Detected process restart (runId changed).`);
lastSeq = -1;
lastRunId = log.runId;
}
if (log.seq !== undefined && log.seq <= lastSeq) return;
if (log.seq !== undefined && log.seq > lastSeq + 1) {
console.log(
`[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`,
);
}
printLog({ ...log, timestamp: new Date(log.timestamp) });
if (log.seq !== undefined) lastSeq = log.seq;
});
await tspmIpcClient.requestLogsBacklogStream(id, { lines: sinceTime ? undefined : lines, sinceTime, types: typesFilter });
// Dispose backlog handler after a short grace (backlog is finite)
setTimeout(() => disposeBacklog(), 10000);
} catch {}
await withStreamingLifecycle(
async () => {
await tspmIpcClient.subscribe(id, (log: any) => {
@@ -79,14 +157,7 @@ export function registerLogsCommand(smartcli: plugins.smartcli.Smartcli) {
`[WARNING] Log gap detected: expected seq ${lastSeq + 1}, got ${log.seq}`,
);
}
const timestamp = new Date(log.timestamp).toLocaleTimeString();
const prefix =
log.type === 'stdout'
? '[OUT]'
: log.type === 'stderr'
? '[ERR]'
: '[SYS]';
console.log(`${timestamp} ${prefix} ${log.message}`);
printLog(log);
if (log.seq !== undefined) lastSeq = log.seq;
});
},

View File

@@ -160,6 +160,55 @@ export class TspmIpcClient {
await this.ipcClient.subscribe(topic, handler);
}
/**
* Request backlog logs as a stream from the daemon.
* The actual stream will be delivered via the 'stream' event.
*/
public async requestLogsBacklogStream(
processId: ProcessId | number | string,
opts: { lines?: number; sinceTime?: number; types?: Array<'stdout' | 'stderr' | 'system'> } = {},
): Promise<void> {
if (!this.ipcClient || !this.isConnected) {
throw new Error('Not connected to daemon');
}
const id = toProcessId(processId);
await this.request('logs:subscribe' as any, {
id,
lines: opts.lines,
sinceTime: opts.sinceTime,
types: opts.types,
} as any);
}
/**
* Register a handler for incoming streams (e.g., backlog logs)
*/
public onStream(
handler: (info: any, readable: NodeJS.ReadableStream) => void,
): void {
if (!this.ipcClient) throw new Error('Not connected to daemon');
// smartipc emits 'stream' with (info, readable)
(this.ipcClient as any).on('stream', handler);
}
/**
* Register a temporary handler for backlog topic messages for a specific process
*/
public onBacklogTopic(
processId: ProcessId | number | string,
handler: (log: any) => void,
): () => void {
if (!this.ipcClient) throw new Error('Not connected to daemon');
const id = toProcessId(processId);
const topicType = `topic:logs.backlog.${id}`;
(this.ipcClient as any).onMessage(topicType, handler);
return () => {
try {
(this.ipcClient as any).messageHandlers?.delete?.(topicType);
} catch {}
};
}
/**
* Unsubscribe from log updates for a specific process
*/

View File

@@ -298,6 +298,54 @@ export class TspmDaemon {
},
);
// Stream backlog logs and let client subscribe to live topic separately
this.ipcServer.onMessage(
'logs:subscribe',
async (
request: RequestForMethod<'logs:subscribe'>,
clientId: string,
) => {
const id = toProcessId(request.id);
// Determine backlog set
const allLogs = await this.tspmInstance.getLogs(id);
let filtered = allLogs;
if (request.types && request.types.length) {
filtered = filtered.filter((l) => request.types!.includes(l.type));
}
if (request.sinceTime && request.sinceTime > 0) {
filtered = filtered.filter(
(l) => new Date(l.timestamp).getTime() >= request.sinceTime!,
);
}
const lines = request.lines && request.lines > 0 ? request.lines : 0;
if (lines > 0 && filtered.length > lines) {
filtered = filtered.slice(-lines);
}
// Send backlog entries directly to the requesting client as topic messages
// in small batches to avoid overwhelming the transport or client.
const chunkSize = 200;
for (let i = 0; i < filtered.length; i += chunkSize) {
const chunk = filtered.slice(i, i + chunkSize);
await Promise.allSettled(
chunk.map((entry) =>
this.ipcServer.sendToClient(
clientId,
`topic:logs.backlog.${id}`,
{
...entry,
timestamp: new Date(entry.timestamp).getTime(),
},
),
),
);
// Yield a bit between chunks
await new Promise((r) => setTimeout(r, 5));
}
return { ok: true } as any;
},
);
// Resolve target (id:n | name:foo | numeric string) to ProcessId
this.ipcServer.onMessage(
'resolveTarget',

View File

@@ -139,6 +139,18 @@ export interface GetLogsResponse {
logs: IProcessLog[];
}
// Subscribe and stream backlog logs
export interface LogsSubscribeRequest {
id: ProcessId;
lines?: number; // number of backlog lines
sinceTime?: number; // ms epoch
types?: Array<IProcessLog['type']>;
}
export interface LogsSubscribeResponse {
ok: boolean;
}
// Start all command
export interface StartAllRequest {
// No parameters needed
@@ -274,6 +286,7 @@ export type IpcMethodMap = {
list: { request: ListRequest; response: ListResponse };
describe: { request: DescribeRequest; response: DescribeResponse };
getLogs: { request: GetLogsRequest; response: GetLogsResponse };
'logs:subscribe': { request: LogsSubscribeRequest; response: LogsSubscribeResponse };
startAll: { request: StartAllRequest; response: StartAllResponse };
stopAll: { request: StopAllRequest; response: StopAllResponse };
restartAll: { request: RestartAllRequest; response: RestartAllResponse };