Enhance connection cleanup and error handling in RouteConnectionHandler

- Implement immediate cleanup for connection failures to prevent leaks
- Add NFTables cleanup on socket close to manage memory usage
- Fix connection limit bypass by checking record after creation
- Introduce tests for rapid connection retries and routing failures
Philipp Kunz 2025-06-01 14:22:06 +00:00
parent facb68a9d0
commit 250eafd36c
3 changed files with 280 additions and 13 deletions


@@ -465,3 +465,62 @@ The fix was applied in two places:
2. **SmartProxy route-connection-handler** (`route-connection-handler.ts`) - This is where the actual SmartProxy connection handling happens
The critical fix for SmartProxy was in `setupDirectConnection()` method in route-connection-handler.ts, which now uses `createSocketWithErrorHandler()` to properly handle connection failures and clean up connection records.
## Connection Cleanup Improvements (v19.5.12+)
### Issue
The connection count kept climbing during rapid retry scenarios, especially when routing failed or backend connections were refused. This was due to:
1. **Delayed Cleanup**: Using `initiateCleanupOnce` queued cleanup operations (batch of 100 every 100ms) instead of immediate cleanup
2. **NFTables Memory Leak**: NFTables connections were never cleaned up, staying in memory forever
3. **Connection Limit Bypass**: When the maximum connection count was reached, `createConnection()` rejected the socket, but the handler did not check the returned record and carried on processing
### Root Cause Analysis
1. **Queued vs Immediate Cleanup**:
   - `initiateCleanupOnce()`: Adds to cleanup queue, processes up to 100 connections every 100ms
   - `cleanupConnection()`: Immediate synchronous cleanup
   - Under rapid retries, connections were created faster than the queue could process them
2. **NFTables Connections**:
   - Marked with `usingNetworkProxy = true` but never cleaned up
   - Connection records stayed in memory indefinitely
3. **Error Path Cleanup**:
   - Many error paths used `socket.end()` (async) followed by cleanup
   - Created timing windows where connections weren't fully cleaned
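The difference between queued and immediate cleanup can be illustrated with a small simulation. This is only a sketch: `simulatePeakRecords` and its parameters are hypothetical names that do not exist in SmartProxy, each loop iteration stands in for one 100ms interval, and the only figure taken from the notes above is the batch size of 100 per interval.

```typescript
// Illustrative simulation only; `simulatePeakRecords` and its parameters are
// hypothetical names, not part of SmartProxy.
type CleanupMode = 'queued' | 'immediate';

function simulatePeakRecords(
  mode: CleanupMode,
  failuresPerTick: number, // failed connections arriving per 100 ms tick
  ticks: number,
  batchSize: number = 100, // queue drains at most this many records per tick
): number {
  const live = new Set<number>();
  const queue: number[] = [];
  let nextId = 0;
  let peak = 0;

  for (let tick = 0; tick < ticks; tick++) {
    for (let i = 0; i < failuresPerTick; i++) {
      const id = nextId++;
      live.add(id); // record created when the client connects
      if (mode === 'immediate') {
        live.delete(id); // removed in the same call that saw the failure
      } else {
        queue.push(id); // stays live until a later batch reaches it
      }
    }
    peak = Math.max(peak, live.size);

    // End of tick: the queue processor handles one batch.
    for (const id of queue.splice(0, batchSize)) {
      live.delete(id);
    }
  }
  return peak;
}

console.log(simulatePeakRecords('queued', 250, 10)); // 1600
console.log(simulatePeakRecords('immediate', 250, 10)); // 0
```

With 250 failures per tick, the queued variant retains 150 more records after every tick, while the immediate variant never holds a record past the failing call. At 50 failures per tick the queue keeps up and the peak is simply one tick's worth of records.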
### Solution
1. **Immediate Cleanup**: Changed all error paths from `initiateCleanupOnce()` to `cleanupConnection()` for immediate cleanup
2. **NFTables Cleanup**: Added socket close listener to clean up connection records when NFTables connections close
3. **Connection Limit Fix**: Added null check after `createConnection()` to handle rejection properly
### Changes Made in route-connection-handler.ts
```typescript
// 1. NFTables cleanup (lines 551-553)
socket.once('close', () => {
this.connectionManager.cleanupConnection(record, 'nftables_closed');
});
// 2. Connection limit check (lines 93-96)
const record = this.connectionManager.createConnection(socket);
if (!record) {
// Connection was rejected due to limit - socket already destroyed
return;
}
// 3. Changed all error paths to use immediate cleanup
// Before: this.connectionManager.initiateCleanupOnce(record, reason)
// After: this.connectionManager.cleanupConnection(record, reason)
```
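How the three changes fit together can be sketched in a self-contained form. Everything below is a stand-in written for illustration: `SketchConnectionManager`, `SocketLike`, `FakeSocket` and `handleConnection` are hypothetical names, and the real `ConnectionManager` tracks far more state. The sketch assumes, as the comment in the handler states, that the manager destroys the socket itself when it rejects a connection.

```typescript
// Minimal stand-ins; all names here are hypothetical, not SmartProxy's API.
interface SocketLike {
  destroyed: boolean;
  destroy(): void;
  once(event: 'close', listener: () => void): void;
}

interface ConnectionRecord {
  id: string;
  socket: SocketLike;
}

class SketchConnectionManager {
  private records = new Map<string, ConnectionRecord>();
  private nextId = 0;
  private maxConnections: number;

  constructor(maxConnections: number) {
    this.maxConnections = maxConnections;
  }

  // Returns null (and destroys the socket) once the limit is reached.
  createConnection(socket: SocketLike): ConnectionRecord | null {
    if (this.records.size >= this.maxConnections) {
      socket.destroy();
      return null;
    }
    const record: ConnectionRecord = { id: `conn-${this.nextId++}`, socket };
    this.records.set(record.id, record);
    return record;
  }

  // Immediate, synchronous removal; a second call for the same record is a no-op.
  cleanupConnection(record: ConnectionRecord, _reason: string): void {
    if (!this.records.delete(record.id)) return;
    if (!record.socket.destroyed) record.socket.destroy();
  }

  getConnectionCount(): number {
    return this.records.size;
  }
}

// Caller side: check for a rejected record, then tie cleanup to socket close.
function handleConnection(manager: SketchConnectionManager, socket: SocketLike): boolean {
  const record = manager.createConnection(socket);
  if (!record) return false; // rejected due to limit, nothing left to track
  socket.once('close', () => manager.cleanupConnection(record, 'closed'));
  return true;
}

// Tiny fake socket that fires its close listeners synchronously on destroy().
class FakeSocket implements SocketLike {
  destroyed = false;
  private closeListeners: Array<() => void> = [];

  once(_event: 'close', listener: () => void): void {
    this.closeListeners.push(listener);
  }

  destroy(): void {
    if (this.destroyed) return;
    this.destroyed = true;
    const listeners = this.closeListeners;
    this.closeListeners = [];
    for (const listener of listeners) listener();
  }
}
```

In this sketch, deleting the record before destroying the socket means the close listener that fires during `destroy()` finds nothing left to clean up, so the same record can safely be reported from several error paths.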
### Test Coverage
- `test/test.rapid-retry-cleanup.node.ts` - Verifies connection cleanup under rapid retry scenarios
- The test asserts that the connection count returns to its initial value after 20 rapid retries at 50ms intervals and stays below 10 throughout
- Confirms both ECONNREFUSED and routing failure scenarios are handled correctly
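The pass criteria used by the rapid-retry test can be expressed as a pure function, which may be easier to reason about than the inline assertions. `checkForLeaks` and `LeakCheckResult` are hypothetical names introduced for this sketch; the threshold of 10 mirrors the `toBeLessThan(10)` assertion in the test file.

```typescript
// Hypothetical helper mirroring the assertions in the rapid-retry test.
interface LeakCheckResult {
  max: number;
  average: number;
  passed: boolean;
}

function checkForLeaks(
  samples: number[], // connection count sampled after each attempt
  initialCount: number,
  finalCount: number,
  maxAllowed: number = 10, // same bound the test uses for the peak count
): LeakCheckResult {
  // Fall back to the initial count when no samples were taken,
  // since Math.max() of an empty list is -Infinity.
  const max = samples.length > 0 ? Math.max(...samples) : initialCount;
  const average =
    samples.length > 0
      ? samples.reduce((a, b) => a + b, 0) / samples.length
      : initialCount;

  // No leak: the count returns to its starting value and never piles up.
  const passed = finalCount === initialCount && max < maxAllowed;
  return { max, average, passed };
}
```

For example, `checkForLeaks([0, 1, 0, 2], 0, 0)` passes with a maximum of 2, while a run whose final count differs from the initial count fails regardless of the samples.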
### Performance Impact
- **Positive**: No more connection accumulation under load
- **Positive**: Immediate cleanup reduces memory usage
- **Consideration**: More frequent cleanup operations, but prevents queue backlog
### Migration Notes
No configuration changes needed. The improvements are automatic and backward compatible.


@@ -0,0 +1,201 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import * as net from 'net';
import * as plugins from '../ts/plugins.js';

// Import SmartProxy and configurations
import { SmartProxy } from '../ts/index.js';

tap.test('should handle rapid connection retries without leaking connections', async () => {
  console.log('\n=== Testing Rapid Connection Retry Cleanup ===');

  // Create a SmartProxy instance
  const proxy = new SmartProxy({
    ports: [8550],
    enableDetailedLogging: false,
    maxConnectionLifetime: 10000,
    socketTimeout: 5000,
    routes: [{
      name: 'test-route',
      match: { ports: 8550 },
      action: {
        type: 'forward',
        target: {
          host: 'localhost',
          port: 9999 // Non-existent port to force connection failures
        }
      }
    }]
  });

  // Start the proxy
  await proxy.start();
  console.log('✓ Proxy started on port 8550');

  // Helper to get active connection count
  const getActiveConnections = () => {
    const connectionManager = (proxy as any).connectionManager;
    return connectionManager ? connectionManager.getConnectionCount() : 0;
  };

  // Track connection counts
  const connectionCounts: number[] = [];
  const initialCount = getActiveConnections();
  console.log(`Initial connection count: ${initialCount}`);

  // Simulate rapid retries
  const retryCount = 20;
  const retryDelay = 50; // 50ms between retries
  let successfulConnections = 0;
  let failedConnections = 0;

  console.log(`\nSimulating ${retryCount} rapid connection attempts...`);

  for (let i = 0; i < retryCount; i++) {
    await new Promise<void>((resolve) => {
      const client = new net.Socket();

      client.on('error', () => {
        failedConnections++;
        client.destroy();
        resolve();
      });

      client.on('close', () => {
        resolve();
      });

      client.connect(8550, 'localhost', () => {
        // Send some data to trigger routing
        client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n');
        successfulConnections++;
      });

      // Force close after a short time
      setTimeout(() => {
        if (!client.destroyed) {
          client.destroy();
        }
      }, 100);
    });

    // Small delay between retries
    await new Promise(resolve => setTimeout(resolve, retryDelay));

    // Check connection count after each attempt
    const currentCount = getActiveConnections();
    connectionCounts.push(currentCount);

    if ((i + 1) % 5 === 0) {
      console.log(`After ${i + 1} attempts: ${currentCount} active connections`);
    }
  }

  console.log(`\nConnection attempts complete:`);
  console.log(`- Successful: ${successfulConnections}`);
  console.log(`- Failed: ${failedConnections}`);

  // Wait a bit for any pending cleanups
  console.log('\nWaiting for cleanup...');
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Check final connection count
  const finalCount = getActiveConnections();
  console.log(`\nFinal connection count: ${finalCount}`);

  // Analyze connection count trend
  const maxCount = Math.max(...connectionCounts);
  const avgCount = connectionCounts.reduce((a, b) => a + b, 0) / connectionCounts.length;

  console.log(`\nConnection count statistics:`);
  console.log(`- Maximum: ${maxCount}`);
  console.log(`- Average: ${avgCount.toFixed(2)}`);
  console.log(`- Initial: ${initialCount}`);
  console.log(`- Final: ${finalCount}`);

  // Stop the proxy
  await proxy.stop();
  console.log('\n✓ Proxy stopped');

  // Verify results
  expect(finalCount).toEqual(initialCount);
  expect(maxCount).toBeLessThan(10); // Should not accumulate many connections

  console.log('\n✅ PASS: Connection cleanup working correctly under rapid retries!');
});

tap.test('should handle routing failures without leaking connections', async () => {
  console.log('\n=== Testing Routing Failure Cleanup ===');

  // Create a SmartProxy instance with no routes
  const proxy = new SmartProxy({
    ports: [8551],
    enableDetailedLogging: false,
    maxConnectionLifetime: 10000,
    socketTimeout: 5000,
    routes: [] // No routes - all connections will fail routing
  });

  // Start the proxy
  await proxy.start();
  console.log('✓ Proxy started on port 8551 with no routes');

  // Helper to get active connection count
  const getActiveConnections = () => {
    const connectionManager = (proxy as any).connectionManager;
    return connectionManager ? connectionManager.getConnectionCount() : 0;
  };

  const initialCount = getActiveConnections();
  console.log(`Initial connection count: ${initialCount}`);

  // Create multiple connections that will fail routing
  const connectionPromises = [];

  for (let i = 0; i < 10; i++) {
    connectionPromises.push(new Promise<void>((resolve) => {
      const client = new net.Socket();

      client.on('error', () => {
        client.destroy();
        resolve();
      });

      client.on('close', () => {
        resolve();
      });

      client.connect(8551, 'localhost', () => {
        // Send data to trigger routing (which will fail)
        client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n');
      });

      // Force close after a short time
      setTimeout(() => {
        if (!client.destroyed) {
          client.destroy();
        }
        resolve();
      }, 500);
    }));
  }

  // Wait for all connections to complete
  await Promise.all(connectionPromises);
  console.log('✓ All connection attempts completed');

  // Wait for cleanup
  await new Promise(resolve => setTimeout(resolve, 500));

  const finalCount = getActiveConnections();
  console.log(`Final connection count: ${finalCount}`);

  // Stop the proxy
  await proxy.stop();
  console.log('✓ Proxy stopped');

  // Verify no connections leaked
  expect(finalCount).toEqual(initialCount);

  console.log('\n✅ PASS: Routing failures cleaned up correctly!');
});

tap.start();


@@ -90,6 +90,10 @@ export class RouteConnectionHandler {
 // Create a new connection record
 const record = this.connectionManager.createConnection(socket);
+if (!record) {
+  // Connection was rejected due to limit - socket already destroyed by connection manager
+  return;
+}
 const connectionId = record.id;
 // Apply socket optimizations
@@ -546,6 +550,12 @@ export class RouteConnectionHandler {
 // We don't close the socket - just let it remain open
 // The kernel-level NFTables rules will handle the actual forwarding
+// Set up cleanup when the socket eventually closes
+socket.once('close', () => {
+  this.connectionManager.cleanupConnection(record, 'nftables_closed');
+});
 return;
 }
@@ -687,7 +697,7 @@ export class RouteConnectionHandler {
 record,
 initialChunk,
 this.settings.httpProxyPort || 8443,
-(reason) => this.connectionManager.initiateCleanupOnce(record, reason)
+(reason) => this.connectionManager.cleanupConnection(record, reason)
 );
 return;
 }
@@ -742,7 +752,7 @@ export class RouteConnectionHandler {
 record,
 initialChunk,
 this.settings.httpProxyPort || 8443,
-(reason) => this.connectionManager.initiateCleanupOnce(record, reason)
+(reason) => this.connectionManager.cleanupConnection(record, reason)
 );
 return;
 } else {
@@ -919,6 +929,7 @@ export class RouteConnectionHandler {
 /**
  * Setup improved error handling for the outgoing connection
+ * @deprecated This method is no longer used - error handling is done in createSocketWithErrorHandler
  */
 private setupOutgoingErrorHandler(
 connectionId: string,
@@ -1074,8 +1085,6 @@ export class RouteConnectionHandler {
 }
 // Create the target socket with immediate error handling
-let connectionEstablished = false;
 const targetSocket = createSocketWithErrorHandler({
 port: finalTargetPort,
 host: finalTargetHost,
@@ -1119,8 +1128,6 @@ export class RouteConnectionHandler {
 this.connectionManager.cleanupConnection(record, `connection_failed_${(error as any).code || 'unknown'}`);
 },
 onConnect: () => {
-connectionEstablished = true;
 if (this.settings.enableDetailedLogging) {
 logger.log('info', `Connection ${connectionId} established to target ${finalTargetHost}:${finalTargetPort}`, {
 connectionId,
@@ -1154,7 +1161,7 @@ export class RouteConnectionHandler {
 error: err.message,
 component: 'route-handler'
 });
-return this.connectionManager.initiateCleanupOnce(record, 'write_error');
+return this.connectionManager.cleanupConnection(record, 'write_error');
 }
 });
@@ -1168,7 +1175,7 @@ export class RouteConnectionHandler {
 socket,
 targetSocket,
 (reason) => {
-this.connectionManager.initiateCleanupOnce(record, reason);
+this.connectionManager.cleanupConnection(record, reason);
 }
 );
@@ -1252,7 +1259,7 @@ export class RouteConnectionHandler {
 connectionId,
 serverName,
 connInfo,
-(_connectionId, reason) => this.connectionManager.initiateCleanupOnce(record, reason)
+(_connectionId, reason) => this.connectionManager.cleanupConnection(record, reason)
 );
 // Store the handler in the connection record so we can remove it during cleanup
@@ -1277,7 +1284,7 @@ export class RouteConnectionHandler {
 remoteIP: record.remoteIP,
 component: 'route-handler'
 });
-this.connectionManager.initiateCleanupOnce(record, reason);
+this.connectionManager.cleanupConnection(record, reason);
 });
 // Mark TLS handshake as complete for TLS connections
@@ -1348,7 +1355,7 @@ export class RouteConnectionHandler {
 record.incomingTerminationReason = 'timeout';
 this.connectionManager.incrementTerminationStat('incoming', 'timeout');
 }
-this.connectionManager.initiateCleanupOnce(record, 'timeout_incoming');
+this.connectionManager.cleanupConnection(record, 'timeout_incoming');
 });
 targetSocket.on('timeout', () => {
@@ -1375,7 +1382,7 @@ export class RouteConnectionHandler {
 record.outgoingTerminationReason = 'timeout';
 this.connectionManager.incrementTerminationStat('outgoing', 'timeout');
 }
-this.connectionManager.initiateCleanupOnce(record, 'timeout_outgoing');
+this.connectionManager.cleanupConnection(record, 'timeout_outgoing');
 });
 // Apply socket timeouts