feat(daemon): Reorganize and refactor core into client/daemon/shared modules; add IPC protocol and tests
This commit is contained in:
462
ts/daemon/tspm.daemon.ts
Normal file
462
ts/daemon/tspm.daemon.ts
Normal file
@@ -0,0 +1,462 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import * as paths from '../paths.js';
|
||||
import { ProcessManager } from './processmanager.js';
|
||||
import type {
|
||||
IpcMethodMap,
|
||||
RequestForMethod,
|
||||
ResponseForMethod,
|
||||
DaemonStatusResponse,
|
||||
HeartbeatResponse,
|
||||
} from '../shared/protocol/ipc.types.js';
|
||||
|
||||
/**
|
||||
* Central daemon server that manages all TSPM processes
|
||||
*/
|
||||
export class TspmDaemon {
|
||||
private tspmInstance: ProcessManager;
|
||||
private ipcServer: plugins.smartipc.IpcServer;
|
||||
private startTime: number;
|
||||
private isShuttingDown: boolean = false;
|
||||
private socketPath: string;
|
||||
private heartbeatInterval: NodeJS.Timeout | null = null;
|
||||
private daemonPidFile: string;
|
||||
|
||||
constructor() {
|
||||
this.tspmInstance = new ProcessManager();
|
||||
this.socketPath = plugins.path.join(paths.tspmDir, 'tspm.sock');
|
||||
this.daemonPidFile = plugins.path.join(paths.tspmDir, 'daemon.pid');
|
||||
this.startTime = Date.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the daemon server
|
||||
*/
|
||||
public async start(): Promise<void> {
|
||||
console.log('Starting TSPM daemon...');
|
||||
|
||||
// Ensure the TSPM directory exists
|
||||
const fs = await import('fs/promises');
|
||||
await fs.mkdir(paths.tspmDir, { recursive: true });
|
||||
|
||||
// Check if another daemon is already running
|
||||
if (await this.isDaemonRunning()) {
|
||||
throw new Error('Another TSPM daemon instance is already running');
|
||||
}
|
||||
|
||||
// Initialize IPC server
|
||||
this.ipcServer = plugins.smartipc.SmartIpc.createServer({
|
||||
id: 'tspm-daemon',
|
||||
socketPath: this.socketPath,
|
||||
autoCleanupSocketFile: true, // Clean up stale sockets
|
||||
socketMode: 0o600, // Set proper permissions
|
||||
heartbeat: true,
|
||||
heartbeatInterval: 5000,
|
||||
heartbeatTimeout: 20000,
|
||||
heartbeatInitialGracePeriodMs: 10000, // Grace period for startup
|
||||
});
|
||||
|
||||
// Register message handlers
|
||||
this.registerHandlers();
|
||||
|
||||
// Start the IPC server and wait until ready to accept connections
|
||||
await this.ipcServer.start({ readyWhen: 'accepting' });
|
||||
|
||||
// Write PID file
|
||||
await this.writePidFile();
|
||||
|
||||
// Start heartbeat monitoring
|
||||
this.startHeartbeatMonitoring();
|
||||
|
||||
// Load existing process configurations
|
||||
await this.tspmInstance.loadProcessConfigs();
|
||||
|
||||
// Set up log publishing
|
||||
this.tspmInstance.on('process:log', ({ processId, log }) => {
|
||||
// Publish to topic for this process
|
||||
const topic = `logs.${processId}`;
|
||||
// Broadcast to all connected clients subscribed to this topic
|
||||
if (this.ipcServer) {
|
||||
this.ipcServer.broadcast(`topic:${topic}`, log);
|
||||
}
|
||||
});
|
||||
|
||||
// Set up graceful shutdown handlers
|
||||
this.setupShutdownHandlers();
|
||||
|
||||
console.log(`TSPM daemon started successfully on ${this.socketPath}`);
|
||||
console.log(`PID: ${process.pid}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register all IPC message handlers
|
||||
*/
|
||||
private registerHandlers(): void {
|
||||
// Process management handlers
|
||||
this.ipcServer.onMessage(
|
||||
'start',
|
||||
async (request: RequestForMethod<'start'>) => {
|
||||
try {
|
||||
await this.tspmInstance.start(request.config);
|
||||
const processInfo = this.tspmInstance.processInfo.get(
|
||||
request.config.id,
|
||||
);
|
||||
return {
|
||||
processId: request.config.id,
|
||||
pid: processInfo?.pid,
|
||||
status: processInfo?.status || 'stopped',
|
||||
};
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to start process: ${error.message}`);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'stop',
|
||||
async (request: RequestForMethod<'stop'>) => {
|
||||
try {
|
||||
await this.tspmInstance.stop(request.id);
|
||||
return {
|
||||
success: true,
|
||||
message: `Process ${request.id} stopped successfully`,
|
||||
};
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to stop process: ${error.message}`);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'restart',
|
||||
async (request: RequestForMethod<'restart'>) => {
|
||||
try {
|
||||
await this.tspmInstance.restart(request.id);
|
||||
const processInfo = this.tspmInstance.processInfo.get(request.id);
|
||||
return {
|
||||
processId: request.id,
|
||||
pid: processInfo?.pid,
|
||||
status: processInfo?.status || 'stopped',
|
||||
};
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to restart process: ${error.message}`);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'delete',
|
||||
async (request: RequestForMethod<'delete'>) => {
|
||||
try {
|
||||
await this.tspmInstance.delete(request.id);
|
||||
return {
|
||||
success: true,
|
||||
message: `Process ${request.id} deleted successfully`,
|
||||
};
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to delete process: ${error.message}`);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Query handlers
|
||||
this.ipcServer.onMessage(
|
||||
'list',
|
||||
async (request: RequestForMethod<'list'>) => {
|
||||
const processes = await this.tspmInstance.list();
|
||||
return { processes };
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'describe',
|
||||
async (request: RequestForMethod<'describe'>) => {
|
||||
const processInfo = await this.tspmInstance.describe(request.id);
|
||||
const config = this.tspmInstance.processConfigs.get(request.id);
|
||||
|
||||
if (!processInfo || !config) {
|
||||
throw new Error(`Process ${request.id} not found`);
|
||||
}
|
||||
|
||||
return {
|
||||
processInfo,
|
||||
config,
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'getLogs',
|
||||
async (request: RequestForMethod<'getLogs'>) => {
|
||||
const logs = await this.tspmInstance.getLogs(request.id);
|
||||
return { logs };
|
||||
},
|
||||
);
|
||||
|
||||
// Batch operations handlers
|
||||
this.ipcServer.onMessage(
|
||||
'startAll',
|
||||
async (request: RequestForMethod<'startAll'>) => {
|
||||
const started: string[] = [];
|
||||
const failed: Array<{ id: string; error: string }> = [];
|
||||
|
||||
await this.tspmInstance.startAll();
|
||||
|
||||
// Get status of all processes
|
||||
for (const [id, processInfo] of this.tspmInstance.processInfo) {
|
||||
if (processInfo.status === 'online') {
|
||||
started.push(id);
|
||||
} else {
|
||||
failed.push({ id, error: 'Failed to start' });
|
||||
}
|
||||
}
|
||||
|
||||
return { started, failed };
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'stopAll',
|
||||
async (request: RequestForMethod<'stopAll'>) => {
|
||||
const stopped: string[] = [];
|
||||
const failed: Array<{ id: string; error: string }> = [];
|
||||
|
||||
await this.tspmInstance.stopAll();
|
||||
|
||||
// Get status of all processes
|
||||
for (const [id, processInfo] of this.tspmInstance.processInfo) {
|
||||
if (processInfo.status === 'stopped') {
|
||||
stopped.push(id);
|
||||
} else {
|
||||
failed.push({ id, error: 'Failed to stop' });
|
||||
}
|
||||
}
|
||||
|
||||
return { stopped, failed };
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'restartAll',
|
||||
async (request: RequestForMethod<'restartAll'>) => {
|
||||
const restarted: string[] = [];
|
||||
const failed: Array<{ id: string; error: string }> = [];
|
||||
|
||||
await this.tspmInstance.restartAll();
|
||||
|
||||
// Get status of all processes
|
||||
for (const [id, processInfo] of this.tspmInstance.processInfo) {
|
||||
if (processInfo.status === 'online') {
|
||||
restarted.push(id);
|
||||
} else {
|
||||
failed.push({ id, error: 'Failed to restart' });
|
||||
}
|
||||
}
|
||||
|
||||
return { restarted, failed };
|
||||
},
|
||||
);
|
||||
|
||||
// Daemon management handlers
|
||||
this.ipcServer.onMessage(
|
||||
'daemon:status',
|
||||
async (request: RequestForMethod<'daemon:status'>) => {
|
||||
const memUsage = process.memoryUsage();
|
||||
return {
|
||||
status: 'running',
|
||||
pid: process.pid,
|
||||
uptime: Date.now() - this.startTime,
|
||||
processCount: this.tspmInstance.processes.size,
|
||||
memoryUsage: memUsage.heapUsed,
|
||||
cpuUsage: process.cpuUsage().user / 1000000, // Convert to seconds
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
this.ipcServer.onMessage(
|
||||
'daemon:shutdown',
|
||||
async (request: RequestForMethod<'daemon:shutdown'>) => {
|
||||
if (this.isShuttingDown) {
|
||||
return {
|
||||
success: false,
|
||||
message: 'Daemon is already shutting down',
|
||||
};
|
||||
}
|
||||
|
||||
// Schedule shutdown
|
||||
const graceful = request.graceful !== false;
|
||||
const timeout = request.timeout || 10000;
|
||||
|
||||
if (graceful) {
|
||||
setTimeout(() => this.shutdown(true), 100);
|
||||
} else {
|
||||
setTimeout(() => this.shutdown(false), 100);
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: `Daemon will shutdown ${graceful ? 'gracefully' : 'immediately'} in ${timeout}ms`,
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
// Heartbeat handler
|
||||
this.ipcServer.onMessage(
|
||||
'heartbeat',
|
||||
async (request: RequestForMethod<'heartbeat'>) => {
|
||||
return {
|
||||
timestamp: Date.now(),
|
||||
status: this.isShuttingDown ? 'degraded' : 'healthy',
|
||||
};
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start heartbeat monitoring
|
||||
*/
|
||||
private startHeartbeatMonitoring(): void {
|
||||
// Send heartbeat every 30 seconds
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
// This is where we could implement health checks
|
||||
// For now, just log that the daemon is alive
|
||||
const uptime = Math.floor((Date.now() - this.startTime) / 1000);
|
||||
console.log(
|
||||
`[Heartbeat] Daemon alive - Uptime: ${uptime}s, Processes: ${this.tspmInstance.processes.size}`,
|
||||
);
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up graceful shutdown handlers
|
||||
*/
|
||||
private setupShutdownHandlers(): void {
|
||||
const shutdownHandler = async (signal: string) => {
|
||||
console.log(`\nReceived ${signal}, initiating graceful shutdown...`);
|
||||
await this.shutdown(true);
|
||||
};
|
||||
|
||||
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
|
||||
process.on('SIGINT', () => shutdownHandler('SIGINT'));
|
||||
process.on('SIGHUP', () => shutdownHandler('SIGHUP'));
|
||||
|
||||
// Handle uncaught errors
|
||||
process.on('uncaughtException', (error) => {
|
||||
console.error('Uncaught exception:', error);
|
||||
this.shutdown(false);
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('Unhandled rejection at:', promise, 'reason:', reason);
|
||||
// Don't exit on unhandled rejection, just log it
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the daemon
|
||||
*/
|
||||
public async shutdown(graceful: boolean = true): Promise<void> {
|
||||
if (this.isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isShuttingDown = true;
|
||||
console.log('Shutting down TSPM daemon...');
|
||||
|
||||
// Clear heartbeat interval
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
}
|
||||
|
||||
if (graceful) {
|
||||
// Stop all processes gracefully
|
||||
try {
|
||||
console.log('Stopping all managed processes...');
|
||||
await this.tspmInstance.stopAll();
|
||||
} catch (error) {
|
||||
console.error('Error stopping processes:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Stop IPC server
|
||||
if (this.ipcServer) {
|
||||
try {
|
||||
await this.ipcServer.stop();
|
||||
} catch (error) {
|
||||
console.error('Error stopping IPC server:', error);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove PID file
|
||||
await this.removePidFile();
|
||||
|
||||
// Remove socket file if it exists
|
||||
try {
|
||||
const fs = await import('fs');
|
||||
await fs.promises.unlink(this.socketPath).catch(() => {});
|
||||
} catch (error) {
|
||||
// Ignore errors
|
||||
}
|
||||
|
||||
console.log('TSPM daemon shutdown complete');
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if another daemon instance is running
|
||||
*/
|
||||
private async isDaemonRunning(): Promise<boolean> {
|
||||
try {
|
||||
const fs = await import('fs');
|
||||
const pidContent = await fs.promises.readFile(
|
||||
this.daemonPidFile,
|
||||
'utf-8',
|
||||
);
|
||||
const pid = parseInt(pidContent.trim(), 10);
|
||||
|
||||
// Check if process is running
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true; // Process exists
|
||||
} catch {
|
||||
// Process doesn't exist, clean up stale PID file
|
||||
await this.removePidFile();
|
||||
return false;
|
||||
}
|
||||
} catch {
|
||||
// PID file doesn't exist
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the daemon PID to a file
|
||||
*/
|
||||
private async writePidFile(): Promise<void> {
|
||||
const fs = await import('fs');
|
||||
await fs.promises.writeFile(this.daemonPidFile, process.pid.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the daemon PID file
|
||||
*/
|
||||
private async removePidFile(): Promise<void> {
|
||||
try {
|
||||
const fs = await import('fs');
|
||||
await fs.promises.unlink(this.daemonPidFile);
|
||||
} catch {
|
||||
// Ignore if file doesn't exist
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point for the daemon
|
||||
*/
|
||||
export const startDaemon = async (): Promise<void> => {
|
||||
const daemon = new TspmDaemon();
|
||||
await daemon.start();
|
||||
|
||||
// Keep the process alive
|
||||
await new Promise(() => {});
|
||||
};
|
Reference in New Issue
Block a user