refactor(socket-utils): replace direct socket cleanup with centralized cleanupSocket utility across connection management
This commit is contained in:
parent
eb2e67fecc
commit
bed1a76537
@ -1,5 +1,6 @@
|
|||||||
import * as plugins from '../../plugins.js';
|
import * as plugins from '../../plugins.js';
|
||||||
import { type IHttpProxyOptions, type IConnectionEntry, type ILogger, createLogger } from './models/types.js';
|
import { type IHttpProxyOptions, type IConnectionEntry, type ILogger, createLogger } from './models/types.js';
|
||||||
|
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages a pool of backend connections for efficient reuse
|
* Manages a pool of backend connections for efficient reuse
|
||||||
@ -133,14 +134,7 @@ export class ConnectionPool {
|
|||||||
if ((connection.isIdle && now - connection.lastUsed > idleTimeout) ||
|
if ((connection.isIdle && now - connection.lastUsed > idleTimeout) ||
|
||||||
connections.length > (this.options.connectionPoolSize || 50)) {
|
connections.length > (this.options.connectionPoolSize || 50)) {
|
||||||
|
|
||||||
try {
|
cleanupSocket(connection.socket, `pool-${host}-idle`);
|
||||||
if (!connection.socket.destroyed) {
|
|
||||||
connection.socket.end();
|
|
||||||
connection.socket.destroy();
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
this.logger.error(`Error destroying pooled connection to ${host}`, err);
|
|
||||||
}
|
|
||||||
|
|
||||||
connections.shift(); // Remove from pool
|
connections.shift(); // Remove from pool
|
||||||
removed++;
|
removed++;
|
||||||
@ -170,14 +164,7 @@ export class ConnectionPool {
|
|||||||
this.logger.debug(`Closing ${connections.length} connections to ${host}`);
|
this.logger.debug(`Closing ${connections.length} connections to ${host}`);
|
||||||
|
|
||||||
for (const connection of connections) {
|
for (const connection of connections) {
|
||||||
try {
|
cleanupSocket(connection.socket, `pool-${host}-close`);
|
||||||
if (!connection.socket.destroyed) {
|
|
||||||
connection.socket.end();
|
|
||||||
connection.socket.destroy();
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`Error closing connection to ${host}:`, error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ import { RequestHandler, type IMetricsTracker } from './request-handler.js';
|
|||||||
import { WebSocketHandler } from './websocket-handler.js';
|
import { WebSocketHandler } from './websocket-handler.js';
|
||||||
import { ProxyRouter } from '../../routing/router/index.js';
|
import { ProxyRouter } from '../../routing/router/index.js';
|
||||||
import { RouteRouter } from '../../routing/router/route-router.js';
|
import { RouteRouter } from '../../routing/router/route-router.js';
|
||||||
|
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||||
import { FunctionCache } from './function-cache.js';
|
import { FunctionCache } from './function-cache.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -520,11 +521,7 @@ export class HttpProxy implements IMetricsTracker {
|
|||||||
|
|
||||||
// Close all tracked sockets
|
// Close all tracked sockets
|
||||||
for (const socket of this.socketMap.getArray()) {
|
for (const socket of this.socketMap.getArray()) {
|
||||||
try {
|
cleanupSocket(socket, 'http-proxy-stop');
|
||||||
socket.destroy();
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error('Error destroying socket', error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all connection pool connections
|
// Close all connection pool connections
|
||||||
|
@ -4,6 +4,7 @@ import { SecurityManager } from './security-manager.js';
|
|||||||
import { TimeoutManager } from './timeout-manager.js';
|
import { TimeoutManager } from './timeout-manager.js';
|
||||||
import { logger } from '../../core/utils/logger.js';
|
import { logger } from '../../core/utils/logger.js';
|
||||||
import { LifecycleComponent } from '../../core/utils/lifecycle-component.js';
|
import { LifecycleComponent } from '../../core/utils/lifecycle-component.js';
|
||||||
|
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages connection lifecycle, tracking, and cleanup with performance optimizations
|
* Manages connection lifecycle, tracking, and cleanup with performance optimizations
|
||||||
@ -278,10 +279,10 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handle socket cleanup without delay
|
// Handle socket cleanup without delay
|
||||||
this.cleanupSocketImmediate(record, 'incoming', record.incoming);
|
cleanupSocket(record.incoming, `${record.id}-incoming`);
|
||||||
|
|
||||||
if (record.outgoing) {
|
if (record.outgoing) {
|
||||||
this.cleanupSocketImmediate(record, 'outgoing', record.outgoing);
|
cleanupSocket(record.outgoing, `${record.id}-outgoing`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear pendingData to avoid memory leaks
|
// Clear pendingData to avoid memory leaks
|
||||||
@ -313,23 +314,6 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper method to clean up a socket immediately
|
|
||||||
*/
|
|
||||||
private cleanupSocketImmediate(record: IConnectionRecord, side: 'incoming' | 'outgoing', socket: plugins.net.Socket): void {
|
|
||||||
try {
|
|
||||||
if (!socket.destroyed) {
|
|
||||||
socket.destroy();
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
logger.log('error', `Error destroying ${side} socket: ${err}`, {
|
|
||||||
connectionId: record.id,
|
|
||||||
side,
|
|
||||||
error: err,
|
|
||||||
component: 'connection-manager'
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a generic error handler for incoming or outgoing sockets
|
* Creates a generic error handler for incoming or outgoing sockets
|
||||||
@ -552,19 +536,13 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
record.cleanupTimer = undefined;
|
record.cleanupTimer = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Immediate destruction
|
// Immediate destruction using socket-utils
|
||||||
if (record.incoming) {
|
if (record.incoming) {
|
||||||
record.incoming.removeAllListeners();
|
cleanupSocket(record.incoming, `${record.id}-incoming-shutdown`);
|
||||||
if (!record.incoming.destroyed) {
|
|
||||||
record.incoming.destroy();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (record.outgoing) {
|
if (record.outgoing) {
|
||||||
record.outgoing.removeAllListeners();
|
cleanupSocket(record.outgoing, `${record.id}-outgoing-shutdown`);
|
||||||
if (!record.outgoing.destroyed) {
|
|
||||||
record.outgoing.destroy();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.log('error', `Error during connection cleanup: ${err}`, {
|
logger.log('error', `Error during connection cleanup: ${err}`, {
|
||||||
|
@ -2,6 +2,7 @@ import * as plugins from '../../plugins.js';
|
|||||||
import type { ISmartProxyOptions } from './models/interfaces.js';
|
import type { ISmartProxyOptions } from './models/interfaces.js';
|
||||||
import { RouteConnectionHandler } from './route-connection-handler.js';
|
import { RouteConnectionHandler } from './route-connection-handler.js';
|
||||||
import { logger } from '../../core/utils/logger.js';
|
import { logger } from '../../core/utils/logger.js';
|
||||||
|
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* PortManager handles the dynamic creation and removal of port listeners
|
* PortManager handles the dynamic creation and removal of port listeners
|
||||||
@ -64,8 +65,7 @@ export class PortManager {
|
|||||||
const server = plugins.net.createServer((socket) => {
|
const server = plugins.net.createServer((socket) => {
|
||||||
// Check if shutting down
|
// Check if shutting down
|
||||||
if (this.isShuttingDown) {
|
if (this.isShuttingDown) {
|
||||||
socket.end();
|
cleanupSocket(socket, 'port-manager-shutdown');
|
||||||
socket.destroy();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import { TlsManager } from './tls-manager.js';
|
|||||||
import { HttpProxyBridge } from './http-proxy-bridge.js';
|
import { HttpProxyBridge } from './http-proxy-bridge.js';
|
||||||
import { TimeoutManager } from './timeout-manager.js';
|
import { TimeoutManager } from './timeout-manager.js';
|
||||||
import { RouteManager } from './route-manager.js';
|
import { RouteManager } from './route-manager.js';
|
||||||
import type { ForwardingHandler } from '../../forwarding/handlers/base-handler.js';
|
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles new connection processing and setup logic with support for route-based configuration
|
* Handles new connection processing and setup logic with support for route-based configuration
|
||||||
@ -84,8 +84,7 @@ export class RouteConnectionHandler {
|
|||||||
const ipValidation = this.securityManager.validateIP(remoteIP);
|
const ipValidation = this.securityManager.validateIP(remoteIP);
|
||||||
if (!ipValidation.allowed) {
|
if (!ipValidation.allowed) {
|
||||||
logger.log('warn', `Connection rejected`, { remoteIP, reason: ipValidation.reason, component: 'route-handler' });
|
logger.log('warn', `Connection rejected`, { remoteIP, reason: ipValidation.reason, component: 'route-handler' });
|
||||||
socket.end();
|
cleanupSocket(socket, `rejected-${ipValidation.reason}`);
|
||||||
socket.destroy();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,16 +824,16 @@ export class RouteConnectionHandler {
|
|||||||
// Track event listeners added by the handler so we can clean them up
|
// Track event listeners added by the handler so we can clean them up
|
||||||
const originalOn = socket.on.bind(socket);
|
const originalOn = socket.on.bind(socket);
|
||||||
const originalOnce = socket.once.bind(socket);
|
const originalOnce = socket.once.bind(socket);
|
||||||
const trackedListeners: Array<{event: string; listener: Function}> = [];
|
const trackedListeners: Array<{event: string; listener: (...args: any[]) => void}> = [];
|
||||||
|
|
||||||
// Override socket.on to track listeners
|
// Override socket.on to track listeners
|
||||||
socket.on = function(event: string, listener: Function) {
|
socket.on = function(event: string, listener: (...args: any[]) => void) {
|
||||||
trackedListeners.push({event, listener});
|
trackedListeners.push({event, listener});
|
||||||
return originalOn(event, listener);
|
return originalOn(event, listener);
|
||||||
} as any;
|
} as any;
|
||||||
|
|
||||||
// Override socket.once to track listeners
|
// Override socket.once to track listeners
|
||||||
socket.once = function(event: string, listener: Function) {
|
socket.once = function(event: string, listener: (...args: any[]) => void) {
|
||||||
trackedListeners.push({event, listener});
|
trackedListeners.push({event, listener});
|
||||||
return originalOnce(event, listener);
|
return originalOnce(event, listener);
|
||||||
} as any;
|
} as any;
|
||||||
@ -1265,7 +1264,7 @@ export class RouteConnectionHandler {
|
|||||||
connectionId,
|
connectionId,
|
||||||
serverName,
|
serverName,
|
||||||
connInfo,
|
connInfo,
|
||||||
(connectionId, reason) => this.connectionManager.initiateCleanupOnce(record, reason)
|
(_connectionId, reason) => this.connectionManager.initiateCleanupOnce(record, reason)
|
||||||
);
|
);
|
||||||
|
|
||||||
// Store the handler in the connection record so we can remove it during cleanup
|
// Store the handler in the connection record so we can remove it during cleanup
|
||||||
|
Loading…
x
Reference in New Issue
Block a user