Files
taskbuffer/ts/taskbuffer.classes.servicemanager.ts
Juergen Kunz e91e782113 feat(service): add Service and ServiceManager for component lifecycle management
Adds two new classes:
- Service: long-running component with start/stop lifecycle, health checks, builder pattern and subclass support
- ServiceManager: orchestrates multiple services with dependency-ordered startup, failure isolation, retry with backoff, and reverse-order shutdown
2026-03-20 15:24:12 +00:00

406 lines
13 KiB
TypeScript

import * as plugins from './taskbuffer.plugins.js';
import { logger } from './taskbuffer.logging.js';
import { Service } from './taskbuffer.classes.service.js';
import type {
IServiceEvent,
IServiceStatus,
IServiceManagerOptions,
IServiceManagerHealth,
IServiceOptions,
IRetryConfig,
TOverallHealth,
} from './taskbuffer.interfaces.js';
/**
 * ServiceManager orchestrates multiple Service instances with:
 * - Dependency-ordered startup (topological sort, level-by-level parallel)
 * - Failure isolation (a failing non-critical service does not abort startup)
 * - Retry with exponential backoff and jitter for every service start
 * - Reverse-dependency-ordered shutdown with a per-service timeout
 * - Aggregated health status
 *
 * Every event emitted by a registered service is re-emitted on `serviceSubject`.
 */
export class ServiceManager {
  /** Manager name; used as the prefix of every log line. */
  public readonly name: string;

  /** Aggregated event stream of all registered services. */
  public readonly serviceSubject = new plugins.smartrx.rxjs.Subject<IServiceEvent>();

  /** Registered services keyed by name (iteration order = registration order). */
  private services = new Map<string, Service>();

  /** Levels computed by the most recent `start()`; level 0 has no dependencies. */
  private startupOrder: string[][] = [];

  // NOTE(review): `startupTimeoutMs` is stored here but nothing in this class reads it.
  private options: Required<IServiceManagerOptions>;

  /** Timestamp (ms) of the last `start()` call; undefined while not started. */
  private _startedAt: number | undefined;

  /** Event-forwarding subscriptions keyed by service name, so one service can be detached. */
  private subscriptions = new Map<string, plugins.smartrx.rxjs.Subscription>();

  /**
   * @param options Optional overrides; omitted fields fall back to the defaults below.
   */
  constructor(options?: IServiceManagerOptions) {
    this.name = options?.name || 'ServiceManager';
    this.options = {
      name: this.name,
      defaultRetry: options?.defaultRetry || { maxRetries: 3, baseDelayMs: 1000, maxDelayMs: 30000, backoffFactor: 2 },
      defaultHealthCheck: options?.defaultHealthCheck || {},
      startupTimeoutMs: options?.startupTimeoutMs ?? 120000,
      shutdownTimeoutMs: options?.shutdownTimeoutMs ?? 30000,
    };
  }

  // ── Service Registration ─────────────────────────────

  /**
   * Registers a service and starts forwarding its events to `serviceSubject`.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addService(service: Service): void {
    if (this.services.has(service.name)) {
      throw new Error(`Service '${service.name}' is already registered`);
    }
    this.services.set(service.name, service);
    this.ensureEventForwarding(service);
  }

  /**
   * Builds a Service from plain options, registers it and returns it.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addServiceFromOptions<T>(options: IServiceOptions<T>): Service<T> {
    const service = new Service<T>(options);
    this.addService(service);
    return service;
  }

  /**
   * Unregisters a service and detaches its event forwarding.
   * The service itself is not stopped by this call.
   *
   * @throws Error if another registered service lists `name` as a dependency.
   */
  public removeService(name: string): void {
    for (const [svcName, svc] of this.services) {
      // A self-reference must not prevent a service from being removed.
      if (svcName !== name && svc.dependencies.includes(name)) {
        throw new Error(`Cannot remove service '${name}': service '${svcName}' depends on it`);
      }
    }
    this.services.delete(name);

    // Without this the removed service would keep emitting into serviceSubject.
    this.subscriptions.get(name)?.unsubscribe();
    this.subscriptions.delete(name);

    // Drop the name from the cached order so no stale entry lingers.
    this.startupOrder = this.startupOrder
      .map((level) => level.filter((entry) => entry !== name))
      .filter((level) => level.length > 0);
  }

  // ── Lifecycle ────────────────────────────────────────

  /**
   * Starts all registered services level by level; services inside one level
   * start in parallel. A failing service whose criticality is not 'critical'
   * is logged and skipped. A failing critical service aborts startup: every
   * service started so far is stopped again, dependents before dependencies.
   *
   * @throws Error on an unregistered dependency, a dependency cycle, or when a
   *   critical service still fails after its retries.
   */
  public async start(): Promise<void> {
    const levels = this.topologicalSort();
    this.startupOrder = levels;
    this._startedAt = Date.now();
    const started = new Set<string>();
    logger.log('info', `${this.name}: starting ${this.services.size} services in ${levels.length} levels`);

    for (const [levelIdx, level] of levels.entries()) {
      logger.log('info', `${this.name}: starting level ${levelIdx}: [${level.join(', ')}]`);
      const results = await Promise.allSettled(
        level.map(async (name) => {
          const service = this.services.get(name)!;
          try {
            await this.startServiceWithRetry(service);
            started.add(name);
          } catch (err) {
            if (service.criticality === 'critical') {
              throw err;
            }
            // Optional service — log and continue
            logger.log('warn', `${this.name}: optional service '${name}' failed to start: ${ServiceManager.describeError(err)}`);
          }
        }),
      );

      // Only critical failures are rethrown above, so any rejection aborts startup.
      for (const [i, result] of results.entries()) {
        if (result.status !== 'rejected') continue;
        const name = level[i];
        logger.log('error', `${this.name}: critical service '${name}' failed, aborting startup`);
        // Roll back level by level so dependents stop before their dependencies.
        await this.stopLevelsInReverse(
          levels.slice(0, levelIdx + 1),
          (candidate) => started.has(candidate),
          ' during rollback',
        );
        this._startedAt = undefined;
        throw new Error(`Critical service '${name}' failed to start: ${ServiceManager.describeError(result.reason)}`);
      }
    }

    const statuses = this.getAllStatuses();
    const running = statuses.filter((s) => s.state === 'running').length;
    const failed = statuses.filter((s) => s.state === 'failed').length;
    logger.log('info', `${this.name}: startup complete — ${running} running, ${failed} failed`);
  }

  /**
   * Stops every service of the most recent startup order that is not already
   * 'stopped', in reverse level order. Each stop is bounded by
   * `shutdownTimeoutMs`; failures and timeouts are logged, never thrown.
   * Event forwarding is detached afterwards and re-attached on the next start.
   */
  public async stop(): Promise<void> {
    logger.log('info', `${this.name}: stopping all services`);
    await this.stopLevelsInReverse(this.startupOrder, () => true);

    // Clean up subscriptions
    for (const sub of this.subscriptions.values()) {
      sub.unsubscribe();
    }
    this.subscriptions.clear();
    this._startedAt = undefined;
    logger.log('info', `${this.name}: all services stopped`);
  }

  // ── Querying ─────────────────────────────────────────

  /** Returns the registered service with the given name, if any. */
  public getService(name: string): Service | undefined {
    return this.services.get(name);
  }

  /** Returns the status of the named service, or undefined if it is not registered. */
  public getServiceStatus(name: string): IServiceStatus | undefined {
    return this.services.get(name)?.getStatus();
  }

  /** Returns the status of every registered service in registration order. */
  public getAllStatuses(): IServiceStatus[] {
    return Array.from(this.services.values()).map((s) => s.getStatus());
  }

  /**
   * Aggregates per-service status into one health report.
   * A service counts as "down" when it is neither 'running' nor 'stopped':
   * a down critical service yields 'unhealthy', any other down service
   * yields 'degraded', otherwise the manager is 'healthy'.
   */
  public getHealth(): IServiceManagerHealth {
    const statuses = this.getAllStatuses();
    const down = statuses.filter((s) => s.state !== 'running' && s.state !== 'stopped');

    let overall: TOverallHealth = 'healthy';
    if (down.some((s) => s.criticality === 'critical')) {
      overall = 'unhealthy';
    } else if (down.length > 0) {
      overall = 'degraded';
    }

    return {
      overall,
      services: statuses,
      startedAt: this._startedAt,
      uptime: this._startedAt !== undefined ? Date.now() - this._startedAt : undefined,
    };
  }

  // ── Runtime Operations ───────────────────────────────

  /**
   * Restarts one service together with everything that transitively depends
   * on it: dependents are stopped (dependents first), the target is stopped
   * and started again, then the dependents are started in dependency order.
   * Failures of dependents are logged and do not reject.
   *
   * @throws Error if the service is not registered, if its dependents form a
   *   dependency cycle, or if stopping/starting the target itself fails.
   */
  public async restartService(name: string): Promise<void> {
    const service = this.services.get(name);
    if (!service) {
      throw new Error(`Service '${name}' not found`);
    }

    // Levels come from the live registry, so this also works before start()
    // and for services registered after the last start().
    const dependentLevels = this.getLevelsForServices(this.getTransitiveDependents(name));

    await this.stopLevelsInReverse(dependentLevels, () => true, ` while restarting '${name}'`);

    await service.stop();
    await this.startServiceWithRetry(service);

    for (const level of dependentLevels) {
      await Promise.allSettled(
        level.map(async (depName) => {
          const svc = this.services.get(depName);
          if (!svc) return;
          try {
            await this.startServiceWithRetry(svc);
          } catch (err) {
            logger.log('warn', `${this.name}: failed to restart dependent '${depName}': ${ServiceManager.describeError(err)}`);
          }
        }),
      );
    }
  }

  // ── Internal helpers ─────────────────────────────────

  /** Renders an unknown thrown value as a log-friendly message. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /**
   * Subscribes `serviceSubject` to the service's events unless that is already
   * the case. Instances that are not the registered one are ignored.
   */
  private ensureEventForwarding(service: Service): void {
    if (this.subscriptions.has(service.name) || this.services.get(service.name) !== service) {
      return;
    }
    const sub = service.eventSubject.subscribe((event) => {
      this.serviceSubject.next(event);
    });
    this.subscriptions.set(service.name, sub);
  }

  /**
   * Starts a service, retrying with exponential backoff (capped at
   * `maxDelayMs`, with ±20 % jitter). The service's own retry config wins over
   * the manager default. At least one attempt is always made.
   *
   * @throws The last start error once the retries are exhausted.
   */
  private async startServiceWithRetry(service: Service): Promise<void> {
    // stop() detaches forwarding; re-attach so a later start is observable again.
    this.ensureEventForwarding(service);

    const retryConfig = service.retryConfig || this.options.defaultRetry;
    const maxRetries = retryConfig.maxRetries ?? 3;
    const baseDelay = retryConfig.baseDelayMs ?? 1000;
    const maxDelay = retryConfig.maxDelayMs ?? 30000;
    const factor = retryConfig.backoffFactor ?? 2;

    for (let attempt = 0; ; attempt++) {
      try {
        await service.start();
        return;
      } catch (err) {
        const lastError = err instanceof Error ? err : new Error(String(err));
        service.retryCount = attempt + 1;

        // Negated comparison: a negative or NaN maxRetries still ends with a real Error.
        if (!(attempt < maxRetries)) {
          throw lastError;
        }

        const delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay);
        const jitter = 0.8 + Math.random() * 0.4;
        const actualDelay = Math.floor(delay * jitter);
        service.eventSubject.next({
          type: 'retrying',
          serviceName: service.name,
          state: service.state,
          timestamp: Date.now(),
          error: lastError.message,
          attempt: attempt + 1,
        });
        logger.log('info', `${this.name}: retrying '${service.name}' in ${actualDelay}ms (attempt ${attempt + 1}/${maxRetries})`);
        await plugins.smartdelay.delayFor(actualDelay);
      }
    }
  }

  /**
   * Stops a service, rejecting if it takes longer than `shutdownTimeoutMs`.
   * The timer is always cleared so it cannot keep the event loop alive.
   */
  private async stopWithTimeout(service: Service): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error(`Timeout stopping service '${service.name}'`)),
        this.options.shutdownTimeoutMs,
      );
    });
    try {
      await Promise.race([service.stop(), timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Stops the given levels last-to-first; services within a level stop in
   * parallel. Unregistered and already stopped services are skipped, as are
   * names rejected by `shouldStop`. Errors are logged with `context` appended
   * to the message and never propagate.
   */
  private async stopLevelsInReverse(
    levels: string[][],
    shouldStop: (name: string) => boolean,
    context = '',
  ): Promise<void> {
    for (const level of [...levels].reverse()) {
      await Promise.allSettled(
        level.map(async (name) => {
          const service = this.services.get(name);
          if (!service || service.state === 'stopped' || !shouldStop(name)) {
            return;
          }
          try {
            await this.stopWithTimeout(service);
          } catch (err) {
            logger.log('warn', `${this.name}: error stopping '${name}'${context}: ${ServiceManager.describeError(err)}`);
          }
        }),
      );
    }
  }

  /**
   * Topological sort using Kahn's algorithm over all registered services.
   * Returns levels of service names — services within a level can start in parallel.
   *
   * @throws Error on an unregistered dependency or a dependency cycle.
   */
  private topologicalSort(): string[][] {
    return this.buildLevels(new Set(this.services.keys()), true);
  }

  /**
   * Kahn's algorithm restricted to `names`.
   *
   * @param names Services to order; iteration order decides the order inside a level.
   * @param requireKnownDependencies When true, a dependency outside `names`
   *   throws; when false it is ignored.
   * @throws Error if the dependencies among `names` contain a cycle.
   */
  private buildLevels(names: Set<string>, requireKnownDependencies: boolean): string[][] {
    const inDegree = new Map<string, number>();
    const dependents = new Map<string, string[]>(); // dep -> services that depend on it

    for (const name of names) {
      inDegree.set(name, 0);
      dependents.set(name, []);
    }

    for (const name of names) {
      const service = this.services.get(name);
      if (!service) continue;
      for (const dep of service.dependencies) {
        if (!names.has(dep)) {
          if (requireKnownDependencies) {
            throw new Error(`Service '${name}' depends on '${dep}', which is not registered`);
          }
          continue;
        }
        inDegree.set(name, (inDegree.get(name) ?? 0) + 1);
        dependents.get(dep)?.push(name);
      }
    }

    const levels: string[][] = [];
    const remaining = new Set(names);
    while (remaining.size > 0) {
      const level = Array.from(remaining).filter((name) => (inDegree.get(name) ?? 0) === 0);
      if (level.length === 0) {
        // Nothing is startable although services remain: they form a cycle.
        const cycleNodes = Array.from(remaining).join(', ');
        throw new Error(`Circular dependency detected involving services: ${cycleNodes}`);
      }
      for (const name of level) {
        remaining.delete(name);
        for (const dependent of dependents.get(name) ?? []) {
          inDegree.set(dependent, (inDegree.get(dependent) ?? 0) - 1);
        }
      }
      levels.push(level);
    }
    return levels;
  }

  /**
   * Get all services that transitively depend on the given service.
   */
  private getTransitiveDependents(name: string): string[] {
    // Reverse adjacency built once instead of rescanning every service per step.
    const directDependents = new Map<string, string[]>();
    for (const [svcName, svc] of this.services) {
      for (const dep of svc.dependencies) {
        const list = directDependents.get(dep);
        if (list) {
          list.push(svcName);
        } else {
          directDependents.set(dep, [svcName]);
        }
      }
    }

    const result = new Set<string>();
    const queue = [name];
    for (let head = 0; head < queue.length; head++) {
      for (const dependent of directDependents.get(queue[head]) ?? []) {
        if (!result.has(dependent)) {
          result.add(dependent);
          queue.push(dependent);
        }
      }
    }
    return Array.from(result);
  }

  /**
   * Organize a set of service names into dependency-ordered levels, using only
   * the dependencies among the given names. Dependencies on services outside
   * the set are ignored.
   *
   * @throws Error if the given services depend on each other in a cycle.
   */
  private getLevelsForServices(names: string[]): string[][] {
    if (names.length === 0) return [];
    return this.buildLevels(new Set(names), false);
  }
}