fix accumulation
This commit is contained in:
@ -673,4 +673,52 @@ if (!record.connectionClosed && record.outgoing && record.bytesReceived > 0 && r
|
|||||||
- Connection is older than 60 seconds
|
- Connection is older than 60 seconds
|
||||||
- Both sockets are still alive (not destroyed)
|
- Both sockets are still alive (not destroyed)
|
||||||
|
|
||||||
This complements the zombie detection by handling cases where sockets remain technically alive but the connection is effectively dead.
|
This complements the zombie detection by handling cases where sockets remain technically alive but the connection is effectively dead.
|
||||||
|
|
||||||
|
## 🚨 CRITICAL FIX: Cleanup Queue Bug (January 2025)
|
||||||
|
|
||||||
|
### Critical Bug Found
|
||||||
|
The cleanup queue had a severe bug that caused connection accumulation when more than 100 connections needed cleanup:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// BUG: This cleared the ENTIRE queue after processing only the first batch!
|
||||||
|
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
|
||||||
|
this.cleanupQueue.clear(); // ❌ This discarded all connections beyond the first 100!
|
||||||
|
```
|
||||||
|
|
||||||
|
### Fix Implemented
|
||||||
|
```typescript
|
||||||
|
// Now only removes the connections being processed
|
||||||
|
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
|
||||||
|
for (const connectionId of toCleanup) {
|
||||||
|
this.cleanupQueue.delete(connectionId); // ✅ Only remove what we process
|
||||||
|
const record = this.connectionRecords.get(connectionId);
|
||||||
|
if (record) {
|
||||||
|
this.cleanupConnection(record, record.incomingTerminationReason || 'normal');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Impact
|
||||||
|
- **Before**: If 150 connections needed cleanup, only the first 100 would be processed and the remaining 50 would accumulate forever
|
||||||
|
- **After**: All connections are properly cleaned up in batches
|
||||||
|
|
||||||
|
### Additional Improvements
|
||||||
|
|
||||||
|
1. **Faster Inactivity Checks**: Reduced from 30s to 10s intervals
|
||||||
|
- Zombies and stuck connections are detected 3x faster
|
||||||
|
- Reduces the window for accumulation
|
||||||
|
|
||||||
|
2. **Duplicate Prevention**: Added check in queueCleanup to prevent processing already-closed connections
|
||||||
|
- Prevents unnecessary work
|
||||||
|
- Ensures connections are only cleaned up once
|
||||||
|
|
||||||
|
### Summary of All Fixes
|
||||||
|
|
||||||
|
1. **Connection Timeout** (already documented) - Prevents accumulation when backends are unreachable
|
||||||
|
2. **Zombie Detection** - Cleans up connections with destroyed sockets
|
||||||
|
3. **Stuck Connection Detection** - Cleans up connections to hanging backends
|
||||||
|
4. **Cleanup Queue Bug** - Ensures ALL connections get cleaned up, not just the first 100
|
||||||
|
5. **Faster Detection** - Reduced check interval from 30s to 10s
|
||||||
|
|
||||||
|
These fixes combined should prevent connection accumulation in all known scenarios.
|
93
test/test.cleanup-queue-bug.node.ts
Normal file
93
test/test.cleanup-queue-bug.node.ts
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||||
|
import { SmartProxy } from '../ts/index.js';
|
||||||
|
|
||||||
|
tap.test('cleanup queue bug - verify queue processing handles more than batch size', async (tools) => {
|
||||||
|
console.log('\n=== Cleanup Queue Bug Test ===');
|
||||||
|
console.log('Purpose: Verify that the cleanup queue correctly processes all connections');
|
||||||
|
console.log('even when there are more than the batch size (100)');
|
||||||
|
|
||||||
|
// Create proxy
|
||||||
|
const proxy = new SmartProxy({
|
||||||
|
routes: [{
|
||||||
|
name: 'test-route',
|
||||||
|
match: { ports: 8588 },
|
||||||
|
action: {
|
||||||
|
type: 'forward',
|
||||||
|
target: { host: 'localhost', port: 9996 }
|
||||||
|
}
|
||||||
|
}],
|
||||||
|
enableDetailedLogging: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await proxy.start();
|
||||||
|
console.log('✓ Proxy started on port 8588');
|
||||||
|
|
||||||
|
// Access connection manager
|
||||||
|
const cm = (proxy as any).connectionManager;
|
||||||
|
|
||||||
|
// Create mock connection records
|
||||||
|
console.log('\n--- Creating 150 mock connections ---');
|
||||||
|
const mockConnections: any[] = [];
|
||||||
|
|
||||||
|
for (let i = 0; i < 150; i++) {
|
||||||
|
const mockRecord = {
|
||||||
|
id: `mock-${i}`,
|
||||||
|
incoming: { destroyed: true, remoteAddress: '127.0.0.1' },
|
||||||
|
outgoing: { destroyed: true },
|
||||||
|
connectionClosed: false,
|
||||||
|
incomingStartTime: Date.now(),
|
||||||
|
lastActivity: Date.now(),
|
||||||
|
remoteIP: '127.0.0.1',
|
||||||
|
remotePort: 10000 + i,
|
||||||
|
localPort: 8588,
|
||||||
|
bytesReceived: 100,
|
||||||
|
bytesSent: 100,
|
||||||
|
incomingTerminationReason: null,
|
||||||
|
cleanupTimer: null
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add to connection records
|
||||||
|
cm.connectionRecords.set(mockRecord.id, mockRecord);
|
||||||
|
mockConnections.push(mockRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Created ${cm.getConnectionCount()} mock connections`);
|
||||||
|
expect(cm.getConnectionCount()).toEqual(150);
|
||||||
|
|
||||||
|
// Queue all connections for cleanup
|
||||||
|
console.log('\n--- Queueing all connections for cleanup ---');
|
||||||
|
for (const conn of mockConnections) {
|
||||||
|
cm.initiateCleanupOnce(conn, 'test_cleanup');
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Cleanup queue size: ${cm.cleanupQueue.size}`);
|
||||||
|
expect(cm.cleanupQueue.size).toEqual(150);
|
||||||
|
|
||||||
|
// Wait for cleanup to complete
|
||||||
|
console.log('\n--- Waiting for cleanup batches to process ---');
|
||||||
|
|
||||||
|
// The first batch should process immediately (100 connections)
|
||||||
|
// Then additional batches should be scheduled
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
|
|
||||||
|
// Check final state
|
||||||
|
const finalCount = cm.getConnectionCount();
|
||||||
|
console.log(`\nFinal connection count: ${finalCount}`);
|
||||||
|
console.log(`Cleanup queue size: ${cm.cleanupQueue.size}`);
|
||||||
|
|
||||||
|
// All connections should be cleaned up
|
||||||
|
expect(finalCount).toEqual(0);
|
||||||
|
expect(cm.cleanupQueue.size).toEqual(0);
|
||||||
|
|
||||||
|
// Verify termination stats
|
||||||
|
const stats = cm.getTerminationStats();
|
||||||
|
console.log('Termination stats:', stats);
|
||||||
|
expect(stats.incoming.test_cleanup).toEqual(150);
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
await proxy.stop();
|
||||||
|
|
||||||
|
console.log('\n✓ Test complete: Cleanup queue now correctly processes all connections');
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.start();
|
@ -140,10 +140,10 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
* Start the inactivity check timer
|
* Start the inactivity check timer
|
||||||
*/
|
*/
|
||||||
private startInactivityCheckTimer(): void {
|
private startInactivityCheckTimer(): void {
|
||||||
// Check every 30 seconds for connections that need inactivity check
|
// Check more frequently (every 10 seconds) to catch zombies and stuck connections faster
|
||||||
this.setInterval(() => {
|
this.setInterval(() => {
|
||||||
this.performOptimizedInactivityCheck();
|
this.performOptimizedInactivityCheck();
|
||||||
}, 30000);
|
}, 10000);
|
||||||
// Note: LifecycleComponent's setInterval already calls unref()
|
// Note: LifecycleComponent's setInterval already calls unref()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +194,13 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
* Queue a connection for cleanup
|
* Queue a connection for cleanup
|
||||||
*/
|
*/
|
||||||
private queueCleanup(connectionId: string): void {
|
private queueCleanup(connectionId: string): void {
|
||||||
|
// Check if connection is already being processed
|
||||||
|
const record = this.connectionRecords.get(connectionId);
|
||||||
|
if (!record || record.connectionClosed) {
|
||||||
|
// Already cleaned up or doesn't exist, skip
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.cleanupQueue.add(connectionId);
|
this.cleanupQueue.add(connectionId);
|
||||||
|
|
||||||
// Process immediately if queue is getting large
|
// Process immediately if queue is getting large
|
||||||
@ -217,9 +224,10 @@ export class ConnectionManager extends LifecycleComponent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
|
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
|
||||||
this.cleanupQueue.clear();
|
|
||||||
|
|
||||||
|
// Remove only the items we're processing, not the entire queue!
|
||||||
for (const connectionId of toCleanup) {
|
for (const connectionId of toCleanup) {
|
||||||
|
this.cleanupQueue.delete(connectionId);
|
||||||
const record = this.connectionRecords.get(connectionId);
|
const record = this.connectionRecords.get(connectionId);
|
||||||
if (record) {
|
if (record) {
|
||||||
this.cleanupConnection(record, record.incomingTerminationReason || 'normal');
|
this.cleanupConnection(record, record.incomingTerminationReason || 'normal');
|
||||||
|
Reference in New Issue
Block a user