From e91e782113e8b67d1a4ff0b4780d4f0f78aefff8 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Fri, 20 Mar 2026 15:24:12 +0000 Subject: [PATCH] feat(service): add Service and ServiceManager for component lifecycle management Adds two new classes: - Service: long-running component with start/stop lifecycle, health checks, builder pattern and subclass support - ServiceManager: orchestrates multiple services with dependency-ordered startup, failure isolation, retry with backoff, and reverse-order shutdown --- package.json | 2 +- test/test.14.servicemanager.ts | 517 ++++++++++++++++++++++++ ts/index.ts | 17 + ts/taskbuffer.classes.service.ts | 346 ++++++++++++++++ ts/taskbuffer.classes.servicemanager.ts | 405 +++++++++++++++++++ ts/taskbuffer.interfaces.ts | 97 +++++ 6 files changed, 1383 insertions(+), 1 deletion(-) create mode 100644 test/test.14.servicemanager.ts create mode 100644 ts/taskbuffer.classes.service.ts create mode 100644 ts/taskbuffer.classes.servicemanager.ts diff --git a/package.json b/package.json index 1361c6f..0b8f2f0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@push.rocks/taskbuffer", - "version": "6.1.2", + "version": "7.0.0", "private": false, "description": "A flexible task management library supporting TypeScript, allowing for task buffering, scheduling, and execution with dependency management.", "main": "dist_ts/index.js", diff --git a/test/test.14.servicemanager.ts b/test/test.14.servicemanager.ts new file mode 100644 index 0000000..6726c37 --- /dev/null +++ b/test/test.14.servicemanager.ts @@ -0,0 +1,517 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as taskbuffer from '../ts/index.js'; +import * as smartdelay from '@push.rocks/smartdelay'; + +// ── Test 1: Basic service start/stop lifecycle ───────────── + +tap.test('should start and stop a simple service', async () => { + let started = false; + let stopped = false; + + const service = new taskbuffer.Service('TestService') + 
.withStart(async () => { started = true; }) + .withStop(async () => { stopped = true; }); + + expect(service.state).toEqual('stopped'); + await service.start(); + expect(service.state).toEqual('running'); + expect(started).toBeTrue(); + + await service.stop(); + expect(service.state).toEqual('stopped'); + expect(stopped).toBeTrue(); +}); + +// ── Test 2: Builder pattern chaining ─────────────────────── + +tap.test('should support builder pattern chaining', async () => { + const service = new taskbuffer.Service('ChainedService') + .critical() + .dependsOn('Dep1', 'Dep2') + .withStart(async () => 'result') + .withStop(async () => {}) + .withRetry({ maxRetries: 5, baseDelayMs: 100 }); + + expect(service.criticality).toEqual('critical'); + expect(service.dependencies).toEqual(['Dep1', 'Dep2']); + expect(service.retryConfig).toBeTruthy(); + expect(service.retryConfig!.maxRetries).toEqual(5); +}); + +// ── Test 3: Subclass pattern ─────────────────────────────── + +tap.test('should support subclass pattern', async () => { + class MyService extends taskbuffer.Service { + public startCalled = false; + public stopCalled = false; + + constructor() { + super('MySubclassService'); + this.optional(); + } + + protected async serviceStart(): Promise { + this.startCalled = true; + return 'hello'; + } + + protected async serviceStop(): Promise { + this.stopCalled = true; + } + } + + const service = new MyService(); + const result = await service.start(); + expect(service.startCalled).toBeTrue(); + expect(service.state).toEqual('running'); + + await service.stop(); + expect(service.stopCalled).toBeTrue(); + expect(service.state).toEqual('stopped'); +}); + +// ── Test 4: Constructor with options object ──────────────── + +tap.test('should accept options object in constructor', async () => { + let started = false; + const service = new taskbuffer.Service({ + name: 'OptionsService', + criticality: 'critical', + dependencies: ['A', 'B'], + start: async () => { started = true; }, + stop: 
async () => {}, + retry: { maxRetries: 10 }, + }); + + expect(service.name).toEqual('OptionsService'); + expect(service.criticality).toEqual('critical'); + expect(service.dependencies).toEqual(['A', 'B']); + await service.start(); + expect(started).toBeTrue(); + await service.stop(); +}); + +// ── Test 5: ServiceManager dependency ordering ───────────── + +tap.test('should start services in dependency order', async () => { + const order: string[] = []; + + const manager = new taskbuffer.ServiceManager({ name: 'TestManager' }); + + manager.addService( + new taskbuffer.Service('C') + .dependsOn('B') + .withStart(async () => { order.push('C'); }) + .withStop(async () => {}), + ); + + manager.addService( + new taskbuffer.Service('A') + .withStart(async () => { order.push('A'); }) + .withStop(async () => {}), + ); + + manager.addService( + new taskbuffer.Service('B') + .dependsOn('A') + .withStart(async () => { order.push('B'); }) + .withStop(async () => {}), + ); + + await manager.start(); + + // A must come before B, B must come before C + expect(order.indexOf('A')).toBeLessThan(order.indexOf('B')); + expect(order.indexOf('B')).toBeLessThan(order.indexOf('C')); + + await manager.stop(); +}); + +// ── Test 6: Critical service failure aborts startup ──────── + +tap.test('should abort startup when a critical service fails', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'CriticalTest' }); + let serviceCStopped = false; + + manager.addService( + new taskbuffer.Service('Working') + .critical() + .withStart(async () => {}) + .withStop(async () => {}), + ); + + manager.addService( + new taskbuffer.Service('Broken') + .critical() + .withStart(async () => { throw new Error('boom'); }) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + manager.addService( + new taskbuffer.Service('AfterBroken') + .dependsOn('Broken') + .withStart(async () => { serviceCStopped = true; }) + .withStop(async () => {}), + ); + + let caught = false; + try { + 
await manager.start(); + } catch (err) { + caught = true; + expect((err as Error).message).toInclude('Broken'); + } + + expect(caught).toBeTrue(); + // AfterBroken should never have started because Broken (its dep) failed + expect(serviceCStopped).toBeFalse(); +}); + +// ── Test 7: Optional service failure continues startup ───── + +tap.test('should continue startup when an optional service fails', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'OptionalTest' }); + let criticalStarted = false; + + manager.addService( + new taskbuffer.Service('Critical') + .critical() + .withStart(async () => { criticalStarted = true; }) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + manager.addService( + new taskbuffer.Service('Optional') + .optional() + .withStart(async () => { throw new Error('oops'); }) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + // Should NOT throw + await manager.start(); + expect(criticalStarted).toBeTrue(); + + const health = manager.getHealth(); + expect(health.overall).toEqual('degraded'); + + const optionalStatus = manager.getServiceStatus('Optional'); + expect(optionalStatus!.state).toEqual('failed'); + + await manager.stop(); +}); + +// ── Test 8: Retry with backoff for optional services ─────── + +tap.test('should retry failed optional services', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'RetryTest' }); + let attempts = 0; + + manager.addService( + new taskbuffer.Service('Flaky') + .optional() + .withStart(async () => { + attempts++; + if (attempts < 3) { + throw new Error(`attempt ${attempts} failed`); + } + }) + .withStop(async () => {}) + .withRetry({ maxRetries: 5, baseDelayMs: 50, maxDelayMs: 100, backoffFactor: 1 }), + ); + + await manager.start(); + expect(attempts).toEqual(3); + + const status = manager.getServiceStatus('Flaky'); + expect(status!.state).toEqual('running'); + + await manager.stop(); +}); + +// ── Test 9: Reverse-order shutdown 
───────────────────────── + +tap.test('should stop services in reverse dependency order', async () => { + const order: string[] = []; + + const manager = new taskbuffer.ServiceManager({ name: 'ShutdownTest' }); + + manager.addService( + new taskbuffer.Service('Base') + .withStart(async () => {}) + .withStop(async () => { order.push('Base'); }), + ); + + manager.addService( + new taskbuffer.Service('Middle') + .dependsOn('Base') + .withStart(async () => {}) + .withStop(async () => { order.push('Middle'); }), + ); + + manager.addService( + new taskbuffer.Service('Top') + .dependsOn('Middle') + .withStart(async () => {}) + .withStop(async () => { order.push('Top'); }), + ); + + await manager.start(); + await manager.stop(); + + // Top should stop before Middle, Middle before Base + expect(order.indexOf('Top')).toBeLessThan(order.indexOf('Middle')); + expect(order.indexOf('Middle')).toBeLessThan(order.indexOf('Base')); +}); + +// ── Test 10: Circular dependency detection ───────────────── + +tap.test('should throw on circular dependency', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'CycleTest' }); + + manager.addService( + new taskbuffer.Service('A') + .dependsOn('B') + .withStart(async () => {}) + .withStop(async () => {}), + ); + + manager.addService( + new taskbuffer.Service('B') + .dependsOn('A') + .withStart(async () => {}) + .withStop(async () => {}), + ); + + let caught = false; + try { + await manager.start(); + } catch (err) { + caught = true; + expect((err as Error).message).toInclude('Circular dependency'); + } + + expect(caught).toBeTrue(); +}); + +// ── Test 11: restartService cascades to dependents ───────── + +tap.test('should restart service and its dependents', async () => { + const startOrder: string[] = []; + const stopOrder: string[] = []; + + const manager = new taskbuffer.ServiceManager({ name: 'RestartTest' }); + + manager.addService( + new taskbuffer.Service('Base') + .withStart(async () => { startOrder.push('Base'); }) 
+ .withStop(async () => { stopOrder.push('Base'); }) + .withRetry({ maxRetries: 0 }), + ); + + manager.addService( + new taskbuffer.Service('Dep') + .dependsOn('Base') + .withStart(async () => { startOrder.push('Dep'); }) + .withStop(async () => { stopOrder.push('Dep'); }) + .withRetry({ maxRetries: 0 }), + ); + + await manager.start(); + expect(startOrder).toEqual(['Base', 'Dep']); + + // Clear tracking + startOrder.length = 0; + stopOrder.length = 0; + + await manager.restartService('Base'); + + // Dep should be stopped first, then Base, then Base restarted, then Dep + expect(stopOrder).toEqual(['Dep', 'Base']); + expect(startOrder).toEqual(['Base', 'Dep']); + + await manager.stop(); +}); + +// ── Test 12: getHealth returns correct aggregated status ─── + +tap.test('should return correct health aggregation', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'HealthTest' }); + + manager.addService( + new taskbuffer.Service('OK') + .critical() + .withStart(async () => {}) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + manager.addService( + new taskbuffer.Service('AlsoOK') + .optional() + .withStart(async () => {}) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + await manager.start(); + + const health = manager.getHealth(); + expect(health.overall).toEqual('healthy'); + expect(health.services.length).toEqual(2); + expect(health.startedAt).toBeTruthy(); + expect(health.uptime).toBeGreaterThanOrEqual(0); + + await manager.stop(); +}); + +// ── Test 13: Events emitted on state transitions ─────────── + +tap.test('should emit events on state transitions', async () => { + const events: taskbuffer.IServiceEvent[] = []; + + const manager = new taskbuffer.ServiceManager({ name: 'EventTest' }); + + manager.addService( + new taskbuffer.Service('Svc') + .withStart(async () => {}) + .withStop(async () => {}) + .withRetry({ maxRetries: 0 }), + ); + + manager.serviceSubject.subscribe((event) => { + 
events.push(event); + }); + + await manager.start(); + await manager.stop(); + + const types = events.map((e) => e.type); + expect(types).toContain('started'); + expect(types).toContain('stopped'); +}); + +// ── Test 14: Parallel startup of independent services ────── + +tap.test('should start independent services in parallel', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'ParallelTest' }); + const startTimes: Record = {}; + + manager.addService( + new taskbuffer.Service('A') + .withStart(async () => { + startTimes['A'] = Date.now(); + await smartdelay.delayFor(100); + }) + .withStop(async () => {}), + ); + + manager.addService( + new taskbuffer.Service('B') + .withStart(async () => { + startTimes['B'] = Date.now(); + await smartdelay.delayFor(100); + }) + .withStop(async () => {}), + ); + + await manager.start(); + + // Both should start at roughly the same time (within 50ms) + const diff = Math.abs(startTimes['A'] - startTimes['B']); + expect(diff).toBeLessThan(50); + + await manager.stop(); +}); + +// ── Test 15: getStatus snapshot ──────────────────────────── + +tap.test('should return accurate status snapshot', async () => { + const service = new taskbuffer.Service('StatusTest') + .critical() + .dependsOn('X') + .withStart(async () => {}) + .withStop(async () => {}); + + const statusBefore = service.getStatus(); + expect(statusBefore.state).toEqual('stopped'); + expect(statusBefore.name).toEqual('StatusTest'); + expect(statusBefore.criticality).toEqual('critical'); + expect(statusBefore.dependencies).toEqual(['X']); + expect(statusBefore.errorCount).toEqual(0); + + await service.start(); + + const statusAfter = service.getStatus(); + expect(statusAfter.state).toEqual('running'); + expect(statusAfter.startedAt).toBeTruthy(); + expect(statusAfter.uptime).toBeGreaterThanOrEqual(0); + + await service.stop(); +}); + +// ── Test 16: Missing dependency detection ────────────────── + +tap.test('should throw when a dependency is not 
registered', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'MissingDepTest' }); + + manager.addService( + new taskbuffer.Service('Lonely') + .dependsOn('Ghost') + .withStart(async () => {}) + .withStop(async () => {}), + ); + + let caught = false; + try { + await manager.start(); + } catch (err) { + caught = true; + expect((err as Error).message).toInclude('Ghost'); + expect((err as Error).message).toInclude('not registered'); + } + + expect(caught).toBeTrue(); +}); + +// ── Test 17: No-op on double start/stop ──────────────────── + +tap.test('should be a no-op when starting an already-running service', async () => { + let startCount = 0; + + const service = new taskbuffer.Service('DoubleStart') + .withStart(async () => { startCount++; }) + .withStop(async () => {}); + + await service.start(); + await service.start(); // should be no-op + + expect(startCount).toEqual(1); + + await service.stop(); + await service.stop(); // should be no-op +}); + +// ── Test 18: addServiceFromOptions convenience ───────────── + +tap.test('should support addServiceFromOptions', async () => { + const manager = new taskbuffer.ServiceManager({ name: 'ConvenienceTest' }); + let started = false; + + const service = manager.addServiceFromOptions({ + name: 'Easy', + start: async () => { started = true; }, + stop: async () => {}, + }); + + expect(service).toBeInstanceOf(taskbuffer.Service); + expect(service.name).toEqual('Easy'); + + await manager.start(); + expect(started).toBeTrue(); + await manager.stop(); +}); + +export default tap.start(); diff --git a/ts/index.ts b/ts/index.ts index 007266b..b95972d 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -14,5 +14,22 @@ export type { ITaskStep } from './taskbuffer.classes.taskstep.js'; // Metadata interfaces export type { ITaskMetadata, ITaskExecutionReport, IScheduledTaskInfo, ITaskEvent, TTaskEventType, ITaskConstraintGroupOptions, ITaskExecution, IRateLimitConfig, TResultSharingMode } from './taskbuffer.interfaces.js'; 
+// Service lifecycle system +export { Service } from './taskbuffer.classes.service.js'; +export { ServiceManager } from './taskbuffer.classes.servicemanager.js'; +export type { + IServiceOptions, + IServiceStatus, + IServiceEvent, + IServiceManagerOptions, + IServiceManagerHealth, + IRetryConfig, + IHealthCheckConfig, + TServiceState, + TServiceCriticality, + TServiceEventType, + TOverallHealth, +} from './taskbuffer.interfaces.js'; + import * as distributedCoordination from './taskbuffer.classes.distributedcoordinator.js'; export { distributedCoordination }; diff --git a/ts/taskbuffer.classes.service.ts b/ts/taskbuffer.classes.service.ts new file mode 100644 index 0000000..d21b325 --- /dev/null +++ b/ts/taskbuffer.classes.service.ts @@ -0,0 +1,346 @@ +import * as plugins from './taskbuffer.plugins.js'; +import { logger } from './taskbuffer.logging.js'; +import type { + TServiceState, + TServiceCriticality, + IServiceEvent, + IServiceStatus, + IRetryConfig, + IHealthCheckConfig, + IServiceOptions, +} from './taskbuffer.interfaces.js'; + +/** + * Service represents a long-running component with start/stop lifecycle, + * health checking, and retry capabilities. + * + * Use via builder pattern: + * new Service('MyService') + * .critical() + * .dependsOn('Database') + * .withStart(async () => { ... }) + * .withStop(async () => { ... }) + * + * Or extend for complex services: + * class MyService extends Service { + * protected async serviceStart() { ... } + * protected async serviceStop() { ... 
} + * } + */ +export class Service { + public readonly name: string; + public readonly eventSubject = new plugins.smartrx.rxjs.Subject(); + + // ── Internal state ───────────────────────────────── + private _state: TServiceState = 'stopped'; + private _criticality: TServiceCriticality = 'optional'; + private _dependencies: string[] = []; + private _retryConfig: IRetryConfig | undefined; + private _healthCheckConfig: IHealthCheckConfig | undefined; + + // Builder-provided functions + private _startFn: (() => Promise) | undefined; + private _stopFn: (() => Promise) | undefined; + private _healthCheckFn: (() => Promise) | undefined; + + // Runtime tracking + private _startedAt: number | undefined; + private _stoppedAt: number | undefined; + private _errorCount = 0; + private _lastError: string | undefined; + private _retryCount = 0; + + // Health check tracking + private _healthCheckTimer: ReturnType | undefined; + private _lastHealthCheck: number | undefined; + private _healthCheckOk: boolean | undefined; + private _consecutiveHealthFailures = 0; + + constructor(nameOrOptions: string | IServiceOptions) { + if (typeof nameOrOptions === 'string') { + this.name = nameOrOptions; + } else { + this.name = nameOrOptions.name; + this._startFn = nameOrOptions.start; + this._stopFn = nameOrOptions.stop; + this._healthCheckFn = nameOrOptions.healthCheck; + this._criticality = nameOrOptions.criticality || 'optional'; + this._dependencies = nameOrOptions.dependencies || []; + this._retryConfig = nameOrOptions.retry; + this._healthCheckConfig = nameOrOptions.healthCheckConfig; + } + } + + // ── Builder methods ────────────────────────────────── + + public critical(): this { + this._criticality = 'critical'; + return this; + } + + public optional(): this { + this._criticality = 'optional'; + return this; + } + + public dependsOn(...serviceNames: string[]): this { + this._dependencies.push(...serviceNames); + return this; + } + + public withStart(fn: () => Promise): this { + 
this._startFn = fn; + return this; + } + + public withStop(fn: () => Promise): this { + this._stopFn = fn; + return this; + } + + public withHealthCheck(fn: () => Promise, config?: IHealthCheckConfig): this { + this._healthCheckFn = fn; + if (config) { + this._healthCheckConfig = config; + } + return this; + } + + public withRetry(config: IRetryConfig): this { + this._retryConfig = config; + return this; + } + + // ── Overridable hooks (for subclassing) ────────────── + + protected async serviceStart(): Promise { + if (this._startFn) { + return this._startFn(); + } + throw new Error(`Service '${this.name}': no start function provided. Use withStart() or override serviceStart().`); + } + + protected async serviceStop(): Promise { + if (this._stopFn) { + return this._stopFn(); + } + // Default: no-op stop is fine (some services don't need explicit cleanup) + } + + protected async serviceHealthCheck(): Promise { + if (this._healthCheckFn) { + return this._healthCheckFn(); + } + // No health check configured — assume healthy if running + return this._state === 'running'; + } + + // ── Lifecycle (called by ServiceManager) ───────────── + + public async start(): Promise { + if (this._state === 'running') { + return undefined as T; + } + + this.setState('starting'); + + try { + const result = await this.serviceStart(); + this._startedAt = Date.now(); + this._stoppedAt = undefined; + this._consecutiveHealthFailures = 0; + this._healthCheckOk = true; + this.setState('running'); + this.emitEvent('started'); + this.startHealthCheckTimer(); + return result; + } catch (err) { + this._errorCount++; + this._lastError = err instanceof Error ? 
err.message : String(err); + this.setState('failed'); + this.emitEvent('failed', { error: this._lastError }); + throw err; + } + } + + public async stop(): Promise { + if (this._state === 'stopped' || this._state === 'stopping') { + return; + } + + this.stopHealthCheckTimer(); + this.setState('stopping'); + + try { + await this.serviceStop(); + } catch (err) { + logger.log('warn', `Service '${this.name}' error during stop: ${err instanceof Error ? err.message : String(err)}`); + } + + this._stoppedAt = Date.now(); + this.setState('stopped'); + this.emitEvent('stopped'); + } + + public async checkHealth(): Promise { + if (!this._healthCheckFn && !this.hasOverriddenHealthCheck()) { + return undefined; + } + + try { + const config = this._healthCheckConfig; + const timeoutMs = config?.timeoutMs ?? 5000; + + const result = await Promise.race([ + this.serviceHealthCheck(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Health check timed out')), timeoutMs) + ), + ]); + + this._lastHealthCheck = Date.now(); + this._healthCheckOk = result; + + if (result) { + this._consecutiveHealthFailures = 0; + if (this._state === 'degraded') { + this.setState('running'); + this.emitEvent('recovered'); + } + } else { + this._consecutiveHealthFailures++; + this.handleHealthFailure(); + } + + this.emitEvent('healthCheck'); + return result; + } catch (err) { + this._lastHealthCheck = Date.now(); + this._healthCheckOk = false; + this._consecutiveHealthFailures++; + this.handleHealthFailure(); + this.emitEvent('healthCheck'); + return false; + } + } + + // ── State ──────────────────────────────────────────── + + public get state(): TServiceState { + return this._state; + } + + public get criticality(): TServiceCriticality { + return this._criticality; + } + + public get dependencies(): string[] { + return [...this._dependencies]; + } + + public get retryConfig(): IRetryConfig | undefined { + return this._retryConfig; + } + + public get errorCount(): number { + return 
this._errorCount; + } + + public get retryCount(): number { + return this._retryCount; + } + + public set retryCount(value: number) { + this._retryCount = value; + } + + public getStatus(): IServiceStatus { + return { + name: this.name, + state: this._state, + criticality: this._criticality, + startedAt: this._startedAt, + stoppedAt: this._stoppedAt, + lastHealthCheck: this._lastHealthCheck, + healthCheckOk: this._healthCheckOk, + uptime: this._startedAt && this._state === 'running' + ? Date.now() - this._startedAt + : undefined, + errorCount: this._errorCount, + lastError: this._lastError, + retryCount: this._retryCount, + dependencies: [...this._dependencies], + }; + } + + // ── Internal helpers ───────────────────────────────── + + private setState(state: TServiceState): void { + this._state = state; + } + + private emitEvent(type: IServiceEvent['type'], extra?: Partial): void { + this.eventSubject.next({ + type, + serviceName: this.name, + state: this._state, + timestamp: Date.now(), + ...extra, + }); + } + + private handleHealthFailure(): void { + const config = this._healthCheckConfig; + const failuresBeforeDegraded = config?.failuresBeforeDegraded ?? 3; + const failuresBeforeFailed = config?.failuresBeforeFailed ?? 5; + + if (this._state === 'running' && this._consecutiveHealthFailures >= failuresBeforeDegraded) { + this.setState('degraded'); + this.emitEvent('degraded'); + } + + if (this._consecutiveHealthFailures >= failuresBeforeFailed) { + this.setState('failed'); + this._lastError = `Health check failed ${this._consecutiveHealthFailures} consecutive times`; + this.emitEvent('failed', { error: this._lastError }); + this.stopHealthCheckTimer(); + } + } + + private startHealthCheckTimer(): void { + if (!this._healthCheckFn && !this.hasOverriddenHealthCheck()) { + return; + } + const config = this._healthCheckConfig; + const intervalMs = config?.intervalMs ?? 
30000; + + this.stopHealthCheckTimer(); + + const tick = () => { + if (this._state !== 'running' && this._state !== 'degraded') { + return; + } + this.checkHealth().catch(() => {}); + this._healthCheckTimer = setTimeout(tick, intervalMs); + if (this._healthCheckTimer && typeof this._healthCheckTimer === 'object' && 'unref' in this._healthCheckTimer) { + (this._healthCheckTimer as any).unref(); + } + }; + + this._healthCheckTimer = setTimeout(tick, intervalMs); + if (this._healthCheckTimer && typeof this._healthCheckTimer === 'object' && 'unref' in this._healthCheckTimer) { + (this._healthCheckTimer as any).unref(); + } + } + + private stopHealthCheckTimer(): void { + if (this._healthCheckTimer) { + clearTimeout(this._healthCheckTimer); + this._healthCheckTimer = undefined; + } + } + + private hasOverriddenHealthCheck(): boolean { + return this.serviceHealthCheck !== Service.prototype.serviceHealthCheck; + } +} diff --git a/ts/taskbuffer.classes.servicemanager.ts b/ts/taskbuffer.classes.servicemanager.ts new file mode 100644 index 0000000..f5af876 --- /dev/null +++ b/ts/taskbuffer.classes.servicemanager.ts @@ -0,0 +1,405 @@ +import * as plugins from './taskbuffer.plugins.js'; +import { logger } from './taskbuffer.logging.js'; +import { Service } from './taskbuffer.classes.service.js'; +import type { + IServiceEvent, + IServiceStatus, + IServiceManagerOptions, + IServiceManagerHealth, + IServiceOptions, + IRetryConfig, + TOverallHealth, +} from './taskbuffer.interfaces.js'; + +/** + * ServiceManager orchestrates multiple Service instances with: + * - Dependency-ordered startup (topological sort, level-by-level parallel) + * - Failure isolation (optional services don't crash the system) + * - Retry with exponential backoff for failed optional services + * - Reverse-dependency-ordered shutdown + * - Aggregated health status + */ +export class ServiceManager { + public readonly name: string; + public readonly serviceSubject = new plugins.smartrx.rxjs.Subject(); + + 
private services = new Map(); + private startupOrder: string[][] = []; // levels of service names + private options: Required; + private _startedAt: number | undefined; + private subscriptions: plugins.smartrx.rxjs.Subscription[] = []; + + constructor(options?: IServiceManagerOptions) { + this.name = options?.name || 'ServiceManager'; + this.options = { + name: this.name, + defaultRetry: options?.defaultRetry || { maxRetries: 3, baseDelayMs: 1000, maxDelayMs: 30000, backoffFactor: 2 }, + defaultHealthCheck: options?.defaultHealthCheck || {}, + startupTimeoutMs: options?.startupTimeoutMs ?? 120000, + shutdownTimeoutMs: options?.shutdownTimeoutMs ?? 30000, + }; + } + + // ── Service Registration ───────────────────────────── + + public addService(service: Service): void { + if (this.services.has(service.name)) { + throw new Error(`Service '${service.name}' is already registered`); + } + this.services.set(service.name, service); + + // Forward service events to the aggregated subject + const sub = service.eventSubject.subscribe((event) => { + this.serviceSubject.next(event); + }); + this.subscriptions.push(sub); + } + + public addServiceFromOptions(options: IServiceOptions): Service { + const service = new Service(options); + this.addService(service); + return service; + } + + public removeService(name: string): void { + // Check no other service depends on this one + for (const [svcName, svc] of this.services) { + if (svc.dependencies.includes(name)) { + throw new Error(`Cannot remove service '${name}': service '${svcName}' depends on it`); + } + } + this.services.delete(name); + } + + // ── Lifecycle ──────────────────────────────────────── + + public async start(): Promise { + // Build startup order via topological sort + this.startupOrder = this.topologicalSort(); + this._startedAt = Date.now(); + const startedServices: string[] = []; + + logger.log('info', `${this.name}: starting ${this.services.size} services in ${this.startupOrder.length} levels`); + + for (let 
levelIdx = 0; levelIdx < this.startupOrder.length; levelIdx++) { + const level = this.startupOrder[levelIdx]; + logger.log('info', `${this.name}: starting level ${levelIdx}: [${level.join(', ')}]`); + + const results = await Promise.allSettled( + level.map(async (name) => { + const service = this.services.get(name)!; + try { + await this.startServiceWithRetry(service); + startedServices.push(name); + } catch (err) { + if (service.criticality === 'critical') { + throw err; + } + // Optional service — log and continue + logger.log('warn', `${this.name}: optional service '${name}' failed to start: ${err instanceof Error ? err.message : String(err)}`); + } + }), + ); + + // Check if any critical service failed + for (let i = 0; i < results.length; i++) { + const result = results[i]; + if (result.status === 'rejected') { + const name = level[i]; + const service = this.services.get(name); + if (service && service.criticality === 'critical') { + logger.log('error', `${this.name}: critical service '${name}' failed, aborting startup`); + // Rollback: stop all started services in reverse order + await this.stopServices(startedServices.reverse()); + throw new Error(`Critical service '${name}' failed to start: ${result.reason instanceof Error ? 
result.reason.message : String(result.reason)}`); + } + } + } + } + + const statuses = this.getAllStatuses(); + const running = statuses.filter((s) => s.state === 'running').length; + const failed = statuses.filter((s) => s.state === 'failed').length; + logger.log('info', `${this.name}: startup complete — ${running} running, ${failed} failed`); + } + + public async stop(): Promise { + logger.log('info', `${this.name}: stopping all services`); + + // Stop in reverse startup order + const reversedLevels = [...this.startupOrder].reverse(); + + for (const level of reversedLevels) { + const runningInLevel = level.filter((name) => { + const svc = this.services.get(name); + return svc && svc.state !== 'stopped'; + }); + + if (runningInLevel.length === 0) continue; + + await Promise.allSettled( + runningInLevel.map(async (name) => { + const service = this.services.get(name)!; + try { + await Promise.race([ + service.stop(), + plugins.smartdelay.delayFor(this.options.shutdownTimeoutMs).then(() => { + throw new Error(`Timeout stopping service '${name}'`); + }), + ]); + } catch (err) { + logger.log('warn', `${this.name}: error stopping '${name}': ${err instanceof Error ? 
err.message : String(err)}`); + } + }), + ); + } + + // Clean up subscriptions + for (const sub of this.subscriptions) { + sub.unsubscribe(); + } + this.subscriptions = []; + + this._startedAt = undefined; + logger.log('info', `${this.name}: all services stopped`); + } + + // ── Querying ───────────────────────────────────────── + + public getService(name: string): Service | undefined { + return this.services.get(name); + } + + public getServiceStatus(name: string): IServiceStatus | undefined { + return this.services.get(name)?.getStatus(); + } + + public getAllStatuses(): IServiceStatus[] { + return Array.from(this.services.values()).map((s) => s.getStatus()); + } + + public getHealth(): IServiceManagerHealth { + const statuses = this.getAllStatuses(); + let overall: TOverallHealth = 'healthy'; + + const hasCriticalDown = statuses.some( + (s) => s.criticality === 'critical' && s.state !== 'running' && s.state !== 'stopped', + ); + const hasAnyDown = statuses.some( + (s) => s.state !== 'running' && s.state !== 'stopped', + ); + + if (hasCriticalDown) { + overall = 'unhealthy'; + } else if (hasAnyDown) { + overall = 'degraded'; + } + + return { + overall, + services: statuses, + startedAt: this._startedAt, + uptime: this._startedAt ? Date.now() - this._startedAt : undefined, + }; + } + + // ── Runtime Operations ─────────────────────────────── + + public async restartService(name: string): Promise { + const service = this.services.get(name); + if (!service) { + throw new Error(`Service '${name}' not found`); + } + + // Find all transitive dependents + const dependents = this.getTransitiveDependents(name); + + // Stop dependents in reverse dependency order + const dependentLevels = this.getLevelsForServices(dependents).reverse(); + for (const level of dependentLevels) { + await Promise.allSettled( + level.map((depName) => { + const svc = this.services.get(depName); + return svc && svc.state !== 'stopped' ? 
svc.stop() : Promise.resolve(); + }), + ); + } + + // Stop the target service + await service.stop(); + + // Start the target service + await this.startServiceWithRetry(service); + + // Restart dependents in dependency order + const dependentLevelsForward = this.getLevelsForServices(dependents); + for (const level of dependentLevelsForward) { + await Promise.allSettled( + level.map(async (depName) => { + const svc = this.services.get(depName)!; + try { + await this.startServiceWithRetry(svc); + } catch (err) { + logger.log('warn', `${this.name}: failed to restart dependent '${depName}': ${err instanceof Error ? err.message : String(err)}`); + } + }), + ); + } + } + + // ── Internal helpers ───────────────────────────────── + + private async startServiceWithRetry(service: Service): Promise<void> { + const retryConfig = service.retryConfig || this.options.defaultRetry; + const maxRetries = retryConfig.maxRetries ?? 3; + const baseDelay = retryConfig.baseDelayMs ?? 1000; + const maxDelay = retryConfig.maxDelayMs ?? 30000; + const factor = retryConfig.backoffFactor ?? 2; + + let lastError: Error | undefined; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + await service.start(); + return; + } catch (err) { + lastError = err instanceof Error ? 
err : new Error(String(err)); + service.retryCount = attempt + 1; + + if (attempt < maxRetries) { + const delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay); + const jitter = 0.8 + Math.random() * 0.4; + const actualDelay = Math.floor(delay * jitter); + + service.eventSubject.next({ + type: 'retrying', + serviceName: service.name, + state: service.state, + timestamp: Date.now(), + error: lastError.message, + attempt: attempt + 1, + }); + + logger.log('info', `${this.name}: retrying '${service.name}' in ${actualDelay}ms (attempt ${attempt + 1}/${maxRetries})`); + await plugins.smartdelay.delayFor(actualDelay); + } + } + } + + throw lastError!; + } + + private async stopServices(names: string[]): Promise<void> { + await Promise.allSettled( + names.map(async (name) => { + const service = this.services.get(name); + if (service && service.state !== 'stopped') { + try { + await service.stop(); + } catch (err) { + logger.log('warn', `${this.name}: error stopping '${name}' during rollback: ${err instanceof Error ? err.message : String(err)}`); + } + } + }), + ); + } + + /** + * Topological sort using Kahn's algorithm. + * Returns levels of service names — services within a level can start in parallel. 
+ */ + private topologicalSort(): string[][] { + const names = new Set(this.services.keys()); + const inDegree = new Map<string, number>(); + const dependents = new Map<string, string[]>(); // dep -> services that depend on it + + // Initialize + for (const name of names) { + inDegree.set(name, 0); + dependents.set(name, []); + } + + // Build graph + for (const [name, service] of this.services) { + for (const dep of service.dependencies) { + if (!names.has(dep)) { + throw new Error(`Service '${name}' depends on '${dep}', which is not registered`); + } + inDegree.set(name, (inDegree.get(name) || 0) + 1); + dependents.get(dep)!.push(name); + } + } + + // Process level by level + const levels: string[][] = []; + const remaining = new Set(names); + + while (remaining.size > 0) { + const level: string[] = []; + + for (const name of remaining) { + if ((inDegree.get(name) || 0) === 0) { + level.push(name); + } + } + + if (level.length === 0) { + // Cycle detected + const cycleNodes = Array.from(remaining).join(', '); + throw new Error(`Circular dependency detected involving services: ${cycleNodes}`); + } + + // Remove this level's nodes and update in-degrees + for (const name of level) { + remaining.delete(name); + for (const dependent of dependents.get(name) || []) { + inDegree.set(dependent, (inDegree.get(dependent) || 0) - 1); + } + } + + levels.push(level); + } + + return levels; + } + + /** + * Get all services that transitively depend on the given service. + */ + private getTransitiveDependents(name: string): string[] { + const result = new Set<string>(); + const queue = [name]; + + while (queue.length > 0) { + const current = queue.shift()!; + for (const [svcName, svc] of this.services) { + if (svc.dependencies.includes(current) && !result.has(svcName)) { + result.add(svcName); + queue.push(svcName); + } + } + } + + return Array.from(result); + } + + /** + * Organize a set of service names into dependency-ordered levels. 
+ */ + private getLevelsForServices(names: string[]): string[][] { + if (names.length === 0) return []; + + const nameSet = new Set(names); + const levels: string[][] = []; + + // Simple approach: filter the global startup order to include only the given names + for (const level of this.startupOrder) { + const filtered = level.filter((n) => nameSet.has(n)); + if (filtered.length > 0) { + levels.push(filtered); + } + } + + return levels; + } +} diff --git a/ts/taskbuffer.interfaces.ts b/ts/taskbuffer.interfaces.ts index a863d8b..9677829 100644 --- a/ts/taskbuffer.interfaces.ts +++ b/ts/taskbuffer.interfaces.ts @@ -79,4 +79,101 @@ export interface ITaskEvent { timestamp: number; stepName?: string; // present when type === 'step' error?: string; // present when type === 'failed' +} + +// ── Service Lifecycle Types ────────────────────────────────────── + +export type TServiceState = + | 'stopped' + | 'starting' + | 'running' + | 'degraded' + | 'failed' + | 'stopping'; + +export type TServiceCriticality = 'critical' | 'optional'; + +export type TServiceEventType = + | 'started' + | 'stopped' + | 'failed' + | 'degraded' + | 'recovered' + | 'retrying' + | 'healthCheck'; + +export interface IServiceEvent { + type: TServiceEventType; + serviceName: string; + state: TServiceState; + timestamp: number; + error?: string; + attempt?: number; +} + +export interface IServiceStatus { + name: string; + state: TServiceState; + criticality: TServiceCriticality; + startedAt?: number; + stoppedAt?: number; + lastHealthCheck?: number; + healthCheckOk?: boolean; + uptime?: number; + errorCount: number; + lastError?: string; + retryCount: number; + dependencies: string[]; +} + +export interface IRetryConfig { + /** Maximum retry attempts. 0 = no retries. Default: 3 */ + maxRetries?: number; + /** Base delay in ms. Default: 1000 */ + baseDelayMs?: number; + /** Maximum delay cap in ms. Default: 30000 */ + maxDelayMs?: number; + /** Multiplier per attempt. 
Default: 2 */ + backoffFactor?: number; +} + +export interface IHealthCheckConfig { + /** Interval in ms between health checks. Default: 30000 */ + intervalMs?: number; + /** Timeout for a single health check call. Default: 5000 */ + timeoutMs?: number; + /** Consecutive failures before marking degraded. Default: 3 */ + failuresBeforeDegraded?: number; + /** Consecutive failures before marking failed. Default: 5 */ + failuresBeforeFailed?: number; +} + +export interface IServiceOptions { + name: string; + start: () => Promise<any>; + stop: () => Promise<void>; + healthCheck?: () => Promise<boolean>; + criticality?: TServiceCriticality; + dependencies?: string[]; + retry?: IRetryConfig; + healthCheckConfig?: IHealthCheckConfig; +} + +export interface IServiceManagerOptions { + name?: string; + defaultRetry?: IRetryConfig; + defaultHealthCheck?: IHealthCheckConfig; + /** Timeout in ms for the entire startup sequence. Default: 120000 */ + startupTimeoutMs?: number; + /** Timeout in ms for stopping each individual service during shutdown. Default: 30000 */ + shutdownTimeoutMs?: number; +} + +export type TOverallHealth = 'healthy' | 'degraded' | 'unhealthy'; + +export interface IServiceManagerHealth { + overall: TOverallHealth; + services: IServiceStatus[]; + startedAt?: number; + uptime?: number; } \ No newline at end of file