import * as plugins from './taskbuffer.plugins.js';
import { logger } from './taskbuffer.logging.js';
import { Service } from './taskbuffer.classes.service.js';
import type {
  IServiceEvent,
  IServiceStatus,
  IServiceManagerOptions,
  IServiceManagerHealth,
  IServiceOptions,
  IRetryConfig,
  TOverallHealth,
} from './taskbuffer.interfaces.js';

/**
 * ServiceManager orchestrates multiple Service instances with:
 * - Dependency-ordered startup (topological sort, level-by-level parallel)
 * - Failure isolation (optional services don't crash the system)
 * - Retry with exponential backoff for failed services
 * - Reverse-dependency-ordered shutdown and rollback
 * - Aggregated health status
 */
export class ServiceManager {
  public readonly name: string;

  /** Aggregated stream of the events emitted by every registered service. */
  public readonly serviceSubject = new plugins.smartrx.rxjs.Subject<IServiceEvent>();

  private services = new Map<string, Service>();
  /** Levels of service names; every level only depends on earlier levels. */
  private startupOrder: string[][] = [];
  private options: Required<IServiceManagerOptions>;
  private _startedAt: number | undefined;
  /** Event-forwarding subscriptions, keyed by service name. */
  private subscriptions = new Map<string, plugins.smartrx.rxjs.Subscription>();

  /**
   * @param options Optional manager configuration; every omitted field falls
   *   back to the default spelled out below.
   */
  constructor(options?: IServiceManagerOptions) {
    this.name = options?.name || 'ServiceManager';
    this.options = {
      name: this.name,
      defaultRetry: options?.defaultRetry || {
        maxRetries: 3,
        baseDelayMs: 1000,
        maxDelayMs: 30000,
        backoffFactor: 2,
      },
      defaultHealthCheck: options?.defaultHealthCheck || {},
      startupTimeoutMs: options?.startupTimeoutMs ?? 120000,
      shutdownTimeoutMs: options?.shutdownTimeoutMs ?? 30000,
    };
  }

  // ── Service Registration ─────────────────────────────

  /**
   * Registers a service and starts forwarding its events to `serviceSubject`.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addService(service: Service): void {
    if (this.services.has(service.name)) {
      throw new Error(`Service '${service.name}' is already registered`);
    }
    this.services.set(service.name, service);
    this.subscribeToService(service);
  }

  /**
   * Builds a Service from plain options, registers it and returns it.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addServiceFromOptions(options: IServiceOptions): Service {
    const service = new Service(options);
    this.addService(service);
    return service;
  }

  /**
   * Unregisters a service and stops forwarding its events.
   * The service itself is not stopped. Unknown names are ignored.
   *
   * @throws Error if another registered service lists `name` as a dependency.
   */
  public removeService(name: string): void {
    for (const [svcName, svc] of this.services) {
      if (svc.dependencies.includes(name)) {
        throw new Error(`Cannot remove service '${name}': service '${svcName}' depends on it`);
      }
    }
    // Drop the forwarding subscription as well; otherwise events of the removed
    // service would keep showing up on serviceSubject.
    this.subscriptions.get(name)?.unsubscribe();
    this.subscriptions.delete(name);
    this.services.delete(name);
  }

  // ── Lifecycle ────────────────────────────────────────

  /**
   * Starts all registered services level by level in dependency order.
   *
   * NOTE: the global startup timeout only rejects the returned promise;
   * service starts that are already in flight are not cancelled.
   *
   * @throws Error on a missing or circular dependency, when a critical service
   *   fails to start (already started services are rolled back), or when the
   *   global startup timeout elapses.
   */
  public async start(): Promise<void> {
    this.startupOrder = this.topologicalSort();

    // stop() drops all forwarding subscriptions, so restore them for a restart.
    for (const service of this.services.values()) {
      this.subscribeToService(service);
    }

    this._startedAt = Date.now();
    const startupPromise = this.startAllLevels();

    if (this.options.startupTimeoutMs) {
      await this.withTimeout(
        startupPromise,
        this.options.startupTimeoutMs,
        `${this.name}: global startup timeout exceeded (${this.options.startupTimeoutMs}ms)`,
      );
    } else {
      await startupPromise;
    }
  }

  /**
   * Starts each level of `startupOrder` in turn, all services of a level in parallel.
   * A failing optional service is logged and skipped; a failing critical service
   * triggers a rollback of everything started so far and aborts the startup.
   */
  private async startAllLevels(): Promise<void> {
    const startedServices = new Set<string>();

    logger.log('info', `${this.name}: starting ${this.services.size} services in ${this.startupOrder.length} levels`);

    for (const [levelIdx, level] of this.startupOrder.entries()) {
      logger.log('info', `${this.name}: starting level ${levelIdx}: [${level.join(', ')}]`);

      const results = await Promise.allSettled(
        level.map(async (name) => {
          const service = this.requireService(name);
          try {
            await this.startServiceWithRetry(service);
            startedServices.add(name);
          } catch (err) {
            if (service.criticality === 'critical') {
              throw err;
            }
            // Optional service — log and continue
            logger.log('warn', `${this.name}: optional service '${name}' failed to start: ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );

      // results[i] belongs to level[i]; only critical failures abort the startup.
      for (const [i, name] of level.entries()) {
        const result = results[i];
        if (result === undefined || result.status !== 'rejected') {
          continue;
        }
        if (this.services.get(name)?.criticality !== 'critical') {
          continue;
        }
        logger.log('error', `${this.name}: critical service '${name}' failed, aborting startup`);
        await this.rollbackStartedServices(startedServices);
        throw new Error(`Critical service '${name}' failed to start: ${result.reason instanceof Error ? result.reason.message : String(result.reason)}`);
      }
    }

    const statuses = this.getAllStatuses();
    const running = statuses.filter((s) => s.state === 'running').length;
    const failed = statuses.filter((s) => s.state === 'failed').length;
    logger.log('info', `${this.name}: startup complete — ${running} running, ${failed} failed`);
  }

  /**
   * Stops all services that are not already stopped, in reverse startup order.
   * Each stop is bounded by `shutdownTimeoutMs`; errors and timeouts are logged
   * and never thrown. Event forwarding is dropped afterwards.
   */
  public async stop(): Promise<void> {
    logger.log('info', `${this.name}: stopping all services`);

    for (const level of [...this.startupOrder].reverse()) {
      const runningInLevel = level.filter((name) => {
        const svc = this.services.get(name);
        return svc !== undefined && svc.state !== 'stopped';
      });
      if (runningInLevel.length === 0) {
        continue;
      }

      await Promise.allSettled(
        runningInLevel.map(async (name) => {
          try {
            await this.withTimeout(
              this.requireService(name).stop(),
              this.options.shutdownTimeoutMs,
              `Timeout stopping service '${name}'`,
            );
          } catch (err) {
            logger.log('warn', `${this.name}: error stopping '${name}': ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );
    }

    // Clean up subscriptions
    for (const sub of this.subscriptions.values()) {
      sub.unsubscribe();
    }
    this.subscriptions.clear();

    this._startedAt = undefined;
    logger.log('info', `${this.name}: all services stopped`);
  }

  // ── Querying ─────────────────────────────────────────

  /** Returns the registered service with the given name, if any. */
  public getService(name: string): Service | undefined {
    return this.services.get(name);
  }

  /** Returns the status of the named service, or undefined if it is not registered. */
  public getServiceStatus(name: string): IServiceStatus | undefined {
    return this.services.get(name)?.getStatus();
  }

  /** Returns the status of every registered service, in registration order. */
  public getAllStatuses(): IServiceStatus[] {
    return Array.from(this.services.values(), (s) => s.getStatus());
  }

  /** Returns all services whose label `key` equals `value`. */
  public getServicesByLabel(key: string, value: string): Service[] {
    return Array.from(this.services.values()).filter((s) => s.labels[key] === value);
  }

  /** Returns the statuses of all services whose label `key` equals `value`. */
  public getServicesStatusByLabel(key: string, value: string): IServiceStatus[] {
    return this.getServicesByLabel(key, value).map((s) => s.getStatus());
  }

  /**
   * Aggregates the health of all services. A service counts as "down" when its
   * state is neither 'running' nor 'stopped'.
   *
   * @returns 'unhealthy' if a critical service is down, 'degraded' if only
   *   non-critical services are down, 'healthy' otherwise.
   */
  public getHealth(): IServiceManagerHealth {
    const statuses = this.getAllStatuses();
    const isDown = (s: IServiceStatus): boolean => s.state !== 'running' && s.state !== 'stopped';

    let overall: TOverallHealth = 'healthy';
    if (statuses.some((s) => s.criticality === 'critical' && isDown(s))) {
      overall = 'unhealthy';
    } else if (statuses.some(isDown)) {
      overall = 'degraded';
    }

    return {
      overall,
      services: statuses,
      startedAt: this._startedAt,
      uptime: this._startedAt !== undefined ? Date.now() - this._startedAt : undefined,
    };
  }

  // ── Runtime Operations ───────────────────────────────

  /**
   * Restarts a service together with everything that transitively depends on it:
   * dependents are stopped first (dependents before their dependencies), then the
   * target is stopped and started again, then the dependents are started again in
   * dependency order.
   *
   * @throws Error if the service is unknown, or if stopping or starting the target
   *   fails. Failures of dependents are logged, not thrown.
   */
  public async restartService(name: string): Promise<void> {
    const service = this.services.get(name);
    if (!service) {
      throw new Error(`Service '${name}' not found`);
    }

    const dependents = this.getTransitiveDependents(name);
    const dependentLevels = this.getLevelsForServices(dependents);

    // Stop dependents in reverse dependency order
    for (const level of [...dependentLevels].reverse()) {
      await Promise.allSettled(
        level.map(async (depName) => {
          const svc = this.services.get(depName);
          if (svc && svc.state !== 'stopped') {
            await svc.stop();
          }
        }),
      );
    }

    await service.stop();
    await this.startServiceWithRetry(service);

    // Restart dependents in dependency order
    for (const level of dependentLevels) {
      await Promise.allSettled(
        level.map(async (depName) => {
          try {
            await this.startServiceWithRetry(this.requireService(depName));
          } catch (err) {
            logger.log('warn', `${this.name}: failed to restart dependent '${depName}': ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );
    }
  }

  // ── Internal helpers ─────────────────────────────────

  /** Looks up a registered service, throwing instead of returning undefined. */
  private requireService(name: string): Service {
    const service = this.services.get(name);
    if (service === undefined) {
      throw new Error(`Service '${name}' not found`);
    }
    return service;
  }

  /** Forwards a service's events to `serviceSubject`; a no-op if already forwarding. */
  private subscribeToService(service: Service): void {
    if (this.subscriptions.has(service.name)) {
      return;
    }
    const sub = service.eventSubject.subscribe((event: IServiceEvent) => {
      this.serviceSubject.next(event);
    });
    this.subscriptions.set(service.name, sub);
  }

  /**
   * Awaits `work`, rejecting with `new Error(message)` if it has not settled
   * within `timeoutMs`. The timer is always cleared, so a finished operation
   * never leaves a pending timer behind that would keep the process alive.
   */
  private async withTimeout<T>(work: Promise<T>, timeoutMs: number, message: string): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const expiry = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => reject(new Error(message)), timeoutMs);
    });
    try {
      return await Promise.race([work, expiry]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Starts a service, retrying with jittered exponential backoff.
   * Uses the service's own retry config or the manager default; at least one
   * attempt is always made.
   *
   * @throws The error of the last failed attempt once all retries are used up.
   */
  private async startServiceWithRetry(service: Service): Promise<void> {
    const retryConfig = service.retryConfig || this.options.defaultRetry;
    // Clamp so that a negative value still yields one start attempt.
    const maxRetries = Math.max(0, retryConfig.maxRetries ?? 3);
    const baseDelay = retryConfig.baseDelayMs ?? 1000;
    const maxDelay = retryConfig.maxDelayMs ?? 30000;
    const factor = retryConfig.backoffFactor ?? 2;

    let lastError: Error | undefined;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        await service.start();
        return;
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));
        service.retryCount = attempt + 1;

        if (attempt < maxRetries) {
          const delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay);
          // ±20% jitter so that services failing together do not retry in lockstep.
          const jitter = 0.8 + Math.random() * 0.4;
          const actualDelay = Math.floor(delay * jitter);

          service.eventSubject.next({
            type: 'retrying',
            serviceName: service.name,
            state: service.state,
            timestamp: Date.now(),
            error: lastError.message,
            attempt: attempt + 1,
          });

          logger.log('info', `${this.name}: retrying '${service.name}' in ${actualDelay}ms (attempt ${attempt + 1}/${maxRetries})`);
          await plugins.smartdelay.delayFor(actualDelay);
        }
      }
    }

    throw lastError ?? new Error(`Service '${service.name}' failed to start`);
  }

  /**
   * Stops the given started services level by level in reverse startup order,
   * so that dependents are stopped before the services they depend on.
   */
  private async rollbackStartedServices(started: ReadonlySet<string>): Promise<void> {
    for (const level of [...this.startupOrder].reverse()) {
      const toStop = level.filter((name) => started.has(name));
      if (toStop.length > 0) {
        await this.stopServices(toStop);
      }
    }
  }

  /**
   * Stops the named services in parallel, skipping unknown and already stopped
   * ones. Errors are logged and never thrown.
   */
  private async stopServices(names: string[]): Promise<void> {
    await Promise.allSettled(
      names.map(async (name) => {
        const service = this.services.get(name);
        if (service && service.state !== 'stopped') {
          try {
            await service.stop();
          } catch (err) {
            logger.log('warn', `${this.name}: error stopping '${name}' during rollback: ${err instanceof Error ? err.message : String(err)}`);
          }
        }
      }),
    );
  }

  /**
   * Topological sort using Kahn's algorithm.
   * Returns levels of service names — services within a level can start in parallel.
   *
   * @throws Error if a dependency is not registered or the graph contains a cycle.
   */
  private topologicalSort(): string[][] {
    const names = new Set<string>(this.services.keys());
    const inDegree = new Map<string, number>();
    const dependents = new Map<string, string[]>(); // dep -> services that depend on it

    for (const name of names) {
      inDegree.set(name, 0);
      dependents.set(name, []);
    }

    for (const [name, service] of this.services) {
      for (const dep of service.dependencies) {
        if (!names.has(dep)) {
          throw new Error(`Service '${name}' depends on '${dep}', which is not registered`);
        }
        inDegree.set(name, (inDegree.get(name) ?? 0) + 1);
        dependents.get(dep)?.push(name);
      }
    }

    const levels: string[][] = [];
    const remaining = new Set<string>(names);

    while (remaining.size > 0) {
      // Everything whose dependencies are all satisfied forms the next level.
      const level = Array.from(remaining).filter((name) => (inDegree.get(name) ?? 0) === 0);

      if (level.length === 0) {
        // Cycle detected
        const cycleNodes = Array.from(remaining).join(', ');
        throw new Error(`Circular dependency detected involving services: ${cycleNodes}`);
      }

      for (const name of level) {
        remaining.delete(name);
        for (const dependent of dependents.get(name) ?? []) {
          inDegree.set(dependent, (inDegree.get(dependent) ?? 0) - 1);
        }
      }

      levels.push(level);
    }

    return levels;
  }

  /**
   * Get all services that transitively depend on the given service
   * (breadth-first over the reverse dependency edges).
   */
  private getTransitiveDependents(name: string): string[] {
    const result = new Set<string>();
    const queue: string[] = [name];

    for (let head = 0; head < queue.length; head++) {
      const current = queue[head];
      if (current === undefined) {
        continue;
      }
      for (const [svcName, svc] of this.services) {
        if (svc.dependencies.includes(current) && !result.has(svcName)) {
          result.add(svcName);
          queue.push(svcName);
        }
      }
    }

    return Array.from(result);
  }

  /**
   * Organize a set of service names into dependency-ordered levels.
   *
   * The cached startup order is reused when it covers all requested names;
   * otherwise (before the first start(), or for services registered afterwards)
   * the order is recomputed so that no requested service is silently dropped.
   *
   * @throws Error if the order has to be recomputed and the dependency graph is
   *   invalid (missing or circular dependency).
   */
  private getLevelsForServices(names: string[]): string[][] {
    if (names.length === 0) {
      return [];
    }

    const known = new Set<string>(this.startupOrder.flat());
    const order = names.every((n) => known.has(n)) ? this.startupOrder : this.topologicalSort();

    const nameSet = new Set<string>(names);
    return order
      .map((level) => level.filter((n) => nameSet.has(n)))
      .filter((level) => level.length > 0);
  }
}