diff --git a/readme.hints.md b/readme.hints.md index 4b80e31..e7deae1 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -582,4 +582,62 @@ onError: (error) => { - **Positive**: Prevents resource exhaustion from rapid reconnection attempts ### Migration Notes -No configuration changes needed. The fix is automatic and backward compatible. \ No newline at end of file +No configuration changes needed. The fix is automatic and backward compatible. + +## Proxy Chain Connection Accumulation Fix (v19.5.14+) + +### Issue +When chaining SmartProxies (Client → SmartProxy1 → SmartProxy2 → Backend), connections would accumulate and never be cleaned up. This was particularly severe when the backend was down or closing connections immediately. + +### Root Cause +The half-open connection support was preventing proper cascade cleanup in proxy chains: +1. Backend closes → SmartProxy2's server socket closes +2. SmartProxy2 keeps client socket open (half-open support) +3. SmartProxy1 never gets notified that downstream is closed +4. Connections accumulate at each proxy in the chain + +The issue was in `createIndependentSocketHandlers()` which waited for BOTH sockets to close before cleanup. + +### Solution +1. **Changed default behavior**: When one socket closes, both close immediately +2. **Made half-open support opt-in**: Only enabled when explicitly requested +3. **Centralized socket handling**: Created `setupBidirectionalForwarding()` for consistent behavior +4. **Applied everywhere**: Updated HttpProxyBridge and route-connection-handler to use centralized handling + +### Changes Made +```typescript +// socket-utils.ts - Default behavior now closes both sockets +export function createIndependentSocketHandlers( + clientSocket, serverSocket, onBothClosed, + options: { enableHalfOpen?: boolean } = {} // Half-open is opt-in +) { + // When server closes, immediately close client (unless half-open enabled) + if (!clientClosed && !options.enableHalfOpen) { + clientSocket.destroy(); + } +} + +// New centralized function for consistent socket pairing +export function setupBidirectionalForwarding( + clientSocket, serverSocket, + handlers: { + onClientData?: (chunk) => void; + onServerData?: (chunk) => void; + onCleanup: (reason) => void; + enableHalfOpen?: boolean; // Default: false + } +) +``` + +### Test Coverage +- `test/test.proxy-chain-simple.node.ts` - Verifies proxy chains don't accumulate connections +- Tests confirm connections stay at 0 even with backend closing immediately +- Works for any proxy chain configuration (not just localhost) + +### Performance Impact +- **Positive**: No more connection accumulation in proxy chains +- **Positive**: Immediate cleanup reduces memory usage +- **Neutral**: Half-open connections still available when needed (opt-in) + +### Migration Notes +No configuration changes needed. The fix applies to all proxy chains automatically. \ No newline at end of file diff --git a/test/test.proxy-chain-simple.node.ts b/test/test.proxy-chain-simple.node.ts new file mode 100644 index 0000000..286b9c9 --- /dev/null +++ b/test/test.proxy-chain-simple.node.ts @@ -0,0 +1,195 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import * as net from 'net'; +import * as plugins from '../ts/plugins.js'; + +// Import SmartProxy and configurations +import { SmartProxy } from '../ts/index.js'; + +tap.test('simple proxy chain test - identify connection accumulation', async () => { + console.log('\n=== Simple Proxy Chain Test ==='); + console.log('Setup: Client → SmartProxy1 (8590) → SmartProxy2 (8591) → Backend (down)'); + + // Create backend server that accepts and immediately closes connections + const backend = net.createServer((socket) => { + console.log('Backend: Connection received, closing immediately'); + socket.destroy(); + }); + + await new Promise((resolve) => { + backend.listen(9998, () => { + console.log('✓ Backend server started on port 9998 (closes connections immediately)'); + resolve(); + }); + }); + + // Create SmartProxy2 (downstream) + const proxy2 = new SmartProxy({ + ports: [8591], + enableDetailedLogging: true, + socketTimeout: 5000, + routes: [{ + name: 'to-backend', + match: { ports: 8591 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 9998 // Backend that closes immediately + } + } + }] + }); + + // Create SmartProxy1 (upstream) + const proxy1 = new SmartProxy({ + ports: [8590], + enableDetailedLogging: true, + socketTimeout: 5000, + routes: [{ + name: 'to-proxy2', + match: { ports: 8590 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 8591 // Forward to proxy2 + } + } + }] + }); + + await proxy2.start(); + console.log('✓ SmartProxy2 started on port 8591'); + + await proxy1.start(); + console.log('✓ SmartProxy1 started on port 8590'); + + // Helper to get connection counts + const getConnectionCounts = () => { + const conn1 = (proxy1 as any).connectionManager; + const conn2 = (proxy2 as any).connectionManager; + return { + proxy1: conn1 ? conn1.getConnectionCount() : 0, + proxy2: conn2 ? conn2.getConnectionCount() : 0 + }; + }; + + console.log('\n--- Making 5 sequential connections ---'); + + for (let i = 0; i < 5; i++) { + console.log(`\n=== Connection ${i + 1} ===`); + + const counts = getConnectionCounts(); + console.log(`Before: Proxy1=${counts.proxy1}, Proxy2=${counts.proxy2}`); + + await new Promise((resolve) => { + const client = new net.Socket(); + let dataReceived = false; + + client.on('data', (data) => { + console.log(`Client received data: ${data.toString()}`); + dataReceived = true; + }); + + client.on('error', (err) => { + console.log(`Client error: ${err.code}`); + resolve(); + }); + + client.on('close', () => { + console.log(`Client closed (data received: ${dataReceived})`); + resolve(); + }); + + client.connect(8590, 'localhost', () => { + console.log('Client connected to Proxy1'); + // Send HTTP request + client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n'); + }); + + // Timeout + setTimeout(() => { + if (!client.destroyed) { + console.log('Client timeout, destroying'); + client.destroy(); + } + resolve(); + }, 2000); + }); + + // Wait a bit and check counts + await new Promise(resolve => setTimeout(resolve, 500)); + + const afterCounts = getConnectionCounts(); + console.log(`After: Proxy1=${afterCounts.proxy1}, Proxy2=${afterCounts.proxy2}`); + + if (afterCounts.proxy1 > 0 || afterCounts.proxy2 > 0) { + console.log('⚠️ WARNING: Connections not cleaned up!'); + } + } + + console.log('\n--- Test with backend completely down ---'); + + // Stop backend + backend.close(); + await new Promise(resolve => setTimeout(resolve, 100)); + console.log('✓ Backend stopped'); + + // Make more connections with backend down + for (let i = 0; i < 3; i++) { + console.log(`\n=== Connection ${i + 6} (backend down) ===`); + + const counts = getConnectionCounts(); + console.log(`Before: Proxy1=${counts.proxy1}, Proxy2=${counts.proxy2}`); + + await new Promise((resolve) => { + const client = new net.Socket(); + + client.on('error', () => { + resolve(); + }); + + client.on('close', () => { + resolve(); + }); + + client.connect(8590, 'localhost', () => { + client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n'); + }); + + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 1000); + }); + + await new Promise(resolve => setTimeout(resolve, 500)); + + const afterCounts = getConnectionCounts(); + console.log(`After: Proxy1=${afterCounts.proxy1}, Proxy2=${afterCounts.proxy2}`); + } + + // Final check + console.log('\n--- Final Check ---'); + await new Promise(resolve => setTimeout(resolve, 1000)); + + const finalCounts = getConnectionCounts(); + console.log(`Final counts: Proxy1=${finalCounts.proxy1}, Proxy2=${finalCounts.proxy2}`); + + await proxy1.stop(); + await proxy2.stop(); + + // Verify + if (finalCounts.proxy1 > 0 || finalCounts.proxy2 > 0) { + console.log('\n❌ FAIL: Connections accumulated!'); + } else { + console.log('\n✅ PASS: No connection accumulation'); + } + + expect(finalCounts.proxy1).toEqual(0); + expect(finalCounts.proxy2).toEqual(0); +}); + +tap.start(); \ No newline at end of file diff --git a/test/test.proxy-chaining-accumulation.node.ts b/test/test.proxy-chaining-accumulation.node.ts new file mode 100644 index 0000000..e25f122 --- /dev/null +++ b/test/test.proxy-chaining-accumulation.node.ts @@ -0,0 +1,368 @@ +import { tap, expect } from '@git.zone/tstest/tapbundle'; +import * as net from 'net'; +import * as plugins from '../ts/plugins.js'; + +// Import SmartProxy and configurations +import { SmartProxy } from '../ts/index.js'; + +tap.test('should handle proxy chaining without connection accumulation', async () => { + console.log('\n=== Testing Proxy Chaining Connection Accumulation ==='); + console.log('Setup: Client → SmartProxy1 → SmartProxy2 → Backend (down)'); + + // Create SmartProxy2 (downstream proxy) + const proxy2 = new SmartProxy({ + ports: [8581], + enableDetailedLogging: false, + socketTimeout: 5000, + routes: [{ + name: 'backend-route', + match: { ports: 8581 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 9999 // Non-existent backend + } + } + }] + }); + + // Create SmartProxy1 (upstream proxy) + const proxy1 = new SmartProxy({ + ports: [8580], + enableDetailedLogging: false, + socketTimeout: 5000, + routes: [{ + name: 'chain-route', + match: { ports: 8580 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 8581 // Forward to proxy2 + } + } + }] + }); + + // Start both proxies + await proxy2.start(); + console.log('✓ SmartProxy2 started on port 8581'); + + await proxy1.start(); + console.log('✓ SmartProxy1 started on port 8580'); + + // Helper to get connection counts + const getConnectionCounts = () => { + const conn1 = (proxy1 as any).connectionManager; + const conn2 = (proxy2 as any).connectionManager; + return { + proxy1: conn1 ? conn1.getConnectionCount() : 0, + proxy2: conn2 ? conn2.getConnectionCount() : 0 + }; + }; + + const initialCounts = getConnectionCounts(); + console.log(`\nInitial connection counts - Proxy1: ${initialCounts.proxy1}, Proxy2: ${initialCounts.proxy2}`); + + // Test 1: Single connection attempt + console.log('\n--- Test 1: Single connection through chain ---'); + + await new Promise((resolve) => { + const client = new net.Socket(); + + client.on('error', (err) => { + console.log(`Client received error: ${err.code}`); + resolve(); + }); + + client.on('close', () => { + console.log('Client connection closed'); + resolve(); + }); + + client.connect(8580, 'localhost', () => { + console.log('Client connected to Proxy1'); + // Send data to trigger routing + client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n'); + }); + + // Timeout + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 1000); + }); + + // Check connections after single attempt + await new Promise(resolve => setTimeout(resolve, 500)); + let counts = getConnectionCounts(); + console.log(`After single connection - Proxy1: ${counts.proxy1}, Proxy2: ${counts.proxy2}`); + + // Test 2: Multiple simultaneous connections + console.log('\n--- Test 2: Multiple simultaneous connections ---'); + + const promises = []; + for (let i = 0; i < 10; i++) { + promises.push(new Promise((resolve) => { + const client = new net.Socket(); + + client.on('error', () => { + resolve(); + }); + + client.on('close', () => { + resolve(); + }); + + client.connect(8580, 'localhost', () => { + // Send data + client.write(`GET /test${i} HTTP/1.1\r\nHost: test.com\r\n\r\n`); + }); + + // Timeout + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 500); + })); + } + + await Promise.all(promises); + console.log('✓ All simultaneous connections completed'); + + // Check connections + counts = getConnectionCounts(); + console.log(`After simultaneous connections - Proxy1: ${counts.proxy1}, Proxy2: ${counts.proxy2}`); + + // Test 3: Rapid serial connections (simulating retries) + console.log('\n--- Test 3: Rapid serial connections (retries) ---'); + + for (let i = 0; i < 20; i++) { + await new Promise((resolve) => { + const client = new net.Socket(); + + client.on('error', () => { + resolve(); + }); + + client.on('close', () => { + resolve(); + }); + + client.connect(8580, 'localhost', () => { + client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n'); + // Quick disconnect to simulate retry behavior + setTimeout(() => client.destroy(), 50); + }); + + // Timeout + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 200); + }); + + if ((i + 1) % 5 === 0) { + counts = getConnectionCounts(); + console.log(`After ${i + 1} retries - Proxy1: ${counts.proxy1}, Proxy2: ${counts.proxy2}`); + } + + // Small delay between retries + await new Promise(resolve => setTimeout(resolve, 50)); + } + + // Test 4: Long-lived connection attempt + console.log('\n--- Test 4: Long-lived connection attempt ---'); + + await new Promise((resolve) => { + const client = new net.Socket(); + + client.on('error', () => { + resolve(); + }); + + client.on('close', () => { + console.log('Long-lived client closed'); + resolve(); + }); + + client.connect(8580, 'localhost', () => { + console.log('Long-lived client connected'); + // Send data periodically + const interval = setInterval(() => { + if (!client.destroyed && client.writable) { + client.write('PING\r\n'); + } else { + clearInterval(interval); + } + }, 100); + + // Close after 2 seconds + setTimeout(() => { + clearInterval(interval); + client.destroy(); + }, 2000); + }); + + // Timeout + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 3000); + }); + + // Final check + await new Promise(resolve => setTimeout(resolve, 1000)); + + const finalCounts = getConnectionCounts(); + console.log(`\nFinal connection counts - Proxy1: ${finalCounts.proxy1}, Proxy2: ${finalCounts.proxy2}`); + + // Monitor for a bit to see if connections are cleaned up + console.log('\nMonitoring connection cleanup...'); + for (let i = 0; i < 3; i++) { + await new Promise(resolve => setTimeout(resolve, 500)); + counts = getConnectionCounts(); + console.log(`After ${(i + 1) * 0.5}s - Proxy1: ${counts.proxy1}, Proxy2: ${counts.proxy2}`); + } + + // Stop proxies + await proxy1.stop(); + console.log('\n✓ SmartProxy1 stopped'); + + await proxy2.stop(); + console.log('✓ SmartProxy2 stopped'); + + // Analysis + console.log('\n=== Analysis ==='); + if (finalCounts.proxy1 > 0 || finalCounts.proxy2 > 0) { + console.log('❌ FAIL: Connections accumulated!'); + console.log(`Proxy1 leaked ${finalCounts.proxy1} connections`); + console.log(`Proxy2 leaked ${finalCounts.proxy2} connections`); + } else { + console.log('✅ PASS: No connection accumulation detected'); + } + + // Verify + expect(finalCounts.proxy1).toEqual(0); + expect(finalCounts.proxy2).toEqual(0); +}); + +tap.test('should handle proxy chain with HTTP traffic', async () => { + console.log('\n=== Testing Proxy Chain with HTTP Traffic ==='); + + // Create SmartProxy2 with HTTP handling + const proxy2 = new SmartProxy({ + ports: [8583], + useHttpProxy: [8583], // Enable HTTP proxy handling + httpProxyPort: 8584, + enableDetailedLogging: false, + routes: [{ + name: 'http-backend', + match: { ports: 8583 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 9999 // Non-existent backend + } + } + }] + }); + + // Create SmartProxy1 with HTTP handling + const proxy1 = new SmartProxy({ + ports: [8582], + useHttpProxy: [8582], // Enable HTTP proxy handling + httpProxyPort: 8585, + enableDetailedLogging: false, + routes: [{ + name: 'http-chain', + match: { ports: 8582 }, + action: { + type: 'forward', + target: { + host: 'localhost', + port: 8583 // Forward to proxy2 + } + } + }] + }); + + await proxy2.start(); + console.log('✓ SmartProxy2 (HTTP) started on port 8583'); + + await proxy1.start(); + console.log('✓ SmartProxy1 (HTTP) started on port 8582'); + + // Helper to get connection counts + const getConnectionCounts = () => { + const conn1 = (proxy1 as any).connectionManager; + const conn2 = (proxy2 as any).connectionManager; + return { + proxy1: conn1 ? conn1.getConnectionCount() : 0, + proxy2: conn2 ? conn2.getConnectionCount() : 0 + }; + }; + + console.log('\nSending HTTP requests through chain...'); + + // Make HTTP requests + for (let i = 0; i < 5; i++) { + await new Promise((resolve) => { + const client = new net.Socket(); + let responseData = ''; + + client.on('data', (data) => { + responseData += data.toString(); + // Check if we got a complete HTTP response + if (responseData.includes('\r\n\r\n')) { + console.log(`Response ${i + 1}: ${responseData.split('\r\n')[0]}`); + client.destroy(); + } + }); + + client.on('error', () => { + resolve(); + }); + + client.on('close', () => { + resolve(); + }); + + client.connect(8582, 'localhost', () => { + client.write(`GET /test${i} HTTP/1.1\r\nHost: test.com\r\nConnection: close\r\n\r\n`); + }); + + setTimeout(() => { + if (!client.destroyed) { + client.destroy(); + } + resolve(); + }, 1000); + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + + const finalCounts = getConnectionCounts(); + console.log(`\nFinal HTTP proxy counts - Proxy1: ${finalCounts.proxy1}, Proxy2: ${finalCounts.proxy2}`); + + await proxy1.stop(); + await proxy2.stop(); + + expect(finalCounts.proxy1).toEqual(0); + expect(finalCounts.proxy2).toEqual(0); +}); + +tap.start(); \ No newline at end of file diff --git a/ts/core/utils/socket-utils.ts b/ts/core/utils/socket-utils.ts index 8afae91..421d3c6 100644 --- a/ts/core/utils/socket-utils.ts +++ b/ts/core/utils/socket-utils.ts @@ -109,7 +109,8 @@ export function createSocketCleanupHandler( export function createIndependentSocketHandlers( clientSocket: plugins.net.Socket | plugins.tls.TLSSocket, serverSocket: plugins.net.Socket | plugins.tls.TLSSocket, - onBothClosed: (reason: string) => void + onBothClosed: (reason: string) => void, + options: { enableHalfOpen?: boolean } = {} ): { cleanupClient: (reason: string) => Promise, cleanupServer: (reason: string) => Promise } { let clientClosed = false; let serverClosed = false; @@ -127,8 +128,13 @@ export function createIndependentSocketHandlers( clientClosed = true; clientReason = reason; - // Allow server to continue if still active - if (!serverClosed && serverSocket.writable) { + // Default behavior: close both sockets when one closes (required for proxy chains) + if (!serverClosed && !options.enableHalfOpen) { + serverSocket.destroy(); + } + + // Half-open support (opt-in only) + if (!serverClosed && serverSocket.writable && options.enableHalfOpen) { // Half-close: stop reading from client, let server finish clientSocket.pause(); clientSocket.unpipe(serverSocket); @@ -145,8 +151,13 @@ export function createIndependentSocketHandlers( serverClosed = true; serverReason = reason; - // Allow client to continue if still active - if (!clientClosed && clientSocket.writable) { + // Default behavior: close both sockets when one closes (required for proxy chains) + if (!clientClosed && !options.enableHalfOpen) { + clientSocket.destroy(); + } + + // Half-open support (opt-in only) + if (!clientClosed && clientSocket.writable && options.enableHalfOpen) { // Half-close: stop reading from server, let client finish serverSocket.pause(); serverSocket.unpipe(clientSocket); @@ -194,6 +205,79 @@ export function setupSocketHandlers( }); } +/** + * Setup bidirectional data forwarding between two sockets with proper cleanup + * @param clientSocket The client/incoming socket + * @param serverSocket The server/outgoing socket + * @param handlers Object containing optional handlers for data and cleanup + * @returns Cleanup functions for both sockets + */ +export function setupBidirectionalForwarding( + clientSocket: plugins.net.Socket | plugins.tls.TLSSocket, + serverSocket: plugins.net.Socket | plugins.tls.TLSSocket, + handlers: { + onClientData?: (chunk: Buffer) => void; + onServerData?: (chunk: Buffer) => void; + onCleanup: (reason: string) => void; + enableHalfOpen?: boolean; + } +): { cleanupClient: (reason: string) => Promise, cleanupServer: (reason: string) => Promise } { + // Set up cleanup handlers + const { cleanupClient, cleanupServer } = createIndependentSocketHandlers( + clientSocket, + serverSocket, + handlers.onCleanup, + { enableHalfOpen: handlers.enableHalfOpen } + ); + + // Set up error and close handlers + setupSocketHandlers(clientSocket, cleanupClient, undefined, 'client'); + setupSocketHandlers(serverSocket, cleanupServer, undefined, 'server'); + + // Set up data forwarding with backpressure handling + clientSocket.on('data', (chunk: Buffer) => { + if (handlers.onClientData) { + handlers.onClientData(chunk); + } + + if (serverSocket.writable) { + const flushed = serverSocket.write(chunk); + + // Handle backpressure + if (!flushed) { + clientSocket.pause(); + serverSocket.once('drain', () => { + if (!clientSocket.destroyed) { + clientSocket.resume(); + } + }); + } + } + }); + + serverSocket.on('data', (chunk: Buffer) => { + if (handlers.onServerData) { + handlers.onServerData(chunk); + } + + if (clientSocket.writable) { + const flushed = clientSocket.write(chunk); + + // Handle backpressure + if (!flushed) { + serverSocket.pause(); + clientSocket.once('drain', () => { + if (!serverSocket.destroyed) { + serverSocket.resume(); + } + }); + } + } + }); + + return { cleanupClient, cleanupServer }; +} + /** * Pipe two sockets together with proper cleanup on either end * @param socket1 First socket diff --git a/ts/proxies/smart-proxy/http-proxy-bridge.ts b/ts/proxies/smart-proxy/http-proxy-bridge.ts index 4306bb7..2516b7d 100644 --- a/ts/proxies/smart-proxy/http-proxy-bridge.ts +++ b/ts/proxies/smart-proxy/http-proxy-bridge.ts @@ -1,5 +1,6 @@ import * as plugins from '../../plugins.js'; import { HttpProxy } from '../http-proxy/index.js'; +import { setupBidirectionalForwarding } from '../../core/utils/socket-utils.js'; import type { IConnectionRecord, ISmartProxyOptions } from './models/interfaces.js'; import type { IRouteConfig } from './models/route-types.js'; @@ -123,36 +124,25 @@ export class HttpProxyBridge { proxySocket.write(initialChunk); } - // Pipe the sockets together - socket.pipe(proxySocket); - proxySocket.pipe(socket); - - // Handle cleanup - let cleanedUp = false; - const cleanup = (reason: string) => { - if (cleanedUp) return; - cleanedUp = true; - - // Remove all event listeners to prevent memory leaks - socket.removeAllListeners('end'); - socket.removeAllListeners('error'); - proxySocket.removeAllListeners('end'); - proxySocket.removeAllListeners('error'); - - socket.unpipe(proxySocket); - proxySocket.unpipe(socket); - - if (!proxySocket.destroyed) { - proxySocket.destroy(); - } - - cleanupCallback(reason); - }; - - socket.on('end', () => cleanup('socket_end')); - socket.on('error', () => cleanup('socket_error')); - proxySocket.on('end', () => cleanup('proxy_end')); - proxySocket.on('error', () => cleanup('proxy_error')); + // Use centralized bidirectional forwarding + setupBidirectionalForwarding(socket, proxySocket, { + onClientData: (chunk) => { + // Update stats if needed + if (record) { + record.bytesReceived += chunk.length; + } + }, + onServerData: (chunk) => { + // Update stats if needed + if (record) { + record.bytesSent += chunk.length; + } + }, + onCleanup: (reason) => { + cleanupCallback(reason); + }, + enableHalfOpen: false // Close both when one closes (required for proxy chains) + }); } /** diff --git a/ts/proxies/smart-proxy/route-connection-handler.ts b/ts/proxies/smart-proxy/route-connection-handler.ts index 63e427a..d0130ef 100644 --- a/ts/proxies/smart-proxy/route-connection-handler.ts +++ b/ts/proxies/smart-proxy/route-connection-handler.ts @@ -9,7 +9,7 @@ import { TlsManager } from './tls-manager.js'; import { HttpProxyBridge } from './http-proxy-bridge.js'; import { TimeoutManager } from './timeout-manager.js'; import { RouteManager } from './route-manager.js'; -import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js'; +import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler, setupBidirectionalForwarding } from '../../core/utils/socket-utils.js'; /** * Handles new connection processing and setup logic with support for route-based configuration @@ -1137,65 +1137,27 @@ export class RouteConnectionHandler { record.pendingDataSize = 0; } - // Set up independent socket handlers for half-open connection support - const { cleanupClient, cleanupServer } = createIndependentSocketHandlers( - socket, - targetSocket, - (reason) => { + // Use centralized bidirectional forwarding setup + setupBidirectionalForwarding(socket, targetSocket, { + onClientData: (chunk) => { + record.bytesReceived += chunk.length; + this.timeoutManager.updateActivity(record); + }, + onServerData: (chunk) => { + record.bytesSent += chunk.length; + this.timeoutManager.updateActivity(record); + }, + onCleanup: (reason) => { this.connectionManager.cleanupConnection(record, reason); - } - ); - - // Setup socket handlers with custom timeout handling - setupSocketHandlers(socket, cleanupClient, (sock) => { - // Don't close on timeout for keep-alive connections - if (record.hasKeepAlive) { - sock.setTimeout(this.settings.socketTimeout || 3600000); - } - }, 'client'); - - setupSocketHandlers(targetSocket, cleanupServer, (sock) => { - // Don't close on timeout for keep-alive connections - if (record.hasKeepAlive) { - sock.setTimeout(this.settings.socketTimeout || 3600000); - } - }, 'server'); - - // Forward data from client to target with backpressure handling - socket.on('data', (chunk: Buffer) => { - record.bytesReceived += chunk.length; - this.timeoutManager.updateActivity(record); - - if (targetSocket.writable) { - const flushed = targetSocket.write(chunk); - - // Handle backpressure - if (!flushed) { - socket.pause(); - targetSocket.once('drain', () => { - socket.resume(); - }); - } - } - }); - - // Forward data from target to client with backpressure handling - targetSocket.on('data', (chunk: Buffer) => { - record.bytesSent += chunk.length; - this.timeoutManager.updateActivity(record); - - if (socket.writable) { - const flushed = socket.write(chunk); - - // Handle backpressure - if (!flushed) { - targetSocket.pause(); - socket.once('drain', () => { - targetSocket.resume(); - }); - } - } + }, + enableHalfOpen: false // Default: close both when one closes (required for proxy chains) }); + + // Apply timeouts if keep-alive is enabled + if (record.hasKeepAlive) { + socket.setTimeout(this.settings.socketTimeout || 3600000); + targetSocket.setTimeout(this.settings.socketTimeout || 3600000); + } // Log successful connection logger.log('info', @@ -1354,11 +1316,5 @@ export class RouteConnectionHandler { // Apply socket timeouts this.timeoutManager.applySocketTimeouts(record); - - // Track outgoing data for bytes counting (moved from the duplicate connect handler) - targetSocket.on('data', (chunk: Buffer) => { - record.bytesSent += chunk.length; - this.timeoutManager.updateActivity(record); - }); } } \ No newline at end of file