Enhance socket cleanup and management for improved connection handling

- Refactor cleanupSocket function to support options for immediate destruction, allowing drain, and grace periods.
- Introduce createIndependentSocketHandlers for better management of half-open connections between client and server sockets.
- Update various handlers (HTTP, HTTPS passthrough, HTTPS terminate) to utilize new cleanup and socket management functions.
- Implement custom timeout handling in socket setup to prevent immediate closure during keep-alive connections.
- Add tests for long-lived connections and half-open connection scenarios to ensure stability and reliability.
- Adjust connection manager to handle socket cleanup based on activity status, improving resource management.
This commit is contained in:
Philipp Kunz 2025-06-01 12:27:15 +00:00
parent 265b80ee04
commit ad80798210
13 changed files with 728 additions and 1223 deletions

View File

@ -1,5 +1,5 @@
{
"expiryDate": "2025-08-30T08:04:36.897Z",
"issueDate": "2025-06-01T08:04:36.897Z",
"savedAt": "2025-06-01T08:04:36.897Z"
"expiryDate": "2025-08-30T08:11:10.101Z",
"issueDate": "2025-06-01T08:11:10.101Z",
"savedAt": "2025-06-01T08:11:10.102Z"
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,192 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as net from 'net';
import * as tls from 'tls';
import { SmartProxy } from '../ts/index.js';
let testProxy: SmartProxy;
let targetServer: net.Server;
// Create a simple echo server as target
tap.test('setup test environment', async () => {
// Create target server that echoes data back
targetServer = net.createServer((socket) => {
console.log('Target server: client connected');
// Echo data back
socket.on('data', (data) => {
console.log(`Target server received: ${data.toString().trim()}`);
socket.write(data);
});
socket.on('close', () => {
console.log('Target server: client disconnected');
});
});
await new Promise<void>((resolve) => {
targetServer.listen(9876, () => {
console.log('Target server listening on port 9876');
resolve();
});
});
// Create proxy with simple TCP forwarding (no TLS)
testProxy = new SmartProxy({
routes: [{
name: 'tcp-forward-test',
match: {
ports: 8888 // Plain TCP port
},
action: {
type: 'forward',
target: {
host: 'localhost',
port: 9876
}
// No TLS configuration - just plain TCP forwarding
}
}],
defaults: {
target: {
host: 'localhost',
port: 9876
}
},
enableDetailedLogging: true,
keepAliveTreatment: 'extended', // Allow long-lived connections
inactivityTimeout: 3600000, // 1 hour
socketTimeout: 3600000, // 1 hour
keepAlive: true,
keepAliveInitialDelay: 1000
});
await testProxy.start();
});
tap.test('should keep WebSocket-like connection open for extended period', async (tools) => {
tools.timeout(65000); // 65 second test timeout
const client = new net.Socket();
let messagesReceived = 0;
let connectionClosed = false;
// Connect to proxy
await new Promise<void>((resolve, reject) => {
client.connect(8888, 'localhost', () => {
console.log('Client connected to proxy');
resolve();
});
client.on('error', reject);
});
// Set up data handler
client.on('data', (data) => {
console.log(`Client received: ${data.toString().trim()}`);
messagesReceived++;
});
client.on('close', () => {
console.log('Client connection closed');
connectionClosed = true;
});
// Send initial handshake-like data
client.write('HELLO\n');
// Wait for response
await new Promise(resolve => setTimeout(resolve, 100));
expect(messagesReceived).toEqual(1);
// Simulate WebSocket-like keep-alive pattern
// Send periodic messages over 60 seconds
const startTime = Date.now();
const pingInterval = setInterval(() => {
if (!connectionClosed && Date.now() - startTime < 60000) {
console.log('Sending ping...');
client.write('PING\n');
} else {
clearInterval(pingInterval);
}
}, 10000); // Every 10 seconds
// Wait for 61 seconds
await new Promise(resolve => setTimeout(resolve, 61000));
// Clean up interval
clearInterval(pingInterval);
// Connection should still be open
expect(connectionClosed).toEqual(false);
// Should have received responses (1 hello + 6 pings)
expect(messagesReceived).toBeGreaterThan(5);
// Close connection gracefully
client.end();
// Wait for close
await new Promise(resolve => setTimeout(resolve, 100));
expect(connectionClosed).toEqual(true);
});
tap.test('should support half-open connections', async () => {
const client = new net.Socket();
const serverSocket = await new Promise<net.Socket>((resolve) => {
targetServer.once('connection', resolve);
client.connect(8888, 'localhost');
});
let clientClosed = false;
let serverClosed = false;
let serverReceivedData = false;
client.on('close', () => {
clientClosed = true;
});
serverSocket.on('close', () => {
serverClosed = true;
});
serverSocket.on('data', () => {
serverReceivedData = true;
});
// Client sends data then closes write side
client.write('HALF-OPEN TEST\n');
client.end(); // Close write side only
// Wait a bit
await new Promise(resolve => setTimeout(resolve, 500));
// Server should still be able to send data
expect(serverClosed).toEqual(false);
serverSocket.write('RESPONSE\n');
// Wait for data
await new Promise(resolve => setTimeout(resolve, 100));
// Now close server side
serverSocket.end();
// Wait for full close
await new Promise(resolve => setTimeout(resolve, 500));
expect(clientClosed).toEqual(true);
expect(serverClosed).toEqual(true);
expect(serverReceivedData).toEqual(true);
});
tap.test('cleanup', async () => {
await testProxy.stop();
await new Promise<void>((resolve) => {
targetServer.close(() => {
console.log('Target server closed');
resolve();
});
});
});
export default tap.start();

View File

@ -1,27 +1,62 @@
import * as plugins from '../../plugins.js';
export interface CleanupOptions {
immediate?: boolean; // Force immediate destruction
allowDrain?: boolean; // Allow write buffer to drain
gracePeriod?: number; // Ms to wait before force close
}
/**
* Safely cleanup a socket by removing all listeners and destroying it
* @param socket The socket to cleanup
* @param socketName Optional name for logging
* @param options Cleanup options
*/
export function cleanupSocket(socket: plugins.net.Socket | plugins.tls.TLSSocket | null, socketName?: string): void {
if (!socket) return;
export function cleanupSocket(
socket: plugins.net.Socket | plugins.tls.TLSSocket | null,
socketName?: string,
options: CleanupOptions = {}
): Promise<void> {
if (!socket || socket.destroyed) return Promise.resolve();
try {
// Remove all event listeners
socket.removeAllListeners();
return new Promise<void>((resolve) => {
const cleanup = () => {
try {
// Remove all event listeners
socket.removeAllListeners();
// Destroy if not already destroyed
if (!socket.destroyed) {
socket.destroy();
}
} catch (err) {
console.error(`Error cleaning up socket${socketName ? ` (${socketName})` : ''}: ${err}`);
}
resolve();
};
// Unpipe any streams
socket.unpipe();
// Destroy if not already destroyed
if (!socket.destroyed) {
socket.destroy();
if (options.immediate) {
// Immediate cleanup (old behavior)
socket.unpipe();
cleanup();
} else if (options.allowDrain && socket.writable) {
// Allow pending writes to complete
socket.end(() => cleanup());
// Force cleanup after grace period
if (options.gracePeriod) {
setTimeout(() => {
if (!socket.destroyed) {
cleanup();
}
}, options.gracePeriod);
}
} else {
// Default: immediate cleanup
socket.unpipe();
cleanup();
}
} catch (err) {
console.error(`Error cleaning up socket${socketName ? ` (${socketName})` : ''}: ${err}`);
}
});
}
/**
@ -30,6 +65,7 @@ export function cleanupSocket(socket: plugins.net.Socket | plugins.tls.TLSSocket
* @param serverSocket The server socket (optional)
* @param onCleanup Optional callback when cleanup is done
* @returns A cleanup function that can be called multiple times safely
* @deprecated Use createIndependentSocketHandlers for better half-open support
*/
export function createSocketCleanupHandler(
clientSocket: plugins.net.Socket | plugins.tls.TLSSocket,
@ -42,10 +78,10 @@ export function createSocketCleanupHandler(
if (cleanedUp) return;
cleanedUp = true;
// Cleanup both sockets
cleanupSocket(clientSocket, 'client');
// Cleanup both sockets (old behavior - too aggressive)
cleanupSocket(clientSocket, 'client', { immediate: true });
if (serverSocket) {
cleanupSocket(serverSocket, 'server');
cleanupSocket(serverSocket, 'server', { immediate: true });
}
// Call cleanup callback if provided
@ -55,15 +91,79 @@ export function createSocketCleanupHandler(
};
}
/**
* Create independent cleanup handlers for paired sockets that support half-open connections
* @param clientSocket The client socket
* @param serverSocket The server socket
* @param onBothClosed Callback when both sockets are closed
* @returns Independent cleanup functions for each socket
*/
export function createIndependentSocketHandlers(
clientSocket: plugins.net.Socket | plugins.tls.TLSSocket,
serverSocket: plugins.net.Socket | plugins.tls.TLSSocket,
onBothClosed: (reason: string) => void
): { cleanupClient: (reason: string) => Promise<void>, cleanupServer: (reason: string) => Promise<void> } {
let clientClosed = false;
let serverClosed = false;
let clientReason = '';
let serverReason = '';
const checkBothClosed = () => {
if (clientClosed && serverClosed) {
onBothClosed(`client: ${clientReason}, server: ${serverReason}`);
}
};
const cleanupClient = async (reason: string) => {
if (clientClosed) return;
clientClosed = true;
clientReason = reason;
// Allow server to continue if still active
if (!serverClosed && serverSocket.writable) {
// Half-close: stop reading from client, let server finish
clientSocket.pause();
clientSocket.unpipe(serverSocket);
await cleanupSocket(clientSocket, 'client', { allowDrain: true, gracePeriod: 5000 });
} else {
await cleanupSocket(clientSocket, 'client', { immediate: true });
}
checkBothClosed();
};
const cleanupServer = async (reason: string) => {
if (serverClosed) return;
serverClosed = true;
serverReason = reason;
// Allow client to continue if still active
if (!clientClosed && clientSocket.writable) {
// Half-close: stop reading from server, let client finish
serverSocket.pause();
serverSocket.unpipe(clientSocket);
await cleanupSocket(serverSocket, 'server', { allowDrain: true, gracePeriod: 5000 });
} else {
await cleanupSocket(serverSocket, 'server', { immediate: true });
}
checkBothClosed();
};
return { cleanupClient, cleanupServer };
}
/**
* Setup socket error and close handlers with proper cleanup
* @param socket The socket to setup handlers for
* @param handleClose The cleanup function to call
* @param handleTimeout Optional custom timeout handler
* @param errorPrefix Optional prefix for error messages
*/
export function setupSocketHandlers(
socket: plugins.net.Socket | plugins.tls.TLSSocket,
handleClose: (reason: string) => void,
handleTimeout?: (socket: plugins.net.Socket | plugins.tls.TLSSocket) => void,
errorPrefix?: string
): void {
socket.on('error', (error) => {
@ -77,8 +177,12 @@ export function setupSocketHandlers(
});
socket.on('timeout', () => {
const prefix = errorPrefix || 'socket';
handleClose(`${prefix}_timeout`);
if (handleTimeout) {
handleTimeout(socket); // Custom timeout handling
} else {
// Default: just log, don't close
console.warn(`Socket timeout: ${errorPrefix || 'socket'}`);
}
});
}

View File

@ -49,7 +49,12 @@ export class HttpForwardingHandler extends ForwardingHandler {
});
};
setupSocketHandlers(socket, handleClose, 'http');
// Use custom timeout handler that doesn't close the socket
setupSocketHandlers(socket, handleClose, () => {
// For HTTP, we can be more aggressive with timeouts since connections are shorter
// But still don't close immediately - let the connection finish naturally
console.warn(`HTTP socket timeout from ${remoteAddress}`);
}, 'http');
socket.on('error', (error) => {
this.emit(ForwardingHandlerEvents.ERROR, {

View File

@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js';
import { ForwardingHandler } from './base-handler.js';
import type { IForwardConfig } from '../config/forwarding-types.js';
import { ForwardingHandlerEvents } from '../config/forwarding-types.js';
import { createSocketCleanupHandler, setupSocketHandlers, pipeSockets } from '../../core/utils/socket-utils.js';
import { createIndependentSocketHandlers, setupSocketHandlers } from '../../core/utils/socket-utils.js';
/**
* Handler for HTTPS passthrough (SNI forwarding without termination)
@ -55,19 +55,32 @@ export class HttpsPassthroughHandler extends ForwardingHandler {
let bytesSent = 0;
let bytesReceived = 0;
// Create cleanup handler with our utility
const handleClose = createSocketCleanupHandler(clientSocket, serverSocket, (reason) => {
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
remoteAddress,
bytesSent,
bytesReceived,
reason
});
});
// Create independent handlers for half-open connection support
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
clientSocket,
serverSocket,
(reason) => {
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
remoteAddress,
bytesSent,
bytesReceived,
reason
});
}
);
// Setup error and close handlers for both sockets
setupSocketHandlers(serverSocket, handleClose, 'server');
setupSocketHandlers(clientSocket, handleClose, 'client');
// Setup handlers with custom timeout handling that doesn't close connections
const timeout = this.getTimeout();
setupSocketHandlers(clientSocket, cleanupClient, (socket) => {
// Just reset timeout, don't close
socket.setTimeout(timeout);
}, 'client');
setupSocketHandlers(serverSocket, cleanupServer, (socket) => {
// Just reset timeout, don't close
socket.setTimeout(timeout);
}, 'server');
// Forward data from client to server
clientSocket.on('data', (data) => {
@ -117,8 +130,7 @@ export class HttpsPassthroughHandler extends ForwardingHandler {
});
});
// Set timeouts
const timeout = this.getTimeout();
// Set initial timeouts - they will be reset on each timeout event
clientSocket.setTimeout(timeout);
serverSocket.setTimeout(timeout);
}
@ -128,7 +140,7 @@ export class HttpsPassthroughHandler extends ForwardingHandler {
* @param req The HTTP request
* @param res The HTTP response
*/
public handleHttpRequest(req: plugins.http.IncomingMessage, res: plugins.http.ServerResponse): void {
public handleHttpRequest(_req: plugins.http.IncomingMessage, res: plugins.http.ServerResponse): void {
// HTTPS passthrough doesn't support HTTP requests
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('HTTP not supported for this domain');

View File

@ -112,7 +112,7 @@ export class HttpsTerminateToHttpHandler extends ForwardingHandler {
});
// Set up error handling with our cleanup utility
setupSocketHandlers(tlsSocket, handleClose, 'tls');
setupSocketHandlers(tlsSocket, handleClose, undefined, 'tls');
// Set timeout
const timeout = this.getTimeout();
@ -167,7 +167,7 @@ export class HttpsTerminateToHttpHandler extends ForwardingHandler {
});
// Set up handlers for backend socket
setupSocketHandlers(backendSocket, newHandleClose, 'backend');
setupSocketHandlers(backendSocket, newHandleClose, undefined, 'backend');
backendSocket.on('error', (error) => {
this.emit(ForwardingHandlerEvents.ERROR, {

View File

@ -106,7 +106,7 @@ export class HttpsTerminateToHttpsHandler extends ForwardingHandler {
});
// Set up error handling with our cleanup utility
setupSocketHandlers(tlsSocket, handleClose, 'tls');
setupSocketHandlers(tlsSocket, handleClose, undefined, 'tls');
// Set timeout
const timeout = this.getTimeout();
@ -151,7 +151,7 @@ export class HttpsTerminateToHttpsHandler extends ForwardingHandler {
});
// Set up handlers for backend socket
setupSocketHandlers(backendSocket, newHandleClose, 'backend');
setupSocketHandlers(backendSocket, newHandleClose, undefined, 'backend');
backendSocket.on('error', (error) => {
this.emit(ForwardingHandlerEvents.ERROR, {

View File

@ -134,7 +134,7 @@ export class ConnectionPool {
if ((connection.isIdle && now - connection.lastUsed > idleTimeout) ||
connections.length > (this.options.connectionPoolSize || 50)) {
cleanupSocket(connection.socket, `pool-${host}-idle`);
cleanupSocket(connection.socket, `pool-${host}-idle`, { immediate: true }).catch(() => {});
connections.shift(); // Remove from pool
removed++;
@ -164,7 +164,7 @@ export class ConnectionPool {
this.logger.debug(`Closing ${connections.length} connections to ${host}`);
for (const connection of connections) {
cleanupSocket(connection.socket, `pool-${host}-close`);
cleanupSocket(connection.socket, `pool-${host}-close`, { immediate: true }).catch(() => {});
}
}

View File

@ -520,9 +520,10 @@ export class HttpProxy implements IMetricsTracker {
this.webSocketHandler.shutdown();
// Close all tracked sockets
for (const socket of this.socketMap.getArray()) {
cleanupSocket(socket, 'http-proxy-stop');
}
const socketCleanupPromises = this.socketMap.getArray().map(socket =>
cleanupSocket(socket, 'http-proxy-stop', { immediate: true })
);
await Promise.all(socketCleanupPromises);
// Close all connection pool connections
this.connectionPool.closeAllConnections();

View File

@ -278,12 +278,37 @@ export class ConnectionManager extends LifecycleComponent {
}
}
// Handle socket cleanup without delay
cleanupSocket(record.incoming, `${record.id}-incoming`);
// Handle socket cleanup - check if sockets are still active
const cleanupPromises: Promise<void>[] = [];
if (record.incoming) {
if (!record.incoming.writable || record.incoming.destroyed) {
// Socket is not active, clean up immediately
cleanupPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming`, { immediate: true }));
} else {
// Socket is still active, allow graceful cleanup
cleanupPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming`, { allowDrain: true, gracePeriod: 5000 }));
}
}
if (record.outgoing) {
cleanupSocket(record.outgoing, `${record.id}-outgoing`);
if (!record.outgoing.writable || record.outgoing.destroyed) {
// Socket is not active, clean up immediately
cleanupPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing`, { immediate: true }));
} else {
// Socket is still active, allow graceful cleanup
cleanupPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing`, { allowDrain: true, gracePeriod: 5000 }));
}
}
// Wait for cleanup to complete
Promise.all(cleanupPromises).catch(err => {
logger.log('error', `Error during socket cleanup: ${err}`, {
connectionId: record.id,
error: err,
component: 'connection-manager'
});
});
// Clear pendingData to avoid memory leaks
record.pendingData = [];
@ -484,19 +509,24 @@ export class ConnectionManager extends LifecycleComponent {
}
// Parity check: if outgoing socket closed and incoming remains active
// Increased from 2 minutes to 30 minutes for long-lived connections
if (
record.outgoingClosedTime &&
!record.incoming.destroyed &&
!record.connectionClosed &&
now - record.outgoingClosedTime > 120000
now - record.outgoingClosedTime > 1800000 // 30 minutes
) {
logger.log('warn', `Parity check failed: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
timeElapsed: plugins.prettyMs(now - record.outgoingClosedTime),
component: 'connection-manager'
});
this.cleanupConnection(record, 'parity_check');
// Only close if no data activity for 10 minutes
if (now - record.lastActivity > 600000) {
logger.log('warn', `Parity check failed after extended timeout: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
timeElapsed: plugins.prettyMs(now - record.outgoingClosedTime),
inactiveFor: plugins.prettyMs(now - record.lastActivity),
component: 'connection-manager'
});
this.cleanupConnection(record, 'parity_check');
}
}
}
}
@ -537,13 +567,18 @@ export class ConnectionManager extends LifecycleComponent {
}
// Immediate destruction using socket-utils
const shutdownPromises: Promise<void>[] = [];
if (record.incoming) {
cleanupSocket(record.incoming, `${record.id}-incoming-shutdown`);
shutdownPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming-shutdown`, { immediate: true }));
}
if (record.outgoing) {
cleanupSocket(record.outgoing, `${record.id}-outgoing-shutdown`);
shutdownPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing-shutdown`, { immediate: true }));
}
// Don't wait for shutdown cleanup in this batch processing
Promise.all(shutdownPromises).catch(() => {});
} catch (err) {
logger.log('error', `Error during connection cleanup: ${err}`, {
connectionId: record.id,

View File

@ -65,7 +65,7 @@ export class PortManager {
const server = plugins.net.createServer((socket) => {
// Check if shutting down
if (this.isShuttingDown) {
cleanupSocket(socket, 'port-manager-shutdown');
cleanupSocket(socket, 'port-manager-shutdown', { immediate: true });
return;
}

View File

@ -9,7 +9,7 @@ import { TlsManager } from './tls-manager.js';
import { HttpProxyBridge } from './http-proxy-bridge.js';
import { TimeoutManager } from './timeout-manager.js';
import { RouteManager } from './route-manager.js';
import { cleanupSocket } from '../../core/utils/socket-utils.js';
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers } from '../../core/utils/socket-utils.js';
/**
* Handles new connection processing and setup logic with support for route-based configuration
@ -84,7 +84,7 @@ export class RouteConnectionHandler {
const ipValidation = this.securityManager.validateIP(remoteIP);
if (!ipValidation.allowed) {
logger.log('warn', `Connection rejected`, { remoteIP, reason: ipValidation.reason, component: 'route-handler' });
cleanupSocket(socket, `rejected-${ipValidation.reason}`);
cleanupSocket(socket, `rejected-${ipValidation.reason}`, { immediate: true });
return;
}
@ -1110,9 +1110,8 @@ export class RouteConnectionHandler {
// Setup improved error handling for outgoing connection
this.setupOutgoingErrorHandler(connectionId, targetSocket, record, socket, finalTargetHost, finalTargetPort);
// Setup close handlers
targetSocket.on('close', this.connectionManager.handleClose('outgoing', record));
socket.on('close', this.connectionManager.handleClose('incoming', record));
// Note: Close handlers are managed by independent socket handlers above
// We don't register handleClose here to avoid bilateral cleanup
// Setup error handlers for incoming socket
socket.on('error', this.connectionManager.handleError('incoming', record));
@ -1225,14 +1224,64 @@ export class RouteConnectionHandler {
record.pendingDataSize = 0;
}
// Immediately setup bidirectional piping - much simpler than manual data management
socket.pipe(targetSocket);
targetSocket.pipe(socket);
// Set up independent socket handlers for half-open connection support
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
socket,
targetSocket,
(reason) => {
this.connectionManager.initiateCleanupOnce(record, reason);
}
);
// Track incoming data for bytes counting - do this after piping is set up
// Setup socket handlers with custom timeout handling
setupSocketHandlers(socket, cleanupClient, (sock) => {
// Don't close on timeout for keep-alive connections
if (record.hasKeepAlive) {
sock.setTimeout(this.settings.socketTimeout || 3600000);
}
}, 'client');
setupSocketHandlers(targetSocket, cleanupServer, (sock) => {
// Don't close on timeout for keep-alive connections
if (record.hasKeepAlive) {
sock.setTimeout(this.settings.socketTimeout || 3600000);
}
}, 'server');
// Forward data from client to target with backpressure handling
socket.on('data', (chunk: Buffer) => {
record.bytesReceived += chunk.length;
this.timeoutManager.updateActivity(record);
if (targetSocket.writable) {
const flushed = targetSocket.write(chunk);
// Handle backpressure
if (!flushed) {
socket.pause();
targetSocket.once('drain', () => {
socket.resume();
});
}
}
});
// Forward data from target to client with backpressure handling
targetSocket.on('data', (chunk: Buffer) => {
record.bytesSent += chunk.length;
this.timeoutManager.updateActivity(record);
if (socket.writable) {
const flushed = socket.write(chunk);
// Handle backpressure
if (!flushed) {
targetSocket.pause();
socket.once('drain', () => {
targetSocket.resume();
});
}
}
});
// Log successful connection