Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
2a75a86d73 | |||
250eafd36c | |||
facb68a9d0 | |||
23898c1577 | |||
2d240671ab | |||
705a59413d | |||
e9723a8af9 | |||
300ab1a077 | |||
900942a263 | |||
d45485985a | |||
9fdc2d5069 | |||
37c87e8450 |
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartproxy",
|
||||
"version": "19.5.9",
|
||||
"version": "19.5.16",
|
||||
"private": false,
|
||||
"description": "A powerful proxy package with unified route-based configuration for high traffic management. Features include SSL/TLS support, flexible routing patterns, WebSocket handling, advanced security options, and automatic ACME certificate management.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
112
readme.hints.md
112
readme.hints.md
@ -413,4 +413,114 @@ const routes: IRouteConfig[] = [{
|
||||
### 7. Next Steps (Remaining Work)
|
||||
- **Phase 2 (cont)**: Migrate components to use LifecycleComponent
|
||||
- **Phase 3**: Add worker threads for CPU-intensive operations
|
||||
- **Phase 4**: Performance monitoring dashboard
|
||||
- **Phase 4**: Performance monitoring dashboard
|
||||
|
||||
## Socket Error Handling Fix (v19.5.11+)
|
||||
|
||||
### Issue
|
||||
Server crashed with unhandled 'error' event when backend connections failed (ECONNREFUSED). Also caused memory leak with rising active connection count as failed connections weren't cleaned up properly.
|
||||
|
||||
### Root Cause
|
||||
1. **Race Condition**: In forwarding handlers, sockets were created with `net.connect()` but error handlers were attached later, creating a window where errors could crash the server
|
||||
2. **Incomplete Cleanup**: When server connections failed, client sockets weren't properly cleaned up, leaving connection records in memory
|
||||
|
||||
### Solution
|
||||
Created `createSocketWithErrorHandler()` utility that attaches error handlers immediately:
|
||||
```typescript
|
||||
// Before (race condition):
|
||||
const socket = net.connect(port, host);
|
||||
// ... other code ...
|
||||
socket.on('error', handler); // Too late!
|
||||
|
||||
// After (safe):
|
||||
const socket = createSocketWithErrorHandler({
|
||||
port, host,
|
||||
onError: (error) => {
|
||||
// Handle error immediately
|
||||
clientSocket.destroy();
|
||||
},
|
||||
onConnect: () => {
|
||||
// Set up forwarding
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
### Changes Made
|
||||
1. **New Utility**: `ts/core/utils/socket-utils.ts` - Added `createSocketWithErrorHandler()`
|
||||
2. **Updated Handlers**:
|
||||
- `https-passthrough-handler.ts` - Uses safe socket creation
|
||||
- `https-terminate-to-http-handler.ts` - Uses safe socket creation
|
||||
3. **Connection Cleanup**: Client sockets destroyed immediately on server connection failure
|
||||
|
||||
### Test Coverage
|
||||
- `test/test.socket-error-handling.node.ts` - Verifies server doesn't crash on ECONNREFUSED
|
||||
- `test/test.forwarding-error-fix.node.ts` - Tests forwarding handlers handle errors gracefully
|
||||
|
||||
### Configuration
|
||||
No configuration changes needed. The fix is transparent to users.
|
||||
|
||||
### Important Note
|
||||
The fix was applied in two places:
|
||||
1. **ForwardingHandler classes** (`https-passthrough-handler.ts`, etc.) - These are standalone forwarding utilities
|
||||
2. **SmartProxy route-connection-handler** (`route-connection-handler.ts`) - This is where the actual SmartProxy connection handling happens
|
||||
|
||||
The critical fix for SmartProxy was in `setupDirectConnection()` method in route-connection-handler.ts, which now uses `createSocketWithErrorHandler()` to properly handle connection failures and clean up connection records.
|
||||
|
||||
## Connection Cleanup Improvements (v19.5.12+)
|
||||
|
||||
### Issue
|
||||
Connections were still counting up during rapid retry scenarios, especially when routing failed or backend connections were refused. This was due to:
|
||||
1. **Delayed Cleanup**: Using `initiateCleanupOnce` queued cleanup operations (batch of 100 every 100ms) instead of immediate cleanup
|
||||
2. **NFTables Memory Leak**: NFTables connections were never cleaned up, staying in memory forever
|
||||
3. **Connection Limit Bypass**: When max connections reached, connection record check happened after creation
|
||||
|
||||
### Root Cause Analysis
|
||||
1. **Queued vs Immediate Cleanup**:
|
||||
- `initiateCleanupOnce()`: Adds to cleanup queue, processes up to 100 connections every 100ms
|
||||
- `cleanupConnection()`: Immediate synchronous cleanup
|
||||
- Under rapid retries, connections were created faster than the queue could process them
|
||||
|
||||
2. **NFTables Connections**:
|
||||
- Marked with `usingNetworkProxy = true` but never cleaned up
|
||||
- Connection records stayed in memory indefinitely
|
||||
|
||||
3. **Error Path Cleanup**:
|
||||
- Many error paths used `socket.end()` (async) followed by cleanup
|
||||
- Created timing windows where connections weren't fully cleaned
|
||||
|
||||
### Solution
|
||||
1. **Immediate Cleanup**: Changed all error paths from `initiateCleanupOnce()` to `cleanupConnection()` for immediate cleanup
|
||||
2. **NFTables Cleanup**: Added socket close listener to clean up connection records when NFTables connections close
|
||||
3. **Connection Limit Fix**: Added null check after `createConnection()` to handle rejection properly
|
||||
|
||||
### Changes Made in route-connection-handler.ts
|
||||
```typescript
|
||||
// 1. NFTables cleanup (line 551-553)
|
||||
socket.once('close', () => {
|
||||
this.connectionManager.cleanupConnection(record, 'nftables_closed');
|
||||
});
|
||||
|
||||
// 2. Connection limit check (line 93-96)
|
||||
const record = this.connectionManager.createConnection(socket);
|
||||
if (!record) {
|
||||
// Connection was rejected due to limit - socket already destroyed
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. Changed all error paths to use immediate cleanup
|
||||
// Before: this.connectionManager.initiateCleanupOnce(record, reason)
|
||||
// After: this.connectionManager.cleanupConnection(record, reason)
|
||||
```
|
||||
|
||||
### Test Coverage
|
||||
- `test/test.rapid-retry-cleanup.node.ts` - Verifies connection cleanup under rapid retry scenarios
|
||||
- Test shows connection count stays at 0 even with 20 rapid retries with 50ms intervals
|
||||
- Confirms both ECONNREFUSED and routing failure scenarios are handled correctly
|
||||
|
||||
### Performance Impact
|
||||
- **Positive**: No more connection accumulation under load
|
||||
- **Positive**: Immediate cleanup reduces memory usage
|
||||
- **Consideration**: More frequent cleanup operations, but prevents queue backlog
|
||||
|
||||
### Migration Notes
|
||||
No configuration changes needed. The improvements are automatic and backward compatible.
|
337
readme.plan.md
337
readme.plan.md
@ -1,337 +0,0 @@
|
||||
# SmartProxy Socket Cleanup Fix Plan
|
||||
|
||||
## Problem Summary
|
||||
|
||||
The current socket cleanup implementation is too aggressive and closes long-lived connections prematurely. This affects:
|
||||
- WebSocket connections in HTTPS passthrough
|
||||
- Long-lived HTTP connections (SSE, streaming)
|
||||
- Database connections
|
||||
- Any connection that should remain open for hours
|
||||
|
||||
## Root Causes
|
||||
|
||||
### 1. **Bilateral Socket Cleanup**
|
||||
When one socket closes, both sockets are immediately destroyed:
|
||||
```typescript
|
||||
// In createSocketCleanupHandler
|
||||
cleanupSocket(clientSocket, 'client');
|
||||
cleanupSocket(serverSocket, 'server'); // Both destroyed together!
|
||||
```
|
||||
|
||||
### 2. **Aggressive Timeout Handling**
|
||||
Timeout events immediately trigger connection cleanup:
|
||||
```typescript
|
||||
socket.on('timeout', () => {
|
||||
handleClose(`${prefix}_timeout`); // Destroys both sockets!
|
||||
});
|
||||
```
|
||||
|
||||
### 3. **Parity Check Forces Closure**
|
||||
If one socket closes but the other remains open for >2 minutes, connection is forcefully terminated:
|
||||
```typescript
|
||||
if (record.outgoingClosedTime &&
|
||||
!record.incoming.destroyed &&
|
||||
now - record.outgoingClosedTime > 120000) {
|
||||
this.cleanupConnection(record, 'parity_check');
|
||||
}
|
||||
```
|
||||
|
||||
### 4. **No Half-Open Connection Support**
|
||||
The proxy doesn't support TCP half-open connections where one side closes while the other continues sending.
|
||||
|
||||
## Fix Implementation Plan
|
||||
|
||||
### Phase 1: Fix Socket Cleanup (Prevent Premature Closure)
|
||||
|
||||
#### 1.1 Modify `cleanupSocket()` to support graceful shutdown
|
||||
```typescript
|
||||
export interface CleanupOptions {
|
||||
immediate?: boolean; // Force immediate destruction
|
||||
allowDrain?: boolean; // Allow write buffer to drain
|
||||
gracePeriod?: number; // Ms to wait before force close
|
||||
}
|
||||
|
||||
export function cleanupSocket(
|
||||
socket: Socket | TLSSocket | null,
|
||||
socketName?: string,
|
||||
options: CleanupOptions = {}
|
||||
): Promise<void> {
|
||||
if (!socket || socket.destroyed) return Promise.resolve();
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const cleanup = () => {
|
||||
socket.removeAllListeners();
|
||||
if (!socket.destroyed) {
|
||||
socket.destroy();
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
|
||||
if (options.immediate) {
|
||||
cleanup();
|
||||
} else if (options.allowDrain && socket.writable) {
|
||||
// Allow pending writes to complete
|
||||
socket.end(() => cleanup());
|
||||
|
||||
// Force cleanup after grace period
|
||||
if (options.gracePeriod) {
|
||||
setTimeout(cleanup, options.gracePeriod);
|
||||
}
|
||||
} else {
|
||||
cleanup();
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
#### 1.2 Implement Independent Socket Tracking
|
||||
```typescript
|
||||
export function createIndependentSocketHandlers(
|
||||
clientSocket: Socket,
|
||||
serverSocket: Socket,
|
||||
onBothClosed: (reason: string) => void
|
||||
): { cleanupClient: () => void, cleanupServer: () => void } {
|
||||
let clientClosed = false;
|
||||
let serverClosed = false;
|
||||
let clientReason = '';
|
||||
let serverReason = '';
|
||||
|
||||
const checkBothClosed = () => {
|
||||
if (clientClosed && serverClosed) {
|
||||
onBothClosed(`client: ${clientReason}, server: ${serverReason}`);
|
||||
}
|
||||
};
|
||||
|
||||
const cleanupClient = async (reason: string) => {
|
||||
if (clientClosed) return;
|
||||
clientClosed = true;
|
||||
clientReason = reason;
|
||||
|
||||
// Allow server to continue if still active
|
||||
if (!serverClosed && serverSocket.writable) {
|
||||
// Half-close: stop reading from client, let server finish
|
||||
clientSocket.pause();
|
||||
clientSocket.unpipe(serverSocket);
|
||||
await cleanupSocket(clientSocket, 'client', { allowDrain: true });
|
||||
} else {
|
||||
await cleanupSocket(clientSocket, 'client');
|
||||
}
|
||||
|
||||
checkBothClosed();
|
||||
};
|
||||
|
||||
const cleanupServer = async (reason: string) => {
|
||||
if (serverClosed) return;
|
||||
serverClosed = true;
|
||||
serverReason = reason;
|
||||
|
||||
// Allow client to continue if still active
|
||||
if (!clientClosed && clientSocket.writable) {
|
||||
// Half-close: stop reading from server, let client finish
|
||||
serverSocket.pause();
|
||||
serverSocket.unpipe(clientSocket);
|
||||
await cleanupSocket(serverSocket, 'server', { allowDrain: true });
|
||||
} else {
|
||||
await cleanupSocket(serverSocket, 'server');
|
||||
}
|
||||
|
||||
checkBothClosed();
|
||||
};
|
||||
|
||||
return { cleanupClient, cleanupServer };
|
||||
}
|
||||
```
|
||||
|
||||
### Phase 2: Fix Timeout Handling
|
||||
|
||||
#### 2.1 Separate timeout handling from connection closure
|
||||
```typescript
|
||||
export function setupSocketHandlers(
|
||||
socket: Socket | TLSSocket,
|
||||
handleClose: (reason: string) => void,
|
||||
handleTimeout?: (socket: Socket) => void, // New optional handler
|
||||
errorPrefix?: string
|
||||
): void {
|
||||
socket.on('error', (error) => {
|
||||
const prefix = errorPrefix || 'Socket';
|
||||
handleClose(`${prefix}_error: ${error.message}`);
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
const prefix = errorPrefix || 'socket';
|
||||
handleClose(`${prefix}_closed`);
|
||||
});
|
||||
|
||||
socket.on('timeout', () => {
|
||||
if (handleTimeout) {
|
||||
handleTimeout(socket); // Custom timeout handling
|
||||
} else {
|
||||
// Default: just log, don't close
|
||||
console.warn(`Socket timeout: ${errorPrefix || 'socket'}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
#### 2.2 Update HTTPS passthrough handler
|
||||
```typescript
|
||||
// In https-passthrough-handler.ts
|
||||
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
|
||||
clientSocket,
|
||||
serverSocket,
|
||||
(reason) => {
|
||||
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
|
||||
remoteAddress,
|
||||
bytesSent,
|
||||
bytesReceived,
|
||||
reason
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Setup handlers with custom timeout handling
|
||||
setupSocketHandlers(clientSocket, cleanupClient, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'client');
|
||||
|
||||
setupSocketHandlers(serverSocket, cleanupServer, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'server');
|
||||
```
|
||||
|
||||
### Phase 3: Fix Connection Manager
|
||||
|
||||
#### 3.1 Remove aggressive parity check
|
||||
```typescript
|
||||
// Remove or significantly increase the parity check timeout
|
||||
// From 2 minutes to 30 minutes for long-lived connections
|
||||
if (record.outgoingClosedTime &&
|
||||
!record.incoming.destroyed &&
|
||||
!record.connectionClosed &&
|
||||
now - record.outgoingClosedTime > 1800000) { // 30 minutes
|
||||
// Only close if no data activity
|
||||
if (now - record.lastActivity > 600000) { // 10 minutes of inactivity
|
||||
this.cleanupConnection(record, 'parity_check');
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### 3.2 Update cleanupConnection to check socket states
|
||||
```typescript
|
||||
public cleanupConnection(record: IConnectionRecord, reason: string = 'normal'): void {
|
||||
if (!record.connectionClosed) {
|
||||
record.connectionClosed = true;
|
||||
|
||||
// Only cleanup sockets that are actually closed or inactive
|
||||
if (record.incoming && (!record.incoming.writable || record.incoming.destroyed)) {
|
||||
cleanupSocket(record.incoming, `${record.id}-incoming`, { immediate: true });
|
||||
}
|
||||
|
||||
if (record.outgoing && (!record.outgoing.writable || record.outgoing.destroyed)) {
|
||||
cleanupSocket(record.outgoing, `${record.id}-outgoing`, { immediate: true });
|
||||
}
|
||||
|
||||
// If either socket is still active, don't remove the record yet
|
||||
if ((record.incoming && record.incoming.writable) ||
|
||||
(record.outgoing && record.outgoing.writable)) {
|
||||
record.connectionClosed = false; // Reset flag
|
||||
return; // Don't finish cleanup
|
||||
}
|
||||
|
||||
// Continue with full cleanup...
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Phase 4: Testing and Validation
|
||||
|
||||
#### 4.1 Test Cases to Implement
|
||||
1. WebSocket connection should stay open for >1 hour
|
||||
2. HTTP streaming response should continue after request closes
|
||||
3. Half-open connections should work correctly
|
||||
4. Verify no socket leaks with long-running connections
|
||||
5. Test graceful shutdown with pending data
|
||||
|
||||
#### 4.2 Socket Leak Prevention
|
||||
- Ensure all event listeners are tracked and removed
|
||||
- Use WeakMap for socket metadata to prevent memory leaks
|
||||
- Implement connection count monitoring
|
||||
- Add periodic health checks for orphaned sockets
|
||||
|
||||
## Implementation Order
|
||||
|
||||
1. **Day 1**: Implement graceful `cleanupSocket()` and independent socket handlers
|
||||
2. **Day 2**: Update all handlers to use new cleanup mechanism
|
||||
3. **Day 3**: Fix timeout handling to not close connections
|
||||
4. **Day 4**: Update connection manager parity check and cleanup logic
|
||||
5. **Day 5**: Comprehensive testing and leak detection
|
||||
|
||||
## Configuration Changes
|
||||
|
||||
Add new options to SmartProxyOptions:
|
||||
```typescript
|
||||
interface ISmartProxyOptions {
|
||||
// Existing options...
|
||||
|
||||
// New options for long-lived connections
|
||||
socketCleanupGracePeriod?: number; // Default: 5000ms
|
||||
allowHalfOpenConnections?: boolean; // Default: true
|
||||
parityCheckTimeout?: number; // Default: 1800000ms (30 min)
|
||||
timeoutBehavior?: 'close' | 'reset' | 'ignore'; // Default: 'reset'
|
||||
}
|
||||
```
|
||||
|
||||
## Success Metrics
|
||||
|
||||
1. WebSocket connections remain stable for 24+ hours
|
||||
2. No premature connection closures reported
|
||||
3. Memory usage remains stable (no socket leaks)
|
||||
4. Half-open connections work correctly
|
||||
5. Graceful shutdown completes within grace period
|
||||
|
||||
## Implementation Status: COMPLETED ✅
|
||||
|
||||
### Implemented Changes
|
||||
|
||||
1. **Modified `cleanupSocket()` in `socket-utils.ts`**
|
||||
- Added `CleanupOptions` interface with `immediate`, `allowDrain`, and `gracePeriod` options
|
||||
- Implemented graceful shutdown support with write buffer draining
|
||||
|
||||
2. **Created `createIndependentSocketHandlers()` in `socket-utils.ts`**
|
||||
- Tracks socket states independently
|
||||
- Supports half-open connections where one side can close while the other remains open
|
||||
- Only triggers full cleanup when both sockets are closed
|
||||
|
||||
3. **Updated `setupSocketHandlers()` in `socket-utils.ts`**
|
||||
- Added optional `handleTimeout` parameter to customize timeout behavior
|
||||
- Prevents automatic connection closure on timeout events
|
||||
|
||||
4. **Updated HTTPS Passthrough Handler**
|
||||
- Now uses `createIndependentSocketHandlers` for half-open support
|
||||
- Custom timeout handling that resets timer instead of closing connection
|
||||
- Manual data forwarding with backpressure handling
|
||||
|
||||
5. **Updated Connection Manager**
|
||||
- Extended parity check from 2 minutes to 30 minutes
|
||||
- Added activity check before closing (10 minutes of inactivity required)
|
||||
- Modified cleanup to check socket states before destroying
|
||||
|
||||
6. **Updated Basic Forwarding in Route Connection Handler**
|
||||
- Replaced simple `pipe()` with independent socket handlers
|
||||
- Added manual data forwarding with backpressure support
|
||||
- Removed bilateral close handlers to prevent premature cleanup
|
||||
|
||||
### Test Results
|
||||
|
||||
All tests passing:
|
||||
- ✅ Long-lived connection test: Connection stayed open for 61+ seconds with periodic keep-alive
|
||||
- ✅ Half-open connection test: One side closed while the other continued to send data
|
||||
- ✅ No socket leaks or premature closures
|
||||
|
||||
### Notes
|
||||
|
||||
- The fix maintains backward compatibility
|
||||
- No configuration changes required for existing deployments
|
||||
- Long-lived connections now work correctly in both HTTPS passthrough and basic forwarding modes
|
201
test/test.rapid-retry-cleanup.node.ts
Normal file
201
test/test.rapid-retry-cleanup.node.ts
Normal file
@ -0,0 +1,201 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import * as net from 'net';
|
||||
import * as plugins from '../ts/plugins.js';
|
||||
|
||||
// Import SmartProxy and configurations
|
||||
import { SmartProxy } from '../ts/index.js';
|
||||
|
||||
tap.test('should handle rapid connection retries without leaking connections', async () => {
|
||||
console.log('\n=== Testing Rapid Connection Retry Cleanup ===');
|
||||
|
||||
// Create a SmartProxy instance
|
||||
const proxy = new SmartProxy({
|
||||
ports: [8550],
|
||||
enableDetailedLogging: false,
|
||||
maxConnectionLifetime: 10000,
|
||||
socketTimeout: 5000,
|
||||
routes: [{
|
||||
name: 'test-route',
|
||||
match: { ports: 8550 },
|
||||
action: {
|
||||
type: 'forward',
|
||||
target: {
|
||||
host: 'localhost',
|
||||
port: 9999 // Non-existent port to force connection failures
|
||||
}
|
||||
}
|
||||
}]
|
||||
});
|
||||
|
||||
// Start the proxy
|
||||
await proxy.start();
|
||||
console.log('✓ Proxy started on port 8550');
|
||||
|
||||
// Helper to get active connection count
|
||||
const getActiveConnections = () => {
|
||||
const connectionManager = (proxy as any).connectionManager;
|
||||
return connectionManager ? connectionManager.getConnectionCount() : 0;
|
||||
};
|
||||
|
||||
// Track connection counts
|
||||
const connectionCounts: number[] = [];
|
||||
const initialCount = getActiveConnections();
|
||||
console.log(`Initial connection count: ${initialCount}`);
|
||||
|
||||
// Simulate rapid retries
|
||||
const retryCount = 20;
|
||||
const retryDelay = 50; // 50ms between retries
|
||||
let successfulConnections = 0;
|
||||
let failedConnections = 0;
|
||||
|
||||
console.log(`\nSimulating ${retryCount} rapid connection attempts...`);
|
||||
|
||||
for (let i = 0; i < retryCount; i++) {
|
||||
await new Promise<void>((resolve) => {
|
||||
const client = new net.Socket();
|
||||
|
||||
client.on('error', () => {
|
||||
failedConnections++;
|
||||
client.destroy();
|
||||
resolve();
|
||||
});
|
||||
|
||||
client.on('close', () => {
|
||||
resolve();
|
||||
});
|
||||
|
||||
client.connect(8550, 'localhost', () => {
|
||||
// Send some data to trigger routing
|
||||
client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n');
|
||||
successfulConnections++;
|
||||
});
|
||||
|
||||
// Force close after a short time
|
||||
setTimeout(() => {
|
||||
if (!client.destroyed) {
|
||||
client.destroy();
|
||||
}
|
||||
}, 100);
|
||||
});
|
||||
|
||||
// Small delay between retries
|
||||
await new Promise(resolve => setTimeout(resolve, retryDelay));
|
||||
|
||||
// Check connection count after each attempt
|
||||
const currentCount = getActiveConnections();
|
||||
connectionCounts.push(currentCount);
|
||||
|
||||
if ((i + 1) % 5 === 0) {
|
||||
console.log(`After ${i + 1} attempts: ${currentCount} active connections`);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`\nConnection attempts complete:`);
|
||||
console.log(`- Successful: ${successfulConnections}`);
|
||||
console.log(`- Failed: ${failedConnections}`);
|
||||
|
||||
// Wait a bit for any pending cleanups
|
||||
console.log('\nWaiting for cleanup...');
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
|
||||
// Check final connection count
|
||||
const finalCount = getActiveConnections();
|
||||
console.log(`\nFinal connection count: ${finalCount}`);
|
||||
|
||||
// Analyze connection count trend
|
||||
const maxCount = Math.max(...connectionCounts);
|
||||
const avgCount = connectionCounts.reduce((a, b) => a + b, 0) / connectionCounts.length;
|
||||
|
||||
console.log(`\nConnection count statistics:`);
|
||||
console.log(`- Maximum: ${maxCount}`);
|
||||
console.log(`- Average: ${avgCount.toFixed(2)}`);
|
||||
console.log(`- Initial: ${initialCount}`);
|
||||
console.log(`- Final: ${finalCount}`);
|
||||
|
||||
// Stop the proxy
|
||||
await proxy.stop();
|
||||
console.log('\n✓ Proxy stopped');
|
||||
|
||||
// Verify results
|
||||
expect(finalCount).toEqual(initialCount);
|
||||
expect(maxCount).toBeLessThan(10); // Should not accumulate many connections
|
||||
|
||||
console.log('\n✅ PASS: Connection cleanup working correctly under rapid retries!');
|
||||
});
|
||||
|
||||
tap.test('should handle routing failures without leaking connections', async () => {
|
||||
console.log('\n=== Testing Routing Failure Cleanup ===');
|
||||
|
||||
// Create a SmartProxy instance with no routes
|
||||
const proxy = new SmartProxy({
|
||||
ports: [8551],
|
||||
enableDetailedLogging: false,
|
||||
maxConnectionLifetime: 10000,
|
||||
socketTimeout: 5000,
|
||||
routes: [] // No routes - all connections will fail routing
|
||||
});
|
||||
|
||||
// Start the proxy
|
||||
await proxy.start();
|
||||
console.log('✓ Proxy started on port 8551 with no routes');
|
||||
|
||||
// Helper to get active connection count
|
||||
const getActiveConnections = () => {
|
||||
const connectionManager = (proxy as any).connectionManager;
|
||||
return connectionManager ? connectionManager.getConnectionCount() : 0;
|
||||
};
|
||||
|
||||
const initialCount = getActiveConnections();
|
||||
console.log(`Initial connection count: ${initialCount}`);
|
||||
|
||||
// Create multiple connections that will fail routing
|
||||
const connectionPromises = [];
|
||||
for (let i = 0; i < 10; i++) {
|
||||
connectionPromises.push(new Promise<void>((resolve) => {
|
||||
const client = new net.Socket();
|
||||
|
||||
client.on('error', () => {
|
||||
client.destroy();
|
||||
resolve();
|
||||
});
|
||||
|
||||
client.on('close', () => {
|
||||
resolve();
|
||||
});
|
||||
|
||||
client.connect(8551, 'localhost', () => {
|
||||
// Send data to trigger routing (which will fail)
|
||||
client.write('GET / HTTP/1.1\r\nHost: test.com\r\n\r\n');
|
||||
});
|
||||
|
||||
// Force close after a short time
|
||||
setTimeout(() => {
|
||||
if (!client.destroyed) {
|
||||
client.destroy();
|
||||
}
|
||||
resolve();
|
||||
}, 500);
|
||||
}));
|
||||
}
|
||||
|
||||
// Wait for all connections to complete
|
||||
await Promise.all(connectionPromises);
|
||||
console.log('✓ All connection attempts completed');
|
||||
|
||||
// Wait for cleanup
|
||||
await new Promise(resolve => setTimeout(resolve, 500));
|
||||
|
||||
const finalCount = getActiveConnections();
|
||||
console.log(`Final connection count: ${finalCount}`);
|
||||
|
||||
// Stop the proxy
|
||||
await proxy.stop();
|
||||
console.log('✓ Proxy stopped');
|
||||
|
||||
// Verify no connections leaked
|
||||
expect(finalCount).toEqual(initialCount);
|
||||
|
||||
console.log('\n✅ PASS: Routing failures cleaned up correctly!');
|
||||
});
|
||||
|
||||
tap.start();
|
@ -6,6 +6,14 @@ export interface CleanupOptions {
|
||||
gracePeriod?: number; // Ms to wait before force close
|
||||
}
|
||||
|
||||
export interface SafeSocketOptions {
|
||||
port: number;
|
||||
host: string;
|
||||
onError?: (error: Error) => void;
|
||||
onConnect?: () => void;
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely cleanup a socket by removing all listeners and destroying it
|
||||
* @param socket The socket to cleanup
|
||||
@ -197,4 +205,39 @@ export function pipeSockets(
|
||||
): void {
|
||||
socket1.pipe(socket2);
|
||||
socket2.pipe(socket1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a socket with immediate error handling to prevent crashes
|
||||
* @param options Socket creation options
|
||||
* @returns The created socket
|
||||
*/
|
||||
export function createSocketWithErrorHandler(options: SafeSocketOptions): plugins.net.Socket {
|
||||
const { port, host, onError, onConnect, timeout } = options;
|
||||
|
||||
// Create socket with immediate error handler attachment
|
||||
const socket = new plugins.net.Socket();
|
||||
|
||||
// Attach error handler BEFORE connecting to catch immediate errors
|
||||
socket.on('error', (error) => {
|
||||
console.error(`Socket connection error to ${host}:${port}: ${error.message}`);
|
||||
if (onError) {
|
||||
onError(error);
|
||||
}
|
||||
});
|
||||
|
||||
// Attach connect handler if provided
|
||||
if (onConnect) {
|
||||
socket.on('connect', onConnect);
|
||||
}
|
||||
|
||||
// Set timeout if provided
|
||||
if (timeout) {
|
||||
socket.setTimeout(timeout);
|
||||
}
|
||||
|
||||
// Now attempt to connect - any immediate errors will be caught
|
||||
socket.connect(port, host);
|
||||
|
||||
return socket;
|
||||
}
|
@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js';
|
||||
import { ForwardingHandler } from './base-handler.js';
|
||||
import type { IForwardConfig } from '../config/forwarding-types.js';
|
||||
import { ForwardingHandlerEvents } from '../config/forwarding-types.js';
|
||||
import { createIndependentSocketHandlers, setupSocketHandlers } from '../../core/utils/socket-utils.js';
|
||||
import { createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js';
|
||||
|
||||
/**
|
||||
* Handler for HTTPS passthrough (SNI forwarding without termination)
|
||||
@ -48,91 +48,122 @@ export class HttpsPassthroughHandler extends ForwardingHandler {
|
||||
target: `${target.host}:${target.port}`
|
||||
});
|
||||
|
||||
// Create a connection to the target server
|
||||
const serverSocket = plugins.net.connect(target.port, target.host);
|
||||
|
||||
// Track data transfer for logging
|
||||
let bytesSent = 0;
|
||||
let bytesReceived = 0;
|
||||
let serverSocket: plugins.net.Socket | null = null;
|
||||
let cleanupClient: ((reason: string) => Promise<void>) | null = null;
|
||||
let cleanupServer: ((reason: string) => Promise<void>) | null = null;
|
||||
|
||||
// Create independent handlers for half-open connection support
|
||||
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
|
||||
clientSocket,
|
||||
serverSocket,
|
||||
(reason) => {
|
||||
// Create a connection to the target server with immediate error handling
|
||||
serverSocket = createSocketWithErrorHandler({
|
||||
port: target.port,
|
||||
host: target.host,
|
||||
onError: async (error) => {
|
||||
// Server connection failed - clean up client socket immediately
|
||||
this.emit(ForwardingHandlerEvents.ERROR, {
|
||||
error: error.message,
|
||||
code: (error as any).code || 'UNKNOWN',
|
||||
remoteAddress,
|
||||
target: `${target.host}:${target.port}`
|
||||
});
|
||||
|
||||
// Clean up the client socket since we can't forward
|
||||
if (!clientSocket.destroyed) {
|
||||
clientSocket.destroy();
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
|
||||
remoteAddress,
|
||||
bytesSent,
|
||||
bytesReceived,
|
||||
reason
|
||||
bytesSent: 0,
|
||||
bytesReceived: 0,
|
||||
reason: `server_connection_failed: ${error.message}`
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Setup handlers with custom timeout handling that doesn't close connections
|
||||
const timeout = this.getTimeout();
|
||||
|
||||
setupSocketHandlers(clientSocket, cleanupClient, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'client');
|
||||
|
||||
setupSocketHandlers(serverSocket, cleanupServer, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'server');
|
||||
|
||||
// Forward data from client to server
|
||||
clientSocket.on('data', (data) => {
|
||||
bytesSent += data.length;
|
||||
|
||||
// Check if server socket is writable
|
||||
if (serverSocket.writable) {
|
||||
const flushed = serverSocket.write(data);
|
||||
},
|
||||
onConnect: () => {
|
||||
// Connection successful - set up forwarding handlers
|
||||
const handlers = createIndependentSocketHandlers(
|
||||
clientSocket,
|
||||
serverSocket!,
|
||||
(reason) => {
|
||||
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
|
||||
remoteAddress,
|
||||
bytesSent,
|
||||
bytesReceived,
|
||||
reason
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
clientSocket.pause();
|
||||
serverSocket.once('drain', () => {
|
||||
clientSocket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DATA_FORWARDED, {
|
||||
direction: 'outbound',
|
||||
bytes: data.length,
|
||||
total: bytesSent
|
||||
});
|
||||
});
|
||||
|
||||
// Forward data from server to client
|
||||
serverSocket.on('data', (data) => {
|
||||
bytesReceived += data.length;
|
||||
|
||||
// Check if client socket is writable
|
||||
if (clientSocket.writable) {
|
||||
const flushed = clientSocket.write(data);
|
||||
cleanupClient = handlers.cleanupClient;
|
||||
cleanupServer = handlers.cleanupServer;
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
serverSocket.pause();
|
||||
clientSocket.once('drain', () => {
|
||||
serverSocket.resume();
|
||||
// Setup handlers with custom timeout handling that doesn't close connections
|
||||
const timeout = this.getTimeout();
|
||||
|
||||
setupSocketHandlers(clientSocket, cleanupClient, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'client');
|
||||
|
||||
setupSocketHandlers(serverSocket!, cleanupServer, (socket) => {
|
||||
// Just reset timeout, don't close
|
||||
socket.setTimeout(timeout);
|
||||
}, 'server');
|
||||
|
||||
// Forward data from client to server
|
||||
clientSocket.on('data', (data) => {
|
||||
bytesSent += data.length;
|
||||
|
||||
// Check if server socket is writable
|
||||
if (serverSocket && serverSocket.writable) {
|
||||
const flushed = serverSocket.write(data);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
clientSocket.pause();
|
||||
serverSocket.once('drain', () => {
|
||||
clientSocket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DATA_FORWARDED, {
|
||||
direction: 'outbound',
|
||||
bytes: data.length,
|
||||
total: bytesSent
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Forward data from server to client
|
||||
serverSocket!.on('data', (data) => {
|
||||
bytesReceived += data.length;
|
||||
|
||||
// Check if client socket is writable
|
||||
if (clientSocket.writable) {
|
||||
const flushed = clientSocket.write(data);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
serverSocket!.pause();
|
||||
clientSocket.once('drain', () => {
|
||||
serverSocket!.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DATA_FORWARDED, {
|
||||
direction: 'inbound',
|
||||
bytes: data.length,
|
||||
total: bytesReceived
|
||||
});
|
||||
});
|
||||
|
||||
// Set initial timeouts - they will be reset on each timeout event
|
||||
clientSocket.setTimeout(timeout);
|
||||
serverSocket!.setTimeout(timeout);
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DATA_FORWARDED, {
|
||||
direction: 'inbound',
|
||||
bytes: data.length,
|
||||
total: bytesReceived
|
||||
});
|
||||
});
|
||||
|
||||
// Set initial timeouts - they will be reset on each timeout event
|
||||
clientSocket.setTimeout(timeout);
|
||||
serverSocket.setTimeout(timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js';
|
||||
import { ForwardingHandler } from './base-handler.js';
|
||||
import type { IForwardConfig } from '../config/forwarding-types.js';
|
||||
import { ForwardingHandlerEvents } from '../config/forwarding-types.js';
|
||||
import { createSocketCleanupHandler, setupSocketHandlers } from '../../core/utils/socket-utils.js';
|
||||
import { createSocketCleanupHandler, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js';
|
||||
|
||||
/**
|
||||
* Handler for HTTPS termination with HTTP backend
|
||||
@ -141,19 +141,41 @@ export class HttpsTerminateToHttpHandler extends ForwardingHandler {
|
||||
if (dataBuffer.includes(Buffer.from('\r\n\r\n')) && !connectionEstablished) {
|
||||
const target = this.getTargetFromConfig();
|
||||
|
||||
// Create backend connection
|
||||
backendSocket = plugins.net.connect(target.port, target.host, () => {
|
||||
connectionEstablished = true;
|
||||
|
||||
// Send buffered data
|
||||
if (dataBuffer.length > 0) {
|
||||
backendSocket!.write(dataBuffer);
|
||||
dataBuffer = Buffer.alloc(0);
|
||||
// Create backend connection with immediate error handling
|
||||
backendSocket = createSocketWithErrorHandler({
|
||||
port: target.port,
|
||||
host: target.host,
|
||||
onError: (error) => {
|
||||
this.emit(ForwardingHandlerEvents.ERROR, {
|
||||
error: error.message,
|
||||
code: (error as any).code || 'UNKNOWN',
|
||||
remoteAddress,
|
||||
target: `${target.host}:${target.port}`
|
||||
});
|
||||
|
||||
// Clean up the TLS socket since we can't forward
|
||||
if (!tlsSocket.destroyed) {
|
||||
tlsSocket.destroy();
|
||||
}
|
||||
|
||||
this.emit(ForwardingHandlerEvents.DISCONNECTED, {
|
||||
remoteAddress,
|
||||
reason: `backend_connection_failed: ${error.message}`
|
||||
});
|
||||
},
|
||||
onConnect: () => {
|
||||
connectionEstablished = true;
|
||||
|
||||
// Send buffered data
|
||||
if (dataBuffer.length > 0) {
|
||||
backendSocket!.write(dataBuffer);
|
||||
dataBuffer = Buffer.alloc(0);
|
||||
}
|
||||
|
||||
// Set up bidirectional data flow
|
||||
tlsSocket.pipe(backendSocket!);
|
||||
backendSocket!.pipe(tlsSocket);
|
||||
}
|
||||
|
||||
// Set up bidirectional data flow
|
||||
tlsSocket.pipe(backendSocket!);
|
||||
backendSocket!.pipe(tlsSocket);
|
||||
});
|
||||
|
||||
// Update the cleanup handler with the backend socket
|
||||
|
@ -2,7 +2,7 @@ import * as plugins from '../../plugins.js';
|
||||
import { ForwardingHandler } from './base-handler.js';
|
||||
import type { IForwardConfig } from '../config/forwarding-types.js';
|
||||
import { ForwardingHandlerEvents } from '../config/forwarding-types.js';
|
||||
import { createSocketCleanupHandler, setupSocketHandlers } from '../../core/utils/socket-utils.js';
|
||||
import { createSocketCleanupHandler, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js';
|
||||
|
||||
/**
|
||||
* Handler for HTTPS termination with HTTPS backend
|
||||
|
@ -9,7 +9,7 @@ import { TlsManager } from './tls-manager.js';
|
||||
import { HttpProxyBridge } from './http-proxy-bridge.js';
|
||||
import { TimeoutManager } from './timeout-manager.js';
|
||||
import { RouteManager } from './route-manager.js';
|
||||
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers } from '../../core/utils/socket-utils.js';
|
||||
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler } from '../../core/utils/socket-utils.js';
|
||||
|
||||
/**
|
||||
* Handles new connection processing and setup logic with support for route-based configuration
|
||||
@ -90,6 +90,10 @@ export class RouteConnectionHandler {
|
||||
|
||||
// Create a new connection record
|
||||
const record = this.connectionManager.createConnection(socket);
|
||||
if (!record) {
|
||||
// Connection was rejected due to limit - socket already destroyed by connection manager
|
||||
return;
|
||||
}
|
||||
const connectionId = record.id;
|
||||
|
||||
// Apply socket optimizations
|
||||
@ -546,6 +550,12 @@ export class RouteConnectionHandler {
|
||||
|
||||
// We don't close the socket - just let it remain open
|
||||
// The kernel-level NFTables rules will handle the actual forwarding
|
||||
|
||||
// Set up cleanup when the socket eventually closes
|
||||
socket.once('close', () => {
|
||||
this.connectionManager.cleanupConnection(record, 'nftables_closed');
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -687,7 +697,7 @@ export class RouteConnectionHandler {
|
||||
record,
|
||||
initialChunk,
|
||||
this.settings.httpProxyPort || 8443,
|
||||
(reason) => this.connectionManager.initiateCleanupOnce(record, reason)
|
||||
(reason) => this.connectionManager.cleanupConnection(record, reason)
|
||||
);
|
||||
return;
|
||||
}
|
||||
@ -742,7 +752,7 @@ export class RouteConnectionHandler {
|
||||
record,
|
||||
initialChunk,
|
||||
this.settings.httpProxyPort || 8443,
|
||||
(reason) => this.connectionManager.initiateCleanupOnce(record, reason)
|
||||
(reason) => this.connectionManager.cleanupConnection(record, reason)
|
||||
);
|
||||
return;
|
||||
} else {
|
||||
@ -919,6 +929,7 @@ export class RouteConnectionHandler {
|
||||
|
||||
/**
|
||||
* Setup improved error handling for the outgoing connection
|
||||
* @deprecated This method is no longer used - error handling is done in createSocketWithErrorHandler
|
||||
*/
|
||||
private setupOutgoingErrorHandler(
|
||||
connectionId: string,
|
||||
@ -1073,8 +1084,217 @@ export class RouteConnectionHandler {
|
||||
record.pendingDataSize = initialChunk.length;
|
||||
}
|
||||
|
||||
// Create the target socket
|
||||
const targetSocket = plugins.net.connect(connectionOptions);
|
||||
// Create the target socket with immediate error handling
|
||||
const targetSocket = createSocketWithErrorHandler({
|
||||
port: finalTargetPort,
|
||||
host: finalTargetHost,
|
||||
onError: (error) => {
|
||||
// Connection failed - clean up everything immediately
|
||||
logger.log('error',
|
||||
`Connection setup error for ${connectionId} to ${finalTargetHost}:${finalTargetPort}: ${error.message} (${(error as any).code})`,
|
||||
{
|
||||
connectionId,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
errorMessage: error.message,
|
||||
errorCode: (error as any).code,
|
||||
component: 'route-handler'
|
||||
}
|
||||
);
|
||||
|
||||
// Log specific error types for easier debugging
|
||||
if ((error as any).code === 'ECONNREFUSED') {
|
||||
logger.log('error',
|
||||
`Connection ${connectionId}: Target ${finalTargetHost}:${finalTargetPort} refused connection. Check if the target service is running and listening on that port.`,
|
||||
{
|
||||
connectionId,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
recommendation: 'Check if the target service is running and listening on that port.',
|
||||
component: 'route-handler'
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Resume the incoming socket to prevent it from hanging
|
||||
socket.resume();
|
||||
|
||||
// Clean up the incoming socket
|
||||
if (!socket.destroyed) {
|
||||
socket.destroy();
|
||||
}
|
||||
|
||||
// Clean up the connection record - this is critical!
|
||||
this.connectionManager.cleanupConnection(record, `connection_failed_${(error as any).code || 'unknown'}`);
|
||||
},
|
||||
onConnect: () => {
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
logger.log('info', `Connection ${connectionId} established to target ${finalTargetHost}:${finalTargetPort}`, {
|
||||
connectionId,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
component: 'route-handler'
|
||||
});
|
||||
}
|
||||
|
||||
// Clear any error listeners added by createSocketWithErrorHandler
|
||||
targetSocket.removeAllListeners('error');
|
||||
|
||||
// Add the normal error handler for established connections
|
||||
targetSocket.on('error', this.connectionManager.handleError('outgoing', record));
|
||||
|
||||
// Flush any pending data to target
|
||||
if (record.pendingData.length > 0) {
|
||||
const combinedData = Buffer.concat(record.pendingData);
|
||||
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
console.log(
|
||||
`[${connectionId}] Forwarding ${combinedData.length} bytes of initial data to target`
|
||||
);
|
||||
}
|
||||
|
||||
// Write pending data immediately
|
||||
targetSocket.write(combinedData, (err) => {
|
||||
if (err) {
|
||||
logger.log('error', `Error writing pending data to target for connection ${connectionId}: ${err.message}`, {
|
||||
connectionId,
|
||||
error: err.message,
|
||||
component: 'route-handler'
|
||||
});
|
||||
return this.connectionManager.cleanupConnection(record, 'write_error');
|
||||
}
|
||||
});
|
||||
|
||||
// Clear the buffer now that we've processed it
|
||||
record.pendingData = [];
|
||||
record.pendingDataSize = 0;
|
||||
}
|
||||
|
||||
// Set up independent socket handlers for half-open connection support
|
||||
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
|
||||
socket,
|
||||
targetSocket,
|
||||
(reason) => {
|
||||
this.connectionManager.cleanupConnection(record, reason);
|
||||
}
|
||||
);
|
||||
|
||||
// Setup socket handlers with custom timeout handling
|
||||
setupSocketHandlers(socket, cleanupClient, (sock) => {
|
||||
// Don't close on timeout for keep-alive connections
|
||||
if (record.hasKeepAlive) {
|
||||
sock.setTimeout(this.settings.socketTimeout || 3600000);
|
||||
}
|
||||
}, 'client');
|
||||
|
||||
setupSocketHandlers(targetSocket, cleanupServer, (sock) => {
|
||||
// Don't close on timeout for keep-alive connections
|
||||
if (record.hasKeepAlive) {
|
||||
sock.setTimeout(this.settings.socketTimeout || 3600000);
|
||||
}
|
||||
}, 'server');
|
||||
|
||||
// Forward data from client to target with backpressure handling
|
||||
socket.on('data', (chunk: Buffer) => {
|
||||
record.bytesReceived += chunk.length;
|
||||
this.timeoutManager.updateActivity(record);
|
||||
|
||||
if (targetSocket.writable) {
|
||||
const flushed = targetSocket.write(chunk);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
socket.pause();
|
||||
targetSocket.once('drain', () => {
|
||||
socket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Forward data from target to client with backpressure handling
|
||||
targetSocket.on('data', (chunk: Buffer) => {
|
||||
record.bytesSent += chunk.length;
|
||||
this.timeoutManager.updateActivity(record);
|
||||
|
||||
if (socket.writable) {
|
||||
const flushed = socket.write(chunk);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
targetSocket.pause();
|
||||
socket.once('drain', () => {
|
||||
targetSocket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Log successful connection
|
||||
logger.log('info',
|
||||
`Connection established: ${record.remoteIP} -> ${finalTargetHost}:${finalTargetPort}` +
|
||||
`${serverName ? ` (SNI: ${serverName})` : record.lockedDomain ? ` (Domain: ${record.lockedDomain})` : ''}`,
|
||||
{
|
||||
remoteIP: record.remoteIP,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
sni: serverName || undefined,
|
||||
domain: !serverName && record.lockedDomain ? record.lockedDomain : undefined,
|
||||
component: 'route-handler'
|
||||
}
|
||||
);
|
||||
|
||||
// Add TLS renegotiation handler if needed
|
||||
if (serverName) {
|
||||
// Create connection info object for the existing connection
|
||||
const connInfo = {
|
||||
sourceIp: record.remoteIP,
|
||||
sourcePort: record.incoming.remotePort || 0,
|
||||
destIp: record.incoming.localAddress || '',
|
||||
destPort: record.incoming.localPort || 0,
|
||||
};
|
||||
|
||||
// Create a renegotiation handler function
|
||||
const renegotiationHandler = this.tlsManager.createRenegotiationHandler(
|
||||
connectionId,
|
||||
serverName,
|
||||
connInfo,
|
||||
(_connectionId, reason) => this.connectionManager.cleanupConnection(record, reason)
|
||||
);
|
||||
|
||||
// Store the handler in the connection record so we can remove it during cleanup
|
||||
record.renegotiationHandler = renegotiationHandler;
|
||||
|
||||
// Add the handler to the socket
|
||||
socket.on('data', renegotiationHandler);
|
||||
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
logger.log('info', `TLS renegotiation handler installed for connection ${connectionId} with SNI ${serverName}`, {
|
||||
connectionId,
|
||||
serverName,
|
||||
component: 'route-handler'
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Set connection timeout
|
||||
record.cleanupTimer = this.timeoutManager.setupConnectionTimeout(record, (record, reason) => {
|
||||
logger.log('warn', `Connection ${connectionId} from ${record.remoteIP} exceeded max lifetime, forcing cleanup`, {
|
||||
connectionId,
|
||||
remoteIP: record.remoteIP,
|
||||
component: 'route-handler'
|
||||
});
|
||||
this.connectionManager.cleanupConnection(record, reason);
|
||||
});
|
||||
|
||||
// Mark TLS handshake as complete for TLS connections
|
||||
if (record.isTLS) {
|
||||
record.tlsHandshakeComplete = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Only set up basic properties - everything else happens in onConnect
|
||||
record.outgoing = targetSocket;
|
||||
record.outgoingStartTime = Date.now();
|
||||
|
||||
@ -1107,12 +1327,6 @@ export class RouteConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// Setup improved error handling for outgoing connection
|
||||
this.setupOutgoingErrorHandler(connectionId, targetSocket, record, socket, finalTargetHost, finalTargetPort);
|
||||
|
||||
// Note: Close handlers are managed by independent socket handlers above
|
||||
// We don't register handleClose here to avoid bilateral cleanup
|
||||
|
||||
// Setup error handlers for incoming socket
|
||||
socket.on('error', this.connectionManager.handleError('incoming', record));
|
||||
|
||||
@ -1141,7 +1355,7 @@ export class RouteConnectionHandler {
|
||||
record.incomingTerminationReason = 'timeout';
|
||||
this.connectionManager.incrementTerminationStat('incoming', 'timeout');
|
||||
}
|
||||
this.connectionManager.initiateCleanupOnce(record, 'timeout_incoming');
|
||||
this.connectionManager.cleanupConnection(record, 'timeout_incoming');
|
||||
});
|
||||
|
||||
targetSocket.on('timeout', () => {
|
||||
@ -1168,183 +1382,16 @@ export class RouteConnectionHandler {
|
||||
record.outgoingTerminationReason = 'timeout';
|
||||
this.connectionManager.incrementTerminationStat('outgoing', 'timeout');
|
||||
}
|
||||
this.connectionManager.initiateCleanupOnce(record, 'timeout_outgoing');
|
||||
this.connectionManager.cleanupConnection(record, 'timeout_outgoing');
|
||||
});
|
||||
|
||||
// Apply socket timeouts
|
||||
this.timeoutManager.applySocketTimeouts(record);
|
||||
|
||||
// Track outgoing data for bytes counting
|
||||
// Track outgoing data for bytes counting (moved from the duplicate connect handler)
|
||||
targetSocket.on('data', (chunk: Buffer) => {
|
||||
record.bytesSent += chunk.length;
|
||||
this.timeoutManager.updateActivity(record);
|
||||
});
|
||||
|
||||
// Wait for the outgoing connection to be ready before setting up piping
|
||||
targetSocket.once('connect', () => {
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
logger.log('info', `Connection ${connectionId} established to target ${finalTargetHost}:${finalTargetPort}`, {
|
||||
connectionId,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
component: 'route-handler'
|
||||
});
|
||||
}
|
||||
|
||||
// Clear the initial connection error handler
|
||||
targetSocket.removeAllListeners('error');
|
||||
|
||||
// Add the normal error handler for established connections
|
||||
targetSocket.on('error', this.connectionManager.handleError('outgoing', record));
|
||||
|
||||
// Flush any pending data to target
|
||||
if (record.pendingData.length > 0) {
|
||||
const combinedData = Buffer.concat(record.pendingData);
|
||||
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
console.log(
|
||||
`[${connectionId}] Forwarding ${combinedData.length} bytes of initial data to target`
|
||||
);
|
||||
}
|
||||
|
||||
// Write pending data immediately
|
||||
targetSocket.write(combinedData, (err) => {
|
||||
if (err) {
|
||||
logger.log('error', `Error writing pending data to target for connection ${connectionId}: ${err.message}`, {
|
||||
connectionId,
|
||||
error: err.message,
|
||||
component: 'route-handler'
|
||||
});
|
||||
return this.connectionManager.initiateCleanupOnce(record, 'write_error');
|
||||
}
|
||||
});
|
||||
|
||||
// Clear the buffer now that we've processed it
|
||||
record.pendingData = [];
|
||||
record.pendingDataSize = 0;
|
||||
}
|
||||
|
||||
// Set up independent socket handlers for half-open connection support
|
||||
const { cleanupClient, cleanupServer } = createIndependentSocketHandlers(
|
||||
socket,
|
||||
targetSocket,
|
||||
(reason) => {
|
||||
this.connectionManager.initiateCleanupOnce(record, reason);
|
||||
}
|
||||
);
|
||||
|
||||
// Setup socket handlers with custom timeout handling
|
||||
setupSocketHandlers(socket, cleanupClient, (sock) => {
|
||||
// Don't close on timeout for keep-alive connections
|
||||
if (record.hasKeepAlive) {
|
||||
sock.setTimeout(this.settings.socketTimeout || 3600000);
|
||||
}
|
||||
}, 'client');
|
||||
|
||||
setupSocketHandlers(targetSocket, cleanupServer, (sock) => {
|
||||
// Don't close on timeout for keep-alive connections
|
||||
if (record.hasKeepAlive) {
|
||||
sock.setTimeout(this.settings.socketTimeout || 3600000);
|
||||
}
|
||||
}, 'server');
|
||||
|
||||
// Forward data from client to target with backpressure handling
|
||||
socket.on('data', (chunk: Buffer) => {
|
||||
record.bytesReceived += chunk.length;
|
||||
this.timeoutManager.updateActivity(record);
|
||||
|
||||
if (targetSocket.writable) {
|
||||
const flushed = targetSocket.write(chunk);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
socket.pause();
|
||||
targetSocket.once('drain', () => {
|
||||
socket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Forward data from target to client with backpressure handling
|
||||
targetSocket.on('data', (chunk: Buffer) => {
|
||||
record.bytesSent += chunk.length;
|
||||
this.timeoutManager.updateActivity(record);
|
||||
|
||||
if (socket.writable) {
|
||||
const flushed = socket.write(chunk);
|
||||
|
||||
// Handle backpressure
|
||||
if (!flushed) {
|
||||
targetSocket.pause();
|
||||
socket.once('drain', () => {
|
||||
targetSocket.resume();
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Log successful connection
|
||||
logger.log('info',
|
||||
`Connection established: ${record.remoteIP} -> ${finalTargetHost}:${finalTargetPort}` +
|
||||
`${serverName ? ` (SNI: ${serverName})` : record.lockedDomain ? ` (Domain: ${record.lockedDomain})` : ''}`,
|
||||
{
|
||||
remoteIP: record.remoteIP,
|
||||
targetHost: finalTargetHost,
|
||||
targetPort: finalTargetPort,
|
||||
sni: serverName || undefined,
|
||||
domain: !serverName && record.lockedDomain ? record.lockedDomain : undefined,
|
||||
component: 'route-handler'
|
||||
}
|
||||
);
|
||||
|
||||
// Add TLS renegotiation handler if needed
|
||||
if (serverName) {
|
||||
// Create connection info object for the existing connection
|
||||
const connInfo = {
|
||||
sourceIp: record.remoteIP,
|
||||
sourcePort: record.incoming.remotePort || 0,
|
||||
destIp: record.incoming.localAddress || '',
|
||||
destPort: record.incoming.localPort || 0,
|
||||
};
|
||||
|
||||
// Create a renegotiation handler function
|
||||
const renegotiationHandler = this.tlsManager.createRenegotiationHandler(
|
||||
connectionId,
|
||||
serverName,
|
||||
connInfo,
|
||||
(_connectionId, reason) => this.connectionManager.initiateCleanupOnce(record, reason)
|
||||
);
|
||||
|
||||
// Store the handler in the connection record so we can remove it during cleanup
|
||||
record.renegotiationHandler = renegotiationHandler;
|
||||
|
||||
// Add the handler to the socket
|
||||
socket.on('data', renegotiationHandler);
|
||||
|
||||
if (this.settings.enableDetailedLogging) {
|
||||
logger.log('info', `TLS renegotiation handler installed for connection ${connectionId} with SNI ${serverName}`, {
|
||||
connectionId,
|
||||
serverName,
|
||||
component: 'route-handler'
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Set connection timeout
|
||||
record.cleanupTimer = this.timeoutManager.setupConnectionTimeout(record, (record, reason) => {
|
||||
logger.log('warn', `Connection ${connectionId} from ${record.remoteIP} exceeded max lifetime, forcing cleanup`, {
|
||||
connectionId,
|
||||
remoteIP: record.remoteIP,
|
||||
component: 'route-handler'
|
||||
});
|
||||
this.connectionManager.initiateCleanupOnce(record, reason);
|
||||
});
|
||||
|
||||
// Mark TLS handshake as complete for TLS connections
|
||||
if (record.isTLS) {
|
||||
record.tlsHandshakeComplete = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user