406 lines
13 KiB
TypeScript
406 lines
13 KiB
TypeScript
|
|
import * as plugins from './taskbuffer.plugins.js';
|
||
|
|
import { logger } from './taskbuffer.logging.js';
|
||
|
|
import { Service } from './taskbuffer.classes.service.js';
|
||
|
|
import type {
|
||
|
|
IServiceEvent,
|
||
|
|
IServiceStatus,
|
||
|
|
IServiceManagerOptions,
|
||
|
|
IServiceManagerHealth,
|
||
|
|
IServiceOptions,
|
||
|
|
IRetryConfig,
|
||
|
|
TOverallHealth,
|
||
|
|
} from './taskbuffer.interfaces.js';
|
||
|
|
|
||
|
|
/**
 * ServiceManager orchestrates multiple Service instances with:
 * - Dependency-ordered startup (topological sort, level-by-level parallel)
 * - Failure isolation (optional services don't crash the system)
 * - Retry with exponential backoff and jitter for failed service starts
 * - Reverse-dependency-ordered shutdown and startup rollback
 * - Aggregated health status
 *
 * Events emitted by every registered service are forwarded to `serviceSubject`
 * for as long as the service is registered and the manager has not been stopped.
 *
 * NOTE(review): `startupTimeoutMs` is accepted and stored in the options, but
 * nothing in this class enforces it — confirm whether that is intentional.
 */
export class ServiceManager {
  public readonly name: string;

  /** Aggregated stream of the events emitted by all registered services. */
  public readonly serviceSubject = new plugins.smartrx.rxjs.Subject<IServiceEvent>();

  private services = new Map<string, Service>();
  private startupOrder: string[][] = []; // levels of service names
  private options: Required<IServiceManagerOptions>;
  private _startedAt: number | undefined;

  // Keyed by service name so a single service's forwarding can be torn down on removal.
  private subscriptions = new Map<string, plugins.smartrx.rxjs.Subscription>();

  /**
   * @param options Optional manager configuration; every missing field falls
   *   back to the default shown in the body.
   */
  constructor(options?: IServiceManagerOptions) {
    this.name = options?.name || 'ServiceManager';
    this.options = {
      name: this.name,
      defaultRetry: options?.defaultRetry || { maxRetries: 3, baseDelayMs: 1000, maxDelayMs: 30000, backoffFactor: 2 },
      defaultHealthCheck: options?.defaultHealthCheck || {},
      startupTimeoutMs: options?.startupTimeoutMs ?? 120000,
      shutdownTimeoutMs: options?.shutdownTimeoutMs ?? 30000,
    };
  }

  // ── Service Registration ─────────────────────────────

  /**
   * Registers a service and starts forwarding its events to `serviceSubject`.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addService(service: Service): void {
    if (this.services.has(service.name)) {
      throw new Error(`Service '${service.name}' is already registered`);
    }
    this.services.set(service.name, service);
    this.forwardEvents(service);
  }

  /**
   * Creates a Service from the given options, registers it, and returns it.
   *
   * @throws Error if a service with the same name is already registered.
   */
  public addServiceFromOptions<T>(options: IServiceOptions<T>): Service<T> {
    const service = new Service<T>(options);
    this.addService(service);
    return service;
  }

  /**
   * Unregisters a service and stops forwarding its events.
   * The service itself is not stopped. Unknown names are ignored.
   *
   * @throws Error if another registered service lists `name` as a dependency.
   */
  public removeService(name: string): void {
    // Check no other service depends on this one
    for (const [svcName, svc] of this.services) {
      if (svc.dependencies.includes(name)) {
        throw new Error(`Cannot remove service '${name}': service '${svcName}' depends on it`);
      }
    }

    // Drop the forwarding subscription as well; otherwise it leaks and the
    // removed service keeps publishing into serviceSubject.
    this.subscriptions.get(name)?.unsubscribe();
    this.subscriptions.delete(name);
    this.services.delete(name);
  }

  // ── Lifecycle ────────────────────────────────────────

  /**
   * Starts all registered services level by level in dependency order.
   * Services within one level start in parallel. A failing optional service
   * is logged and skipped; a failing critical service aborts startup, rolls
   * back the services started so far (dependents before their dependencies)
   * and rejects.
   *
   * @throws Error on a missing or circular dependency, or when a critical
   *   service fails to start.
   */
  public async start(): Promise<void> {
    // Build startup order via topological sort
    this.startupOrder = this.topologicalSort();
    this._startedAt = Date.now();

    // stop() tears down event forwarding; re-attach it so that a manager that
    // is started again keeps publishing to serviceSubject.
    for (const service of this.services.values()) {
      this.forwardEvents(service);
    }

    const startedServices = new Set<string>();

    logger.log('info', `${this.name}: starting ${this.services.size} services in ${this.startupOrder.length} levels`);

    for (const [levelIdx, level] of this.startupOrder.entries()) {
      logger.log('info', `${this.name}: starting level ${levelIdx}: [${level.join(', ')}]`);

      const results = await Promise.allSettled(
        level.map(async (name) => {
          const service = this.services.get(name)!;
          try {
            await this.startServiceWithRetry(service);
            startedServices.add(name);
          } catch (err) {
            if (service.criticality === 'critical') {
              throw err;
            }
            // Optional service — log and continue
            logger.log('warn', `${this.name}: optional service '${name}' failed to start: ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );

      // Check if any critical service failed
      for (const [i, result] of results.entries()) {
        if (result.status !== 'rejected') continue;

        const name = level[i];
        const service = this.services.get(name);
        if (!service || service.criticality !== 'critical') continue;

        logger.log('error', `${this.name}: critical service '${name}' failed, aborting startup`);

        // Rollback: stop what was started, one level at a time in reverse, so
        // no dependency is stopped while one of its dependents is still up.
        const rollbackLevels = this.startupOrder
          .slice(0, levelIdx + 1)
          .map((names) => names.filter((n) => startedServices.has(n)))
          .filter((names) => names.length > 0)
          .reverse();
        for (const names of rollbackLevels) {
          await this.stopServices(names);
        }

        // The manager never came up, so it must not report an uptime.
        this._startedAt = undefined;

        throw new Error(`Critical service '${name}' failed to start: ${result.reason instanceof Error ? result.reason.message : String(result.reason)}`);
      }
    }

    const statuses = this.getAllStatuses();
    const running = statuses.filter((s) => s.state === 'running').length;
    const failed = statuses.filter((s) => s.state === 'failed').length;
    logger.log('info', `${this.name}: startup complete — ${running} running, ${failed} failed`);
  }

  /**
   * Stops all services in reverse startup order, level by level. Each stop is
   * bounded by `shutdownTimeoutMs`; errors and timeouts are logged, never
   * thrown. Afterwards event forwarding is torn down and the start time reset.
   */
  public async stop(): Promise<void> {
    logger.log('info', `${this.name}: stopping all services`);

    // Stop in reverse startup order
    const reversedLevels = [...this.startupOrder].reverse();

    for (const level of reversedLevels) {
      const runningInLevel = level.filter((name) => {
        const svc = this.services.get(name);
        return svc !== undefined && svc.state !== 'stopped';
      });

      if (runningInLevel.length === 0) continue;

      await Promise.allSettled(
        runningInLevel.map(async (name) => {
          const service = this.services.get(name)!;
          try {
            await this.stopServiceWithTimeout(service);
          } catch (err) {
            logger.log('warn', `${this.name}: error stopping '${name}': ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );
    }

    // Clean up subscriptions
    for (const sub of this.subscriptions.values()) {
      sub.unsubscribe();
    }
    this.subscriptions.clear();

    this._startedAt = undefined;
    logger.log('info', `${this.name}: all services stopped`);
  }

  // ── Querying ─────────────────────────────────────────

  /** Returns the registered service with the given name, if any. */
  public getService(name: string): Service | undefined {
    return this.services.get(name);
  }

  /** Returns the status of the named service, or undefined if it is not registered. */
  public getServiceStatus(name: string): IServiceStatus | undefined {
    return this.services.get(name)?.getStatus();
  }

  /** Returns the status of every registered service in registration order. */
  public getAllStatuses(): IServiceStatus[] {
    return Array.from(this.services.values()).map((s) => s.getStatus());
  }

  /**
   * Aggregates per-service status into an overall health verdict:
   * 'unhealthy' if a critical service is neither running nor stopped,
   * 'degraded' if any other service is in that condition, else 'healthy'.
   */
  public getHealth(): IServiceManagerHealth {
    const statuses = this.getAllStatuses();
    let overall: TOverallHealth = 'healthy';

    const hasCriticalDown = statuses.some(
      (s) => s.criticality === 'critical' && s.state !== 'running' && s.state !== 'stopped',
    );
    const hasAnyDown = statuses.some(
      (s) => s.state !== 'running' && s.state !== 'stopped',
    );

    if (hasCriticalDown) {
      overall = 'unhealthy';
    } else if (hasAnyDown) {
      overall = 'degraded';
    }

    return {
      overall,
      services: statuses,
      startedAt: this._startedAt,
      // Explicit undefined check rather than truthiness on a number.
      uptime: this._startedAt !== undefined ? Date.now() - this._startedAt : undefined,
    };
  }

  // ── Runtime Operations ───────────────────────────────

  /**
   * Restarts a service together with everything that transitively depends on
   * it: dependents are stopped (dependents first), the target is stopped and
   * started again, then the dependents are started in dependency order.
   * Failures of dependents are logged; a failure of the target rejects.
   *
   * @throws Error if the service is not registered or fails to stop/start.
   */
  public async restartService(name: string): Promise<void> {
    const service = this.services.get(name);
    if (!service) {
      throw new Error(`Service '${name}' not found`);
    }

    // Find all transitive dependents, grouped in dependency order
    const dependents = this.getTransitiveDependents(name);
    const dependentLevels = this.getLevelsForServices(dependents);

    // Stop dependents in reverse dependency order
    for (const level of [...dependentLevels].reverse()) {
      const stopResults = await Promise.allSettled(
        level.map((depName) => {
          const svc = this.services.get(depName);
          return svc && svc.state !== 'stopped' ? svc.stop() : Promise.resolve();
        }),
      );
      for (const [i, result] of stopResults.entries()) {
        if (result.status === 'rejected') {
          logger.log('warn', `${this.name}: error stopping dependent '${level[i]}': ${result.reason instanceof Error ? result.reason.message : String(result.reason)}`);
        }
      }
    }

    // Stop the target service
    await service.stop();

    // Start the target service
    await this.startServiceWithRetry(service);

    // Restart dependents in dependency order
    for (const level of dependentLevels) {
      await Promise.allSettled(
        level.map(async (depName) => {
          const svc = this.services.get(depName)!;
          try {
            await this.startServiceWithRetry(svc);
          } catch (err) {
            logger.log('warn', `${this.name}: failed to restart dependent '${depName}': ${err instanceof Error ? err.message : String(err)}`);
          }
        }),
      );
    }
  }

  // ── Internal helpers ─────────────────────────────────

  /** Forwards a service's events to the aggregated subject (at most once per service name). */
  private forwardEvents(service: Service): void {
    if (this.subscriptions.has(service.name)) return;
    const subscription = service.eventSubject.subscribe((event) => {
      this.serviceSubject.next(event);
    });
    this.subscriptions.set(service.name, subscription);
  }

  /**
   * Starts a service, retrying with exponential backoff (±20% jitter) using the
   * service's own retry config or the manager default. Emits a 'retrying'
   * event on the service before each wait.
   *
   * @throws The last start error once all attempts are exhausted.
   */
  private async startServiceWithRetry(service: Service): Promise<void> {
    const retryConfig = service.retryConfig || this.options.defaultRetry;
    const configuredRetries = retryConfig.maxRetries ?? 3;
    // A negative or NaN value would otherwise skip the loop entirely and end
    // in `throw undefined`; always make at least the initial attempt.
    const maxRetries = configuredRetries >= 0 ? configuredRetries : 0;
    const baseDelay = retryConfig.baseDelayMs ?? 1000;
    const maxDelay = retryConfig.maxDelayMs ?? 30000;
    const factor = retryConfig.backoffFactor ?? 2;

    let lastError: Error | undefined;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        await service.start();
        return;
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));
        service.retryCount = attempt + 1;

        if (attempt < maxRetries) {
          const delay = Math.min(baseDelay * Math.pow(factor, attempt), maxDelay);
          const jitter = 0.8 + Math.random() * 0.4;
          const actualDelay = Math.floor(delay * jitter);

          service.eventSubject.next({
            type: 'retrying',
            serviceName: service.name,
            state: service.state,
            timestamp: Date.now(),
            error: lastError.message,
            attempt: attempt + 1,
          });

          logger.log('info', `${this.name}: retrying '${service.name}' in ${actualDelay}ms (attempt ${attempt + 1}/${maxRetries})`);
          await plugins.smartdelay.delayFor(actualDelay);
        }
      }
    }

    throw lastError ?? new Error(`Service '${service.name}' failed to start`);
  }

  /**
   * Stops one service, rejecting if it takes longer than `shutdownTimeoutMs`.
   * The timer is always cleared so a fast stop leaves no pending timeout
   * behind that would keep the process alive.
   */
  private async stopServiceWithTimeout(service: Service): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => {
        reject(new Error(`Timeout stopping service '${service.name}'`));
      }, this.options.shutdownTimeoutMs);
    });

    try {
      await Promise.race([service.stop(), timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Stops the named services in parallel (used for startup rollback, one
   * level per call). Errors and timeouts are logged, never thrown.
   */
  private async stopServices(names: string[]): Promise<void> {
    await Promise.allSettled(
      names.map(async (name) => {
        const service = this.services.get(name);
        if (service && service.state !== 'stopped') {
          try {
            await this.stopServiceWithTimeout(service);
          } catch (err) {
            logger.log('warn', `${this.name}: error stopping '${name}' during rollback: ${err instanceof Error ? err.message : String(err)}`);
          }
        }
      }),
    );
  }

  /**
   * Topological sort using Kahn's algorithm.
   * Returns levels of service names — services within a level can start in parallel.
   *
   * @throws Error if a dependency is not registered or the graph has a cycle.
   */
  private topologicalSort(): string[][] {
    const names = new Set(this.services.keys());
    const inDegree = new Map<string, number>();
    const dependents = new Map<string, string[]>(); // dep -> services that depend on it

    // Initialize
    for (const name of names) {
      inDegree.set(name, 0);
      dependents.set(name, []);
    }

    // Build graph
    for (const [name, service] of this.services) {
      for (const dep of service.dependencies) {
        if (!names.has(dep)) {
          throw new Error(`Service '${name}' depends on '${dep}', which is not registered`);
        }
        inDegree.set(name, (inDegree.get(name) || 0) + 1);
        dependents.get(dep)!.push(name);
      }
    }

    // Process level by level
    const levels: string[][] = [];
    const remaining = new Set(names);

    while (remaining.size > 0) {
      const level = Array.from(remaining).filter((name) => (inDegree.get(name) || 0) === 0);

      if (level.length === 0) {
        // Cycle detected
        const cycleNodes = Array.from(remaining).join(', ');
        throw new Error(`Circular dependency detected involving services: ${cycleNodes}`);
      }

      // Remove this level's nodes and update in-degrees
      for (const name of level) {
        remaining.delete(name);
        for (const dependent of dependents.get(name) || []) {
          inDegree.set(dependent, (inDegree.get(dependent) || 0) - 1);
        }
      }

      levels.push(level);
    }

    return levels;
  }

  /**
   * Get all services that transitively depend on the given service.
   */
  private getTransitiveDependents(name: string): string[] {
    const result = new Set<string>();
    const queue = [name];

    while (queue.length > 0) {
      const current = queue.shift()!;
      for (const [svcName, svc] of this.services) {
        if (svc.dependencies.includes(current) && !result.has(svcName)) {
          result.add(svcName);
          queue.push(svcName);
        }
      }
    }

    return Array.from(result);
  }

  /**
   * Organize a set of service names into dependency-ordered levels.
   *
   * Uses the order computed by the last start(). If that order does not cover
   * every requested name (start() never ran, or services were added since),
   * a fresh topological sort is used instead so no service is silently skipped.
   */
  private getLevelsForServices(names: string[]): string[][] {
    if (names.length === 0) return [];

    const wanted = new Set(names);
    const pick = (order: string[][]): string[][] =>
      order
        .map((level) => level.filter((n) => wanted.has(n)))
        .filter((level) => level.length > 0);

    const fromCache = pick(this.startupOrder);
    const covered = fromCache.reduce((count, level) => count + level.length, 0);
    if (covered === wanted.size) {
      return fromCache;
    }

    return pick(this.topologicalSort());
  }
}
|