Implement proxy chain connection accumulation fix and add comprehensive tests

- Updated socket handling to prevent connection accumulation in chained proxies.
- Introduced centralized bidirectional forwarding for consistent socket management.
- Enhanced cleanup logic to ensure immediate closure of sockets when one closes.
- Added tests to verify connection behavior under various scenarios, including backend failures and rapid reconnections.
This commit is contained in:
2025-06-01 15:10:36 +00:00
parent 3ac3345be8
commit 07f5ceddc4
6 changed files with 751 additions and 100 deletions

View File

@ -109,7 +109,8 @@ export function createSocketCleanupHandler(
export function createIndependentSocketHandlers(
clientSocket: plugins.net.Socket | plugins.tls.TLSSocket,
serverSocket: plugins.net.Socket | plugins.tls.TLSSocket,
onBothClosed: (reason: string) => void
onBothClosed: (reason: string) => void,
options: { enableHalfOpen?: boolean } = {}
): { cleanupClient: (reason: string) => Promise<void>, cleanupServer: (reason: string) => Promise<void> } {
let clientClosed = false;
let serverClosed = false;
@ -127,8 +128,13 @@ export function createIndependentSocketHandlers(
clientClosed = true;
clientReason = reason;
// Allow server to continue if still active
if (!serverClosed && serverSocket.writable) {
// Default behavior: close both sockets when one closes (required for proxy chains)
if (!serverClosed && !options.enableHalfOpen) {
serverSocket.destroy();
}
// Half-open support (opt-in only)
if (!serverClosed && serverSocket.writable && options.enableHalfOpen) {
// Half-close: stop reading from client, let server finish
clientSocket.pause();
clientSocket.unpipe(serverSocket);
@ -145,8 +151,13 @@ export function createIndependentSocketHandlers(
serverClosed = true;
serverReason = reason;
// Allow client to continue if still active
if (!clientClosed && clientSocket.writable) {
// Default behavior: close both sockets when one closes (required for proxy chains)
if (!clientClosed && !options.enableHalfOpen) {
clientSocket.destroy();
}
// Half-open support (opt-in only)
if (!clientClosed && clientSocket.writable && options.enableHalfOpen) {
// Half-close: stop reading from server, let client finish
serverSocket.pause();
serverSocket.unpipe(clientSocket);
@ -194,6 +205,79 @@ export function setupSocketHandlers(
});
}
/**
* Setup bidirectional data forwarding between two sockets with proper cleanup
* @param clientSocket The client/incoming socket
* @param serverSocket The server/outgoing socket
* @param handlers Object containing optional handlers for data and cleanup
* @returns Cleanup functions for both sockets
*/
export function setupBidirectionalForwarding(
clientSocket: plugins.net.Socket | plugins.tls.TLSSocket,
serverSocket: plugins.net.Socket | plugins.tls.TLSSocket,
handlers: {
onClientData?: (chunk: Buffer) => void;
onServerData?: (chunk: Buffer) => void;
onCleanup: (reason: string) => void;
enableHalfOpen?: boolean;
}
): { cleanupClient: (reason: string) => Promise<void>, cleanupServer: (reason: string) => Promise<void> } {
// Set up cleanup handlers
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
clientSocket,
serverSocket,
handlers.onCleanup,
{ enableHalfOpen: handlers.enableHalfOpen }
);
// Set up error and close handlers
setupSocketHandlers(clientSocket, cleanupClient, undefined, 'client');
setupSocketHandlers(serverSocket, cleanupServer, undefined, 'server');
// Set up data forwarding with backpressure handling
clientSocket.on('data', (chunk: Buffer) => {
if (handlers.onClientData) {
handlers.onClientData(chunk);
}
if (serverSocket.writable) {
const flushed = serverSocket.write(chunk);
// Handle backpressure
if (!flushed) {
clientSocket.pause();
serverSocket.once('drain', () => {
if (!clientSocket.destroyed) {
clientSocket.resume();
}
});
}
}
});
serverSocket.on('data', (chunk: Buffer) => {
if (handlers.onServerData) {
handlers.onServerData(chunk);
}
if (clientSocket.writable) {
const flushed = clientSocket.write(chunk);
// Handle backpressure
if (!flushed) {
serverSocket.pause();
clientSocket.once('drain', () => {
if (!serverSocket.destroyed) {
serverSocket.resume();
}
});
}
}
});
return { cleanupClient, cleanupServer };
}
/**
* Pipe two sockets together with proper cleanup on either end
* @param socket1 First socket

View File

@ -1,5 +1,6 @@
import * as plugins from '../../plugins.js';
import { HttpProxy } from '../http-proxy/index.js';
import { setupBidirectionalForwarding } from '../../core/utils/socket-utils.js';
import type { IConnectionRecord, ISmartProxyOptions } from './models/interfaces.js';
import type { IRouteConfig } from './models/route-types.js';
@ -123,36 +124,25 @@ export class HttpProxyBridge {
proxySocket.write(initialChunk);
}
// Pipe the sockets together
socket.pipe(proxySocket);
proxySocket.pipe(socket);
// Handle cleanup
let cleanedUp = false;
const cleanup = (reason: string) => {
if (cleanedUp) return;
cleanedUp = true;
// Remove all event listeners to prevent memory leaks
socket.removeAllListeners('end');
socket.removeAllListeners('error');
proxySocket.removeAllListeners('end');
proxySocket.removeAllListeners('error');
socket.unpipe(proxySocket);
proxySocket.unpipe(socket);
if (!proxySocket.destroyed) {
proxySocket.destroy();
}
cleanupCallback(reason);
};
socket.on('end', () => cleanup('socket_end'));
socket.on('error', () => cleanup('socket_error'));
proxySocket.on('end', () => cleanup('proxy_end'));
proxySocket.on('error', () => cleanup('proxy_error'));
// Use centralized bidirectional forwarding
setupBidirectionalForwarding(socket, proxySocket, {
onClientData: (chunk) => {
// Update stats if needed
if (record) {
record.bytesReceived += chunk.length;
}
},
onServerData: (chunk) => {
// Update stats if needed
if (record) {
record.bytesSent += chunk.length;
}
},
onCleanup: (reason) => {
cleanupCallback(reason);
},
enableHalfOpen: false // Close both when one closes (required for proxy chains)
});
}
/**

View File

@ -9,7 +9,7 @@ import { TlsManager } from './tls-manager.js';
import { HttpProxyBridge } from './http-proxy-bridge.js';
import { TimeoutManager } from './timeout-manager.js';
import { RouteManager } from './route-manager.js';
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js';
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler, setupBidirectionalForwarding } from '../../core/utils/socket-utils.js';
/**
* Handles new connection processing and setup logic with support for route-based configuration
@ -1137,65 +1137,27 @@ export class RouteConnectionHandler {
record.pendingDataSize = 0;
}
// Set up independent socket handlers for half-open connection support
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
socket,
targetSocket,
(reason) => {
// Use centralized bidirectional forwarding setup
setupBidirectionalForwarding(socket, targetSocket, {
onClientData: (chunk) => {
record.bytesReceived += chunk.length;
this.timeoutManager.updateActivity(record);
},
onServerData: (chunk) => {
record.bytesSent += chunk.length;
this.timeoutManager.updateActivity(record);
},
onCleanup: (reason) => {
this.connectionManager.cleanupConnection(record, reason);
}
);
// Setup socket handlers with custom timeout handling
setupSocketHandlers(socket, cleanupClient, (sock) => {
// Don't close on timeout for keep-alive connections
if (record.hasKeepAlive) {
sock.setTimeout(this.settings.socketTimeout || 3600000);
}
}, 'client');
setupSocketHandlers(targetSocket, cleanupServer, (sock) => {
// Don't close on timeout for keep-alive connections
if (record.hasKeepAlive) {
sock.setTimeout(this.settings.socketTimeout || 3600000);
}
}, 'server');
// Forward data from client to target with backpressure handling
socket.on('data', (chunk: Buffer) => {
record.bytesReceived += chunk.length;
this.timeoutManager.updateActivity(record);
if (targetSocket.writable) {
const flushed = targetSocket.write(chunk);
// Handle backpressure
if (!flushed) {
socket.pause();
targetSocket.once('drain', () => {
socket.resume();
});
}
}
});
// Forward data from target to client with backpressure handling
targetSocket.on('data', (chunk: Buffer) => {
record.bytesSent += chunk.length;
this.timeoutManager.updateActivity(record);
if (socket.writable) {
const flushed = socket.write(chunk);
// Handle backpressure
if (!flushed) {
targetSocket.pause();
socket.once('drain', () => {
targetSocket.resume();
});
}
}
},
enableHalfOpen: false // Default: close both when one closes (required for proxy chains)
});
// Apply timeouts if keep-alive is enabled
if (record.hasKeepAlive) {
socket.setTimeout(this.settings.socketTimeout || 3600000);
targetSocket.setTimeout(this.settings.socketTimeout || 3600000);
}
// Log successful connection
logger.log('info',
@ -1354,11 +1316,5 @@ export class RouteConnectionHandler {
// Apply socket timeouts
this.timeoutManager.applySocketTimeouts(record);
// Track outgoing data for bytes counting (moved from the duplicate connect handler)
targetSocket.on('data', (chunk: Buffer) => {
record.bytesSent += chunk.length;
this.timeoutManager.updateActivity(record);
});
}
}