feat(performance): Add async utility functions and filesystem utilities

- Implemented async utilities including delay, retryWithBackoff, withTimeout, parallelLimit, debounceAsync, AsyncMutex, and CircuitBreaker.
- Created tests for async utilities to ensure functionality and reliability.
- Developed AsyncFileSystem class with methods for file and directory operations, including ensureDir, readFile, writeFile, remove, and more.
- Added tests for filesystem utilities to validate file operations and error handling.
This commit is contained in:
2025-05-31 17:45:40 +00:00
parent 02603c3b07
commit 7b81186bb3
12 changed files with 1437 additions and 292 deletions

View File

@ -0,0 +1,275 @@
/**
* Async utility functions for SmartProxy
* Provides non-blocking alternatives to synchronous operations
*/
/**
* Delays execution for the specified number of milliseconds
* Non-blocking alternative to busy wait loops
* @param ms - Number of milliseconds to delay
* @returns Promise that resolves after the delay
*/
export async function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Retry an async operation with exponential backoff
* @param fn - The async function to retry
* @param options - Retry options
* @returns The result of the function or throws the last error
*/
export async function retryWithBackoff<T>(
fn: () => Promise<T>,
options: {
maxAttempts?: number;
initialDelay?: number;
maxDelay?: number;
factor?: number;
onRetry?: (attempt: number, error: Error) => void;
} = {}
): Promise<T> {
const {
maxAttempts = 3,
initialDelay = 100,
maxDelay = 10000,
factor = 2,
onRetry
} = options;
let lastError: Error | null = null;
let currentDelay = initialDelay;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (error: any) {
lastError = error;
if (attempt === maxAttempts) {
throw error;
}
if (onRetry) {
onRetry(attempt, error);
}
await delay(currentDelay);
currentDelay = Math.min(currentDelay * factor, maxDelay);
}
}
throw lastError || new Error('Retry failed');
}
/**
* Execute an async operation with a timeout
* @param fn - The async function to execute
* @param timeoutMs - Timeout in milliseconds
* @param timeoutError - Optional custom timeout error
* @returns The result of the function or throws timeout error
*/
export async function withTimeout<T>(
fn: () => Promise<T>,
timeoutMs: number,
timeoutError?: Error
): Promise<T> {
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => {
reject(timeoutError || new Error(`Operation timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
return Promise.race([fn(), timeoutPromise]);
}
/**
* Run multiple async operations in parallel with a concurrency limit
* @param items - Array of items to process
* @param fn - Async function to run for each item
* @param concurrency - Maximum number of concurrent operations
* @returns Array of results in the same order as input
*/
export async function parallelLimit<T, R>(
items: T[],
fn: (item: T, index: number) => Promise<R>,
concurrency: number
): Promise<R[]> {
const results: R[] = new Array(items.length);
const executing: Set<Promise<void>> = new Set();
for (let i = 0; i < items.length; i++) {
const promise = fn(items[i], i).then(result => {
results[i] = result;
executing.delete(promise);
});
executing.add(promise);
if (executing.size >= concurrency) {
await Promise.race(executing);
}
}
await Promise.all(executing);
return results;
}
/**
* Debounce an async function
* @param fn - The async function to debounce
* @param delayMs - Delay in milliseconds
* @returns Debounced function with cancel method
*/
export function debounceAsync<T extends (...args: any[]) => Promise<any>>(
fn: T,
delayMs: number
): T & { cancel: () => void } {
let timeoutId: NodeJS.Timeout | null = null;
let lastPromise: Promise<any> | null = null;
const debounced = ((...args: Parameters<T>) => {
if (timeoutId) {
clearTimeout(timeoutId);
}
lastPromise = new Promise((resolve, reject) => {
timeoutId = setTimeout(async () => {
timeoutId = null;
try {
const result = await fn(...args);
resolve(result);
} catch (error) {
reject(error);
}
}, delayMs);
});
return lastPromise;
}) as any;
debounced.cancel = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
};
return debounced as T & { cancel: () => void };
}
/**
* Create a mutex for ensuring exclusive access to a resource
*/
export class AsyncMutex {
private queue: Array<() => void> = [];
private locked = false;
async acquire(): Promise<() => void> {
if (!this.locked) {
this.locked = true;
return () => this.release();
}
return new Promise<() => void>(resolve => {
this.queue.push(() => {
resolve(() => this.release());
});
});
}
private release(): void {
const next = this.queue.shift();
if (next) {
next();
} else {
this.locked = false;
}
}
async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
const release = await this.acquire();
try {
return await fn();
} finally {
release();
}
}
}
/**
* Circuit breaker for protecting against cascading failures
*/
export class CircuitBreaker {
private failureCount = 0;
private lastFailureTime = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private options: {
failureThreshold: number;
resetTimeout: number;
onStateChange?: (state: 'closed' | 'open' | 'half-open') => void;
}
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (Date.now() - this.lastFailureTime > this.options.resetTimeout) {
this.setState('half-open');
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
this.failureCount = 0;
if (this.state !== 'closed') {
this.setState('closed');
}
}
private onFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.options.failureThreshold) {
this.setState('open');
}
}
private setState(state: 'closed' | 'open' | 'half-open'): void {
if (this.state !== state) {
this.state = state;
if (this.options.onStateChange) {
this.options.onStateChange(state);
}
}
}
isOpen(): boolean {
return this.state === 'open';
}
getState(): 'closed' | 'open' | 'half-open' {
return this.state;
}
recordSuccess(): void {
this.onSuccess();
}
recordFailure(): void {
this.onFailure();
}
}

270
ts/core/utils/fs-utils.ts Normal file
View File

@ -0,0 +1,270 @@
/**
* Async filesystem utilities for SmartProxy
* Provides non-blocking alternatives to synchronous filesystem operations
*/
import * as plugins from '../../plugins.js';
export class AsyncFileSystem {
/**
* Check if a file or directory exists
* @param path - Path to check
* @returns Promise resolving to true if exists, false otherwise
*/
static async exists(path: string): Promise<boolean> {
try {
await plugins.fs.promises.access(path);
return true;
} catch {
return false;
}
}
/**
* Ensure a directory exists, creating it if necessary
* @param dirPath - Directory path to ensure
* @returns Promise that resolves when directory is ensured
*/
static async ensureDir(dirPath: string): Promise<void> {
await plugins.fs.promises.mkdir(dirPath, { recursive: true });
}
/**
* Read a file as string
* @param filePath - Path to the file
* @param encoding - File encoding (default: utf8)
* @returns Promise resolving to file contents
*/
static async readFile(filePath: string, encoding: BufferEncoding = 'utf8'): Promise<string> {
return plugins.fs.promises.readFile(filePath, encoding);
}
/**
* Read a file as buffer
* @param filePath - Path to the file
* @returns Promise resolving to file buffer
*/
static async readFileBuffer(filePath: string): Promise<Buffer> {
return plugins.fs.promises.readFile(filePath);
}
/**
* Write string data to a file
* @param filePath - Path to the file
* @param data - String data to write
* @param encoding - File encoding (default: utf8)
* @returns Promise that resolves when file is written
*/
static async writeFile(filePath: string, data: string, encoding: BufferEncoding = 'utf8'): Promise<void> {
// Ensure directory exists
const dir = plugins.path.dirname(filePath);
await this.ensureDir(dir);
await plugins.fs.promises.writeFile(filePath, data, encoding);
}
/**
* Write buffer data to a file
* @param filePath - Path to the file
* @param data - Buffer data to write
* @returns Promise that resolves when file is written
*/
static async writeFileBuffer(filePath: string, data: Buffer): Promise<void> {
const dir = plugins.path.dirname(filePath);
await this.ensureDir(dir);
await plugins.fs.promises.writeFile(filePath, data);
}
/**
* Remove a file
* @param filePath - Path to the file
* @returns Promise that resolves when file is removed
*/
static async remove(filePath: string): Promise<void> {
try {
await plugins.fs.promises.unlink(filePath);
} catch (error: any) {
if (error.code !== 'ENOENT') {
throw error;
}
// File doesn't exist, which is fine
}
}
/**
* Remove a directory and all its contents
* @param dirPath - Path to the directory
* @returns Promise that resolves when directory is removed
*/
static async removeDir(dirPath: string): Promise<void> {
try {
await plugins.fs.promises.rm(dirPath, { recursive: true, force: true });
} catch (error: any) {
if (error.code !== 'ENOENT') {
throw error;
}
}
}
/**
* Read JSON from a file
* @param filePath - Path to the JSON file
* @returns Promise resolving to parsed JSON
*/
static async readJSON<T = any>(filePath: string): Promise<T> {
const content = await this.readFile(filePath);
return JSON.parse(content);
}
/**
* Write JSON to a file
* @param filePath - Path to the file
* @param data - Data to write as JSON
* @param pretty - Whether to pretty-print JSON (default: true)
* @returns Promise that resolves when file is written
*/
static async writeJSON(filePath: string, data: any, pretty = true): Promise<void> {
const jsonString = pretty ? JSON.stringify(data, null, 2) : JSON.stringify(data);
await this.writeFile(filePath, jsonString);
}
/**
* Copy a file from source to destination
* @param source - Source file path
* @param destination - Destination file path
* @returns Promise that resolves when file is copied
*/
static async copyFile(source: string, destination: string): Promise<void> {
const destDir = plugins.path.dirname(destination);
await this.ensureDir(destDir);
await plugins.fs.promises.copyFile(source, destination);
}
/**
* Move/rename a file
* @param source - Source file path
* @param destination - Destination file path
* @returns Promise that resolves when file is moved
*/
static async moveFile(source: string, destination: string): Promise<void> {
const destDir = plugins.path.dirname(destination);
await this.ensureDir(destDir);
await plugins.fs.promises.rename(source, destination);
}
/**
* Get file stats
* @param filePath - Path to the file
* @returns Promise resolving to file stats or null if doesn't exist
*/
static async getStats(filePath: string): Promise<plugins.fs.Stats | null> {
try {
return await plugins.fs.promises.stat(filePath);
} catch (error: any) {
if (error.code === 'ENOENT') {
return null;
}
throw error;
}
}
/**
* List files in a directory
* @param dirPath - Directory path
* @returns Promise resolving to array of filenames
*/
static async listFiles(dirPath: string): Promise<string[]> {
try {
return await plugins.fs.promises.readdir(dirPath);
} catch (error: any) {
if (error.code === 'ENOENT') {
return [];
}
throw error;
}
}
/**
* List files in a directory with full paths
* @param dirPath - Directory path
* @returns Promise resolving to array of full file paths
*/
static async listFilesFullPath(dirPath: string): Promise<string[]> {
const files = await this.listFiles(dirPath);
return files.map(file => plugins.path.join(dirPath, file));
}
/**
* Recursively list all files in a directory
* @param dirPath - Directory path
* @param fileList - Accumulator for file list (used internally)
* @returns Promise resolving to array of all file paths
*/
static async listFilesRecursive(dirPath: string, fileList: string[] = []): Promise<string[]> {
const files = await this.listFiles(dirPath);
for (const file of files) {
const filePath = plugins.path.join(dirPath, file);
const stats = await this.getStats(filePath);
if (stats?.isDirectory()) {
await this.listFilesRecursive(filePath, fileList);
} else if (stats?.isFile()) {
fileList.push(filePath);
}
}
return fileList;
}
/**
* Create a read stream for a file
* @param filePath - Path to the file
* @param options - Stream options
* @returns Read stream
*/
static createReadStream(filePath: string, options?: Parameters<typeof plugins.fs.createReadStream>[1]): plugins.fs.ReadStream {
return plugins.fs.createReadStream(filePath, options);
}
/**
* Create a write stream for a file
* @param filePath - Path to the file
* @param options - Stream options
* @returns Write stream
*/
static createWriteStream(filePath: string, options?: Parameters<typeof plugins.fs.createWriteStream>[1]): plugins.fs.WriteStream {
return plugins.fs.createWriteStream(filePath, options);
}
/**
* Ensure a file exists, creating an empty file if necessary
* @param filePath - Path to the file
* @returns Promise that resolves when file is ensured
*/
static async ensureFile(filePath: string): Promise<void> {
const exists = await this.exists(filePath);
if (!exists) {
await this.writeFile(filePath, '');
}
}
/**
* Check if a path is a directory
* @param path - Path to check
* @returns Promise resolving to true if directory, false otherwise
*/
static async isDirectory(path: string): Promise<boolean> {
const stats = await this.getStats(path);
return stats?.isDirectory() ?? false;
}
/**
* Check if a path is a file
* @param path - Path to check
* @returns Promise resolving to true if file, false otherwise
*/
static async isFile(path: string): Promise<boolean> {
const stats = await this.getStats(path);
return stats?.isFile() ?? false;
}
}

View File

@ -13,3 +13,5 @@ export * from './shared-security-manager.js';
export * from './event-system.js';
export * from './websocket-utils.js';
export * from './logger.js';
export * from './async-utils.js';
export * from './fs-utils.js';

View File

@ -4,11 +4,12 @@ import * as fs from 'fs';
import * as http from 'http';
import * as https from 'https';
import * as net from 'net';
import * as path from 'path';
import * as tls from 'tls';
import * as url from 'url';
import * as http2 from 'http2';
export { EventEmitter, fs, http, https, net, tls, url, http2 };
export { EventEmitter, fs, http, https, net, path, tls, url, http2 };
// tsclass scope
import * as tsclass from '@tsclass/tsclass';

View File

@ -2,6 +2,7 @@ import * as plugins from '../../plugins.js';
import * as fs from 'fs';
import * as path from 'path';
import { fileURLToPath } from 'url';
import { AsyncFileSystem } from '../../core/utils/fs-utils.js';
import { type IHttpProxyOptions, type ICertificateEntry, type ILogger, createLogger } from './models/types.js';
import type { IRouteConfig } from '../smart-proxy/models/route-types.js';
@ -17,6 +18,7 @@ export class CertificateManager {
private certificateStoreDir: string;
private logger: ILogger;
private httpsServer: plugins.https.Server | null = null;
private initialized = false;
constructor(private options: IHttpProxyOptions) {
this.certificateStoreDir = path.resolve(options.acme?.certificateStore || './certs');
@ -24,6 +26,15 @@ export class CertificateManager {
this.logger.warn('CertificateManager is deprecated - use SmartCertManager instead');
// Initialize synchronously for backward compatibility but log warning
this.initializeSync();
}
/**
* Synchronous initialization for backward compatibility
* @deprecated This uses sync filesystem operations which block the event loop
*/
private initializeSync(): void {
// Ensure certificate store directory exists
try {
if (!fs.existsSync(this.certificateStoreDir)) {
@ -36,9 +47,28 @@ export class CertificateManager {
this.loadDefaultCertificates();
}
/**
* Async initialization - preferred method
*/
public async initialize(): Promise<void> {
if (this.initialized) return;
// Ensure certificate store directory exists
try {
await AsyncFileSystem.ensureDir(this.certificateStoreDir);
this.logger.info(`Ensured certificate store directory: ${this.certificateStoreDir}`);
} catch (error) {
this.logger.warn(`Failed to create certificate store directory: ${error}`);
}
await this.loadDefaultCertificatesAsync();
this.initialized = true;
}
/**
* Loads default certificates from the filesystem
* @deprecated This uses sync filesystem operations which block the event loop
*/
public loadDefaultCertificates(): void {
const __dirname = path.dirname(fileURLToPath(import.meta.url));
@ -49,7 +79,28 @@ export class CertificateManager {
key: fs.readFileSync(path.join(certPath, 'key.pem'), 'utf8'),
cert: fs.readFileSync(path.join(certPath, 'cert.pem'), 'utf8')
};
this.logger.info('Loaded default certificates from filesystem');
this.logger.info('Loaded default certificates from filesystem (sync - deprecated)');
} catch (error) {
this.logger.error(`Failed to load default certificates: ${error}`);
this.generateSelfSignedCertificate();
}
}
/**
* Loads default certificates from the filesystem asynchronously
*/
public async loadDefaultCertificatesAsync(): Promise<void> {
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const certPath = path.join(__dirname, '..', '..', '..', 'assets', 'certs');
try {
const [key, cert] = await Promise.all([
AsyncFileSystem.readFile(path.join(certPath, 'key.pem')),
AsyncFileSystem.readFile(path.join(certPath, 'cert.pem'))
]);
this.defaultCertificates = { key, cert };
this.logger.info('Loaded default certificates from filesystem (async)');
} catch (error) {
this.logger.error(`Failed to load default certificates: ${error}`);
this.generateSelfSignedCertificate();

View File

@ -3,6 +3,8 @@ import { promisify } from 'util';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
import { delay } from '../../core/utils/async-utils.js';
import { AsyncFileSystem } from '../../core/utils/fs-utils.js';
import {
NftBaseError,
NftValidationError,
@ -208,7 +210,7 @@ export class NfTablesProxy {
// Wait before retry, unless it's the last attempt
if (i < maxRetries - 1) {
await new Promise(resolve => setTimeout(resolve, retryDelayMs));
await delay(retryDelayMs);
}
}
}
@ -218,8 +220,13 @@ export class NfTablesProxy {
/**
* Execute system command synchronously with multiple attempts
* @deprecated This method blocks the event loop and should be avoided. Use executeWithRetry instead.
* WARNING: This method contains a busy wait loop that will block the entire Node.js event loop!
*/
private executeWithRetrySync(command: string, maxRetries = 3, retryDelayMs = 1000): string {
// Log deprecation warning
console.warn('[DEPRECATION WARNING] executeWithRetrySync blocks the event loop and should not be used. Consider using the async executeWithRetry method instead.');
let lastError: Error | undefined;
for (let i = 0; i < maxRetries; i++) {
@ -231,10 +238,12 @@ export class NfTablesProxy {
// Wait before retry, unless it's the last attempt
if (i < maxRetries - 1) {
// A naive sleep in sync context
// CRITICAL: This busy wait loop blocks the entire event loop!
// This is a temporary fallback for sync contexts only.
// TODO: Remove this method entirely and make all callers async
const waitUntil = Date.now() + retryDelayMs;
while (Date.now() < waitUntil) {
// busy wait - not great, but this is a fallback method
// Busy wait - blocks event loop
}
}
}
@ -243,6 +252,26 @@ export class NfTablesProxy {
throw new NftExecutionError(`Failed after ${maxRetries} attempts: ${lastError?.message || 'Unknown error'}`);
}
/**
* Execute nftables commands with a temporary file
* This helper handles the common pattern of writing rules to a temp file,
* executing nftables with the file, and cleaning up
*/
private async executeWithTempFile(rulesetContent: string): Promise<void> {
await AsyncFileSystem.writeFile(this.tempFilePath, rulesetContent);
try {
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
} finally {
// Always clean up the temp file
await AsyncFileSystem.remove(this.tempFilePath);
}
}
/**
* Checks if nftables is available and the required modules are loaded
*/
@ -545,15 +574,8 @@ export class NfTablesProxy {
// Only write and apply if we have rules to add
if (rulesetContent) {
// Write the ruleset to a temporary file
fs.writeFileSync(this.tempFilePath, rulesetContent);
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
// Apply the ruleset using the helper
await this.executeWithTempFile(rulesetContent);
this.log('info', `Added source IP filter rules for ${family}`);
@ -566,9 +588,6 @@ export class NfTablesProxy {
await this.verifyRuleApplication(rule);
}
}
// Remove the temporary file
fs.unlinkSync(this.tempFilePath);
}
return true;
@ -663,13 +682,7 @@ export class NfTablesProxy {
// Apply the rules if we have any
if (rulesetContent) {
fs.writeFileSync(this.tempFilePath, rulesetContent);
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
await this.executeWithTempFile(rulesetContent);
this.log('info', `Added advanced NAT rules for ${family}`);
@ -682,9 +695,6 @@ export class NfTablesProxy {
await this.verifyRuleApplication(rule);
}
}
// Remove the temporary file
fs.unlinkSync(this.tempFilePath);
}
}
@ -816,15 +826,8 @@ export class NfTablesProxy {
// Apply the ruleset if we have any rules
if (rulesetContent) {
// Write to temporary file
fs.writeFileSync(this.tempFilePath, rulesetContent);
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
// Apply the ruleset using the helper
await this.executeWithTempFile(rulesetContent);
this.log('info', `Added port forwarding rules for ${family}`);
@ -837,9 +840,6 @@ export class NfTablesProxy {
await this.verifyRuleApplication(rule);
}
}
// Remove temporary file
fs.unlinkSync(this.tempFilePath);
}
return true;
@ -931,15 +931,7 @@ export class NfTablesProxy {
// Apply the ruleset if we have any rules
if (rulesetContent) {
// Write to temporary file
fs.writeFileSync(this.tempFilePath, rulesetContent);
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
await this.executeWithTempFile(rulesetContent);
this.log('info', `Added port forwarding rules for ${family}`);
@ -952,9 +944,6 @@ export class NfTablesProxy {
await this.verifyRuleApplication(rule);
}
}
// Remove temporary file
fs.unlinkSync(this.tempFilePath);
}
return true;
@ -1027,15 +1016,8 @@ export class NfTablesProxy {
// Apply the ruleset if we have any rules
if (rulesetContent) {
// Write to temporary file
fs.writeFileSync(this.tempFilePath, rulesetContent);
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
// Apply the ruleset using the helper
await this.executeWithTempFile(rulesetContent);
this.log('info', `Added QoS rules for ${family}`);
@ -1048,9 +1030,6 @@ export class NfTablesProxy {
await this.verifyRuleApplication(rule);
}
}
// Remove temporary file
fs.unlinkSync(this.tempFilePath);
}
return true;
@ -1615,25 +1594,27 @@ export class NfTablesProxy {
// Apply the ruleset if we have any rules to delete
if (rulesetContent) {
// Write to temporary file
fs.writeFileSync(this.tempFilePath, rulesetContent);
await AsyncFileSystem.writeFile(this.tempFilePath, rulesetContent);
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
this.log('info', 'Removed all added rules');
// Mark all rules as removed
this.rules.forEach(rule => {
rule.added = false;
rule.verified = false;
});
// Remove temporary file
fs.unlinkSync(this.tempFilePath);
try {
// Apply the ruleset
await this.executeWithRetry(
`${NfTablesProxy.NFT_CMD} -f ${this.tempFilePath}`,
this.settings.maxRetries,
this.settings.retryDelayMs
);
this.log('info', 'Removed all added rules');
// Mark all rules as removed
this.rules.forEach(rule => {
rule.added = false;
rule.verified = false;
});
} finally {
// Remove temporary file
await AsyncFileSystem.remove(this.tempFilePath);
}
}
// Clean up IP sets if we created any
@ -1862,8 +1843,12 @@ export class NfTablesProxy {
/**
* Synchronous version of cleanSlate
* @deprecated This method blocks the event loop and should be avoided. Use cleanSlate() instead.
* WARNING: This method uses execSync which blocks the entire Node.js event loop!
*/
public static cleanSlateSync(): void {
console.warn('[DEPRECATION WARNING] cleanSlateSync blocks the event loop and should not be used. Consider using the async cleanSlate() method instead.');
try {
// Check for rules with our comment pattern
const stdout = execSync(`${NfTablesProxy.NFT_CMD} list ruleset`).toString();

View File

@ -1,36 +1,34 @@
import * as plugins from '../../plugins.js';
import { AsyncFileSystem } from '../../core/utils/fs-utils.js';
import type { ICertificateData } from './certificate-manager.js';
export class CertStore {
constructor(private certDir: string) {}
public async initialize(): Promise<void> {
await plugins.smartfile.fs.ensureDirSync(this.certDir);
await AsyncFileSystem.ensureDir(this.certDir);
}
public async getCertificate(routeName: string): Promise<ICertificateData | null> {
const certPath = this.getCertPath(routeName);
const metaPath = `${certPath}/meta.json`;
if (!await plugins.smartfile.fs.fileExistsSync(metaPath)) {
if (!await AsyncFileSystem.exists(metaPath)) {
return null;
}
try {
const metaFile = await plugins.smartfile.SmartFile.fromFilePath(metaPath);
const meta = JSON.parse(metaFile.contents.toString());
const meta = await AsyncFileSystem.readJSON(metaPath);
const certFile = await plugins.smartfile.SmartFile.fromFilePath(`${certPath}/cert.pem`);
const cert = certFile.contents.toString();
const keyFile = await plugins.smartfile.SmartFile.fromFilePath(`${certPath}/key.pem`);
const key = keyFile.contents.toString();
const [cert, key] = await Promise.all([
AsyncFileSystem.readFile(`${certPath}/cert.pem`),
AsyncFileSystem.readFile(`${certPath}/key.pem`)
]);
let ca: string | undefined;
const caPath = `${certPath}/ca.pem`;
if (await plugins.smartfile.fs.fileExistsSync(caPath)) {
const caFile = await plugins.smartfile.SmartFile.fromFilePath(caPath);
ca = caFile.contents.toString();
if (await AsyncFileSystem.exists(caPath)) {
ca = await AsyncFileSystem.readFile(caPath);
}
return {
@ -51,14 +49,18 @@ export class CertStore {
certData: ICertificateData
): Promise<void> {
const certPath = this.getCertPath(routeName);
await plugins.smartfile.fs.ensureDirSync(certPath);
await AsyncFileSystem.ensureDir(certPath);
// Save certificate files
await plugins.smartfile.memory.toFs(certData.cert, `${certPath}/cert.pem`);
await plugins.smartfile.memory.toFs(certData.key, `${certPath}/key.pem`);
// Save certificate files in parallel
const savePromises = [
AsyncFileSystem.writeFile(`${certPath}/cert.pem`, certData.cert),
AsyncFileSystem.writeFile(`${certPath}/key.pem`, certData.key)
];
if (certData.ca) {
await plugins.smartfile.memory.toFs(certData.ca, `${certPath}/ca.pem`);
savePromises.push(
AsyncFileSystem.writeFile(`${certPath}/ca.pem`, certData.ca)
);
}
// Save metadata
@ -68,13 +70,17 @@ export class CertStore {
savedAt: new Date().toISOString()
};
await plugins.smartfile.memory.toFs(JSON.stringify(meta, null, 2), `${certPath}/meta.json`);
savePromises.push(
AsyncFileSystem.writeJSON(`${certPath}/meta.json`, meta)
);
await Promise.all(savePromises);
}
public async deleteCertificate(routeName: string): Promise<void> {
const certPath = this.getCertPath(routeName);
if (await plugins.smartfile.fs.fileExistsSync(certPath)) {
await plugins.smartfile.fs.removeManySync([certPath]);
if (await AsyncFileSystem.isDirectory(certPath)) {
await AsyncFileSystem.removeDir(certPath);
}
}

View File

@ -5,7 +5,7 @@ import { TimeoutManager } from './timeout-manager.js';
import { logger } from '../../core/utils/logger.js';
/**
* Manages connection lifecycle, tracking, and cleanup
* Manages connection lifecycle, tracking, and cleanup with performance optimizations
*/
export class ConnectionManager {
private connectionRecords: Map<string, IConnectionRecord> = new Map();
@ -13,12 +13,32 @@ export class ConnectionManager {
incoming: Record<string, number>;
outgoing: Record<string, number>;
} = { incoming: {}, outgoing: {} };
// Performance optimization: Track connections needing inactivity check
private nextInactivityCheck: Map<string, number> = new Map();
private inactivityCheckTimer: NodeJS.Timeout | null = null;
// Connection limits
private readonly maxConnections: number;
private readonly cleanupBatchSize: number = 100;
// Cleanup queue for batched processing
private cleanupQueue: Set<string> = new Set();
private cleanupTimer: NodeJS.Timeout | null = null;
constructor(
private settings: ISmartProxyOptions,
private securityManager: SecurityManager,
private timeoutManager: TimeoutManager
) {}
) {
// Set reasonable defaults for connection limits
this.maxConnections = settings.defaults.security.maxConnections
// Start inactivity check timer if not disabled
if (!settings.disableInactivityCheck) {
this.startInactivityCheckTimer();
}
}
/**
* Generate a unique connection ID
@ -31,17 +51,29 @@ export class ConnectionManager {
/**
* Create and track a new connection
*/
public createConnection(socket: plugins.net.Socket): IConnectionRecord {
public createConnection(socket: plugins.net.Socket): IConnectionRecord | null {
// Enforce connection limit
if (this.connectionRecords.size >= this.maxConnections) {
logger.log('warn', `Connection limit reached (${this.maxConnections}). Rejecting new connection.`, {
currentConnections: this.connectionRecords.size,
maxConnections: this.maxConnections,
component: 'connection-manager'
});
socket.destroy();
return null;
}
const connectionId = this.generateConnectionId();
const remoteIP = socket.remoteAddress || '';
const localPort = socket.localPort || 0;
const now = Date.now();
const record: IConnectionRecord = {
id: connectionId,
incoming: socket,
outgoing: null,
incomingStartTime: Date.now(),
lastActivity: Date.now(),
incomingStartTime: now,
lastActivity: now,
connectionClosed: false,
pendingData: [],
pendingDataSize: 0,
@ -70,6 +102,44 @@ export class ConnectionManager {
public trackConnection(connectionId: string, record: IConnectionRecord): void {
this.connectionRecords.set(connectionId, record);
this.securityManager.trackConnectionByIP(record.remoteIP, connectionId);
// Schedule inactivity check
if (!this.settings.disableInactivityCheck) {
this.scheduleInactivityCheck(connectionId, record);
}
}
/**
* Schedule next inactivity check for a connection
*/
private scheduleInactivityCheck(connectionId: string, record: IConnectionRecord): void {
let timeout = this.settings.inactivityTimeout!;
if (record.hasKeepAlive) {
if (this.settings.keepAliveTreatment === 'immortal') {
// Don't schedule check for immortal connections
return;
} else if (this.settings.keepAliveTreatment === 'extended') {
const multiplier = this.settings.keepAliveInactivityMultiplier || 6;
timeout = timeout * multiplier;
}
}
const checkTime = Date.now() + timeout;
this.nextInactivityCheck.set(connectionId, checkTime);
}
/**
* Start the inactivity check timer
*/
private startInactivityCheckTimer(): void {
// Check every 30 seconds for connections that need inactivity check
this.inactivityCheckTimer = setInterval(() => {
this.performOptimizedInactivityCheck();
}, 30000);
// Allow process to exit even with timer
this.inactivityCheckTimer.unref();
}
/**
@ -98,18 +168,69 @@ export class ConnectionManager {
*/
public initiateCleanupOnce(record: IConnectionRecord, reason: string = 'normal'): void {
if (this.settings.enableDetailedLogging) {
logger.log('info', `Connection cleanup initiated`, { connectionId: record.id, remoteIP: record.remoteIP, reason, component: 'connection-manager' });
logger.log('info', `Connection cleanup initiated`, {
connectionId: record.id,
remoteIP: record.remoteIP,
reason,
component: 'connection-manager'
});
}
if (
record.incomingTerminationReason === null ||
record.incomingTerminationReason === undefined
) {
if (record.incomingTerminationReason == null) {
record.incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
}
this.cleanupConnection(record, reason);
// Add to cleanup queue for batched processing
this.queueCleanup(record.id);
}
/**
* Queue a connection for cleanup
*/
private queueCleanup(connectionId: string): void {
this.cleanupQueue.add(connectionId);
// Process immediately if queue is getting large
if (this.cleanupQueue.size >= this.cleanupBatchSize) {
this.processCleanupQueue();
} else if (!this.cleanupTimer) {
// Otherwise, schedule batch processing
this.cleanupTimer = setTimeout(() => {
this.processCleanupQueue();
}, 100);
this.cleanupTimer.unref();
}
}
/**
* Process the cleanup queue in batches
*/
private processCleanupQueue(): void {
if (this.cleanupTimer) {
clearTimeout(this.cleanupTimer);
this.cleanupTimer = null;
}
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
this.cleanupQueue.clear();
for (const connectionId of toCleanup) {
const record = this.connectionRecords.get(connectionId);
if (record) {
this.cleanupConnection(record, record.incomingTerminationReason || 'normal');
}
}
// If there are more in queue, schedule next batch
if (this.cleanupQueue.size > 0) {
this.cleanupTimer = setTimeout(() => {
this.processCleanupQueue();
}, 10);
this.cleanupTimer.unref();
}
}
/**
@ -119,6 +240,9 @@ export class ConnectionManager {
if (!record.connectionClosed) {
record.connectionClosed = true;
// Remove from inactivity check
this.nextInactivityCheck.delete(record.id);
// Track connection termination
this.securityManager.removeConnectionByIP(record.remoteIP, record.id);
@ -127,29 +251,41 @@ export class ConnectionManager {
record.cleanupTimer = undefined;
}
// Detailed logging data
// Calculate metrics once
const duration = Date.now() - record.incomingStartTime;
const bytesReceived = record.bytesReceived;
const bytesSent = record.bytesSent;
const logData = {
connectionId: record.id,
remoteIP: record.remoteIP,
localPort: record.localPort,
reason,
duration: plugins.prettyMs(duration),
bytes: { in: record.bytesReceived, out: record.bytesSent },
tls: record.isTLS,
keepAlive: record.hasKeepAlive,
usingNetworkProxy: record.usingNetworkProxy,
domainSwitches: record.domainSwitches || 0,
component: 'connection-manager'
};
// Remove all data handlers to make sure we clean up properly
if (record.incoming) {
try {
// Remove our safe data handler
record.incoming.removeAllListeners('data');
// Reset the handler references
record.renegotiationHandler = undefined;
} catch (err) {
logger.log('error', `Error removing data handlers for connection ${record.id}: ${err}`, { connectionId: record.id, error: err, component: 'connection-manager' });
logger.log('error', `Error removing data handlers: ${err}`, {
connectionId: record.id,
error: err,
component: 'connection-manager'
});
}
}
// Handle incoming socket
this.cleanupSocket(record, 'incoming', record.incoming);
// Handle socket cleanup without delay
this.cleanupSocketImmediate(record, 'incoming', record.incoming);
// Handle outgoing socket
if (record.outgoing) {
this.cleanupSocket(record, 'outgoing', record.outgoing);
this.cleanupSocketImmediate(record, 'outgoing', record.outgoing);
}
// Clear pendingData to avoid memory leaks
@ -162,28 +298,13 @@ export class ConnectionManager {
// Log connection details
if (this.settings.enableDetailedLogging) {
logger.log('info',
`Connection from ${record.remoteIP} on port ${record.localPort} terminated (${reason}). ` +
`Duration: ${plugins.prettyMs(duration)}, Bytes IN: ${bytesReceived}, OUT: ${bytesSent}, ` +
`TLS: ${record.isTLS ? 'Yes' : 'No'}, Keep-Alive: ${record.hasKeepAlive ? 'Yes' : 'No'}` +
`${record.usingNetworkProxy ? ', Using NetworkProxy' : ''}` +
`${record.domainSwitches ? `, Domain switches: ${record.domainSwitches}` : ''}`,
{
connectionId: record.id,
remoteIP: record.remoteIP,
localPort: record.localPort,
reason,
duration: plugins.prettyMs(duration),
bytes: { in: bytesReceived, out: bytesSent },
tls: record.isTLS,
keepAlive: record.hasKeepAlive,
usingNetworkProxy: record.usingNetworkProxy,
domainSwitches: record.domainSwitches || 0,
component: 'connection-manager'
}
`Connection terminated: ${record.remoteIP}:${record.localPort} (${reason}) - ` +
`${plugins.prettyMs(duration)}, IN: ${record.bytesReceived}B, OUT: ${record.bytesSent}B`,
logData
);
} else {
logger.log('info',
`Connection from ${record.remoteIP} terminated (${reason}). Active connections: ${this.connectionRecords.size}`,
`Connection terminated: ${record.remoteIP} (${reason}). Active: ${this.connectionRecords.size}`,
{
connectionId: record.id,
remoteIP: record.remoteIP,
@ -197,37 +318,20 @@ export class ConnectionManager {
}
/**
* Helper method to clean up a socket
* Helper method to clean up a socket immediately
*/
private cleanupSocket(record: IConnectionRecord, side: 'incoming' | 'outgoing', socket: plugins.net.Socket): void {
private cleanupSocketImmediate(record: IConnectionRecord, side: 'incoming' | 'outgoing', socket: plugins.net.Socket): void {
try {
if (!socket.destroyed) {
// Try graceful shutdown first, then force destroy after a short timeout
socket.end();
const socketTimeout = setTimeout(() => {
try {
if (!socket.destroyed) {
socket.destroy();
}
} catch (err) {
logger.log('error', `Error destroying ${side} socket for connection ${record.id}: ${err}`, { connectionId: record.id, side, error: err, component: 'connection-manager' });
}
}, 1000);
// Ensure the timeout doesn't block Node from exiting
if (socketTimeout.unref) {
socketTimeout.unref();
}
socket.destroy();
}
} catch (err) {
logger.log('error', `Error closing ${side} socket for connection ${record.id}: ${err}`, { connectionId: record.id, side, error: err, component: 'connection-manager' });
try {
if (!socket.destroyed) {
socket.destroy();
}
} catch (destroyErr) {
logger.log('error', `Error destroying ${side} socket for connection ${record.id}: ${destroyErr}`, { connectionId: record.id, side, error: destroyErr, component: 'connection-manager' });
}
logger.log('error', `Error destroying ${side} socket: ${err}`, {
connectionId: record.id,
side,
error: err,
component: 'connection-manager'
});
}
}
@ -238,49 +342,44 @@ export class ConnectionManager {
return (err: Error) => {
const code = (err as any).code;
let reason = 'error';
const now = Date.now();
const connectionDuration = now - record.incomingStartTime;
const lastActivityAge = now - record.lastActivity;
if (code === 'ECONNRESET') {
reason = 'econnreset';
logger.log('warn', `ECONNRESET on ${side} connection from ${record.remoteIP}. Error: ${err.message}. Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)}`, {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
error: err.message,
duration: plugins.prettyMs(connectionDuration),
lastActivity: plugins.prettyMs(lastActivityAge),
component: 'connection-manager'
});
} else if (code === 'ETIMEDOUT') {
reason = 'etimedout';
logger.log('warn', `ETIMEDOUT on ${side} connection from ${record.remoteIP}. Error: ${err.message}. Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)}`, {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
error: err.message,
duration: plugins.prettyMs(connectionDuration),
lastActivity: plugins.prettyMs(lastActivityAge),
component: 'connection-manager'
});
} else {
logger.log('error', `Error on ${side} connection from ${record.remoteIP}: ${err.message}. Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)}`, {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
error: err.message,
duration: plugins.prettyMs(connectionDuration),
lastActivity: plugins.prettyMs(lastActivityAge),
component: 'connection-manager'
});
// Update activity tracking
if (side === 'incoming') {
record.lastActivity = now;
this.scheduleInactivityCheck(record.id, record);
}
if (side === 'incoming' && record.incomingTerminationReason === null) {
const errorData = {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
error: err.message,
duration: plugins.prettyMs(connectionDuration),
lastActivity: plugins.prettyMs(lastActivityAge),
component: 'connection-manager'
};
switch (code) {
case 'ECONNRESET':
reason = 'econnreset';
logger.log('warn', `ECONNRESET on ${side}: ${record.remoteIP}`, errorData);
break;
case 'ETIMEDOUT':
reason = 'etimedout';
logger.log('warn', `ETIMEDOUT on ${side}: ${record.remoteIP}`, errorData);
break;
default:
logger.log('error', `Error on ${side}: ${record.remoteIP} - ${err.message}`, errorData);
}
if (side === 'incoming' && record.incomingTerminationReason == null) {
record.incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
} else if (side === 'outgoing' && record.outgoingTerminationReason === null) {
} else if (side === 'outgoing' && record.outgoingTerminationReason == null) {
record.outgoingTerminationReason = reason;
this.incrementTerminationStat('outgoing', reason);
}
@ -303,13 +402,12 @@ export class ConnectionManager {
});
}
if (side === 'incoming' && record.incomingTerminationReason === null) {
if (side === 'incoming' && record.incomingTerminationReason == null) {
record.incomingTerminationReason = 'normal';
this.incrementTerminationStat('incoming', 'normal');
} else if (side === 'outgoing' && record.outgoingTerminationReason === null) {
} else if (side === 'outgoing' && record.outgoingTerminationReason == null) {
record.outgoingTerminationReason = 'normal';
this.incrementTerminationStat('outgoing', 'normal');
// Record the time when outgoing socket closed.
record.outgoingClosedTime = Date.now();
}
@ -332,26 +430,29 @@ export class ConnectionManager {
}
/**
* Check for stalled/inactive connections
* Optimized inactivity check - only checks connections that are due
*/
public performInactivityCheck(): void {
private performOptimizedInactivityCheck(): void {
const now = Date.now();
const connectionIds = [...this.connectionRecords.keys()];
const connectionsToCheck: string[] = [];
for (const id of connectionIds) {
const record = this.connectionRecords.get(id);
if (!record) continue;
// Skip inactivity check if disabled or for immortal keep-alive connections
if (
this.settings.disableInactivityCheck ||
(record.hasKeepAlive && this.settings.keepAliveTreatment === 'immortal')
) {
// Find connections that need checking
for (const [connectionId, checkTime] of this.nextInactivityCheck) {
if (checkTime <= now) {
connectionsToCheck.push(connectionId);
}
}
// Process only connections that need checking
for (const connectionId of connectionsToCheck) {
const record = this.connectionRecords.get(connectionId);
if (!record || record.connectionClosed) {
this.nextInactivityCheck.delete(connectionId);
continue;
}
const inactivityTime = now - record.lastActivity;
// Use extended timeout for extended-treatment keep-alive connections
let effectiveTimeout = this.settings.inactivityTimeout!;
if (record.hasKeepAlive && this.settings.keepAliveTreatment === 'extended') {
@ -359,37 +460,37 @@ export class ConnectionManager {
effectiveTimeout = effectiveTimeout * multiplier;
}
if (inactivityTime > effectiveTimeout && !record.connectionClosed) {
if (inactivityTime > effectiveTimeout) {
// For keep-alive connections, issue a warning first
if (record.hasKeepAlive && !record.inactivityWarningIssued) {
logger.log('warn', `Keep-alive connection ${id} from ${record.remoteIP} inactive for ${plugins.prettyMs(inactivityTime)}. Will close in 10 minutes if no activity.`, {
connectionId: id,
logger.log('warn', `Keep-alive connection inactive: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
inactiveFor: plugins.prettyMs(inactivityTime),
closureWarning: '10 minutes',
component: 'connection-manager'
});
// Set warning flag and add grace period
record.inactivityWarningIssued = true;
record.lastActivity = now - (effectiveTimeout - 600000);
// Reschedule check for 10 minutes later
this.nextInactivityCheck.set(connectionId, now + 600000);
// Try to stimulate activity with a probe packet
if (record.outgoing && !record.outgoing.destroyed) {
try {
record.outgoing.write(Buffer.alloc(0));
if (this.settings.enableDetailedLogging) {
logger.log('info', `Sent probe packet to test keep-alive connection ${id}`, { connectionId: id, component: 'connection-manager' });
}
} catch (err) {
logger.log('error', `Error sending probe packet to connection ${id}: ${err}`, { connectionId: id, error: err, component: 'connection-manager' });
logger.log('error', `Error sending probe packet: ${err}`, {
connectionId,
error: err,
component: 'connection-manager'
});
}
}
} else {
// For non-keep-alive or after warning, close the connection
logger.log('warn', `Closing inactive connection ${id} from ${record.remoteIP} (inactive for ${plugins.prettyMs(inactivityTime)}, keep-alive: ${record.hasKeepAlive ? 'Yes' : 'No'})`, {
connectionId: id,
// Close the connection
logger.log('warn', `Closing inactive connection: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
inactiveFor: plugins.prettyMs(inactivityTime),
hasKeepAlive: record.hasKeepAlive,
@ -397,15 +498,9 @@ export class ConnectionManager {
});
this.cleanupConnection(record, 'inactivity');
}
} else if (inactivityTime <= effectiveTimeout && record.inactivityWarningIssued) {
// If activity detected after warning, clear the warning
if (this.settings.enableDetailedLogging) {
logger.log('info', `Connection ${id} activity detected after inactivity warning`, {
connectionId: id,
component: 'connection-manager'
});
}
record.inactivityWarningIssued = false;
} else {
// Reschedule next check
this.scheduleInactivityCheck(connectionId, record);
}
// Parity check: if outgoing socket closed and incoming remains active
@ -415,8 +510,8 @@ export class ConnectionManager {
!record.connectionClosed &&
now - record.outgoingClosedTime > 120000
) {
logger.log('warn', `Parity check: Connection ${id} from ${record.remoteIP} has incoming socket still active ${plugins.prettyMs(now - record.outgoingClosedTime)} after outgoing socket closed`, {
connectionId: id,
logger.log('warn', `Parity check failed: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
timeElapsed: plugins.prettyMs(now - record.outgoingClosedTime),
component: 'connection-manager'
@ -426,68 +521,81 @@ export class ConnectionManager {
}
}
/**
* Legacy method for backward compatibility
*/
public performInactivityCheck(): void {
this.performOptimizedInactivityCheck();
}
/**
* Clear all connections (for shutdown)
*/
public clearConnections(): void {
// Create a copy of the keys to avoid modification during iteration
const connectionIds = [...this.connectionRecords.keys()];
// Stop timers
if (this.inactivityCheckTimer) {
clearInterval(this.inactivityCheckTimer);
this.inactivityCheckTimer = null;
}
// First pass: End all connections gracefully
for (const id of connectionIds) {
const record = this.connectionRecords.get(id);
if (record) {
if (this.cleanupTimer) {
clearTimeout(this.cleanupTimer);
this.cleanupTimer = null;
}
// Process connections in batches to avoid blocking
const connections = Array.from(this.connectionRecords.values());
const batchSize = 100;
let index = 0;
const processBatch = () => {
const batch = connections.slice(index, index + batchSize);
for (const record of batch) {
try {
// Clear any timers
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
record.cleanupTimer = undefined;
}
// End sockets gracefully
if (record.incoming && !record.incoming.destroyed) {
record.incoming.end();
// Immediate destruction
if (record.incoming) {
record.incoming.removeAllListeners();
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
}
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.end();
if (record.outgoing) {
record.outgoing.removeAllListeners();
if (!record.outgoing.destroyed) {
record.outgoing.destroy();
}
}
} catch (err) {
logger.log('error', `Error during graceful end of connection ${id}: ${err}`, { connectionId: id, error: err, component: 'connection-manager' });
}
}
}
// Short delay to allow graceful ends to process
setTimeout(() => {
// Second pass: Force destroy everything
for (const id of connectionIds) {
const record = this.connectionRecords.get(id);
if (record) {
try {
// Remove all listeners to prevent memory leaks
if (record.incoming) {
record.incoming.removeAllListeners();
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
}
if (record.outgoing) {
record.outgoing.removeAllListeners();
if (!record.outgoing.destroyed) {
record.outgoing.destroy();
}
}
} catch (err) {
logger.log('error', `Error during forced destruction of connection ${id}: ${err}`, { connectionId: id, error: err, component: 'connection-manager' });
}
logger.log('error', `Error during connection cleanup: ${err}`, {
connectionId: record.id,
error: err,
component: 'connection-manager'
});
}
}
// Clear all maps
this.connectionRecords.clear();
this.terminationStats = { incoming: {}, outgoing: {} };
}, 100);
index += batchSize;
// Continue with next batch if needed
if (index < connections.length) {
setImmediate(processBatch);
} else {
// Clear all maps
this.connectionRecords.clear();
this.nextInactivityCheck.clear();
this.cleanupQueue.clear();
this.terminationStats = { incoming: {}, outgoing: {} };
}
};
// Start batch processing
setImmediate(processBatch);
}
}