fix(lifecycle): clean up service subscriptions, proxy retries, and stale runtime state on shutdown

This commit is contained in:
2026-03-21 22:30:30 +00:00
parent c7fe7aeb50
commit 39ff159bf7
10 changed files with 115 additions and 27 deletions

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@serve.zone/dcrouter',
version: '11.9.0',
version: '11.9.1',
description: 'A multifaceted routing service handling mail and SMS delivery functions.'
}

View File

@@ -61,14 +61,21 @@ export class CertProvisionScheduler {
}
/**
* Check if a domain is currently in backoff
* Check if a domain is currently in backoff.
* Expired entries are pruned from the cache to prevent unbounded growth.
*/
async isInBackoff(domain: string): Promise<boolean> {
const entry = await this.loadBackoff(domain);
if (!entry) return false;
const retryAfter = new Date(entry.retryAfter);
return retryAfter.getTime() > Date.now();
if (retryAfter.getTime() > Date.now()) {
return true;
}
// Backoff has expired — prune the stale entry
this.backoffCache.delete(domain);
return false;
}
/**
@@ -124,9 +131,12 @@ export class CertProvisionScheduler {
const entry = await this.loadBackoff(domain);
if (!entry) return null;
// Only return if still in backoff
// Only return if still in backoff — prune expired entries
const retryAfter = new Date(entry.retryAfter);
if (retryAfter.getTime() <= Date.now()) return null;
if (retryAfter.getTime() <= Date.now()) {
this.backoffCache.delete(domain);
return null;
}
return {
failures: entry.failures,

View File

@@ -254,6 +254,7 @@ export class DcRouter {
// Service lifecycle management
public serviceManager: plugins.taskbuffer.ServiceManager;
private serviceSubjectSubscription?: plugins.smartrx.rxjs.Subscription;
public smartAcmeReady = false;
// TypedRouter for API endpoints
@@ -516,7 +517,7 @@ export class DcRouter {
}
// Wire up aggregated events for logging
this.serviceManager.serviceSubject.subscribe((event) => {
this.serviceSubjectSubscription = this.serviceManager.serviceSubject.subscribe((event) => {
const level = event.type === 'failed' ? 'error' : event.type === 'retrying' ? 'warn' : 'info';
logger.log(level as any, `Service '${event.serviceName}': ${event.type}`, {
state: event.state,
@@ -639,6 +640,13 @@ export class DcRouter {
*/
private async setupSmartProxy(): Promise<void> {
logger.log('info', 'Setting up SmartProxy...');
// Clean up any existing SmartProxy instance (e.g. from a retry)
if (this.smartProxy) {
this.smartProxy.removeAllListeners();
this.smartProxy = undefined;
}
let routes: plugins.smartproxy.IRouteConfig[] = [];
let acmeConfig: plugins.smartproxy.IAcmeOptions | undefined;
@@ -1126,6 +1134,12 @@ export class DcRouter {
public async stop() {
logger.log('info', 'Stopping DcRouter services...');
// Unsubscribe from service events before stopping services
if (this.serviceSubjectSubscription) {
this.serviceSubjectSubscription.unsubscribe();
this.serviceSubjectSubscription = undefined;
}
// ServiceManager handles reverse-dependency-ordered shutdown
await this.serviceManager.stop();

View File

@@ -35,6 +35,6 @@ export const runCli = async () => {
await dcRouter.stop();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.once('SIGINT', shutdown);
process.once('SIGTERM', shutdown);
};

View File

@@ -92,6 +92,8 @@ export interface IAccountingManagerConfig {
detailedLogging?: boolean;
/** Maximum active sessions to track in memory */
maxActiveSessions?: number;
/** Stale session timeout in hours — sessions with no update for this long are evicted (default: 24) */
staleSessionTimeoutHours?: number;
}
/**
@@ -105,6 +107,7 @@ export class AccountingManager {
private activeSessions: Map<string, IAccountingSession> = new Map();
private config: Required<IAccountingManagerConfig>;
private storageManager?: StorageManager;
private staleSessionSweepTimer?: ReturnType<typeof setInterval>;
// Counters for statistics
private stats = {
@@ -121,6 +124,7 @@ export class AccountingManager {
retentionDays: config?.retentionDays ?? 30,
detailedLogging: config?.detailedLogging ?? false,
maxActiveSessions: config?.maxActiveSessions ?? 10000,
staleSessionTimeoutHours: config?.staleSessionTimeoutHours ?? 24,
};
this.storageManager = storageManager;
}
@@ -132,9 +136,60 @@ export class AccountingManager {
if (this.storageManager) {
await this.loadActiveSessions();
}
// Start periodic sweep to evict stale sessions (every 15 minutes)
this.staleSessionSweepTimer = setInterval(() => {
this.sweepStaleSessions();
}, 15 * 60 * 1000);
// Allow the process to exit even if the timer is pending
if (this.staleSessionSweepTimer.unref) {
this.staleSessionSweepTimer.unref();
}
logger.log('info', `AccountingManager initialized with ${this.activeSessions.size} active sessions`);
}
/**
* Stop the accounting manager and clean up timers
*/
stop(): void {
if (this.staleSessionSweepTimer) {
clearInterval(this.staleSessionSweepTimer);
this.staleSessionSweepTimer = undefined;
}
}
/**
* Sweep stale active sessions that have not received any update
* within the configured timeout. These are orphaned sessions where
* the Stop packet was never received.
*/
private sweepStaleSessions(): void {
const timeoutMs = this.config.staleSessionTimeoutHours * 60 * 60 * 1000;
const cutoff = Date.now() - timeoutMs;
let swept = 0;
for (const [sessionId, session] of this.activeSessions) {
if (session.lastUpdateTime < cutoff) {
session.status = 'terminated';
session.terminateCause = 'StaleSessionTimeout';
session.endTime = Date.now();
session.sessionTime = Math.floor((session.endTime - session.startTime) / 1000);
if (this.storageManager) {
this.archiveSession(session).catch(() => {});
}
this.activeSessions.delete(sessionId);
swept++;
}
}
if (swept > 0) {
logger.log('info', `Swept ${swept} stale RADIUS sessions (no update for ${this.config.staleSessionTimeoutHours}h)`);
}
}
/**
* Handle accounting start request
*/

View File

@@ -183,6 +183,8 @@ export class RadiusServer {
this.radiusServer = undefined;
}
this.accountingManager.stop();
this.running = false;
logger.log('info', 'RADIUS server stopped');
}