smartproxy/ts/proxies/smart-proxy/connection-manager.ts

595 lines
18 KiB
TypeScript

import * as plugins from '../../plugins.js';
import type { IConnectionRecord, ISmartProxyOptions } from './models/interfaces.js';
import { SecurityManager } from './security-manager.js';
import { TimeoutManager } from './timeout-manager.js';
import { logger } from '../../core/utils/logger.js';
import { LifecycleComponent } from '../../core/utils/lifecycle-component.js';
/**
* Manages connection lifecycle, tracking, and cleanup with performance optimizations
*/
export class ConnectionManager extends LifecycleComponent {
private connectionRecords: Map<string, IConnectionRecord> = new Map();
private terminationStats: {
incoming: Record<string, number>;
outgoing: Record<string, number>;
} = { incoming: {}, outgoing: {} };
// Performance optimization: Track connections needing inactivity check
private nextInactivityCheck: Map<string, number> = new Map();
// Connection limits
private readonly maxConnections: number;
private readonly cleanupBatchSize: number = 100;
// Cleanup queue for batched processing
private cleanupQueue: Set<string> = new Set();
private cleanupTimer: NodeJS.Timeout | null = null;
constructor(
private settings: ISmartProxyOptions,
private securityManager: SecurityManager,
private timeoutManager: TimeoutManager
) {
super();
// Set reasonable defaults for connection limits
this.maxConnections = settings.defaults?.security?.maxConnections || 10000;
// Start inactivity check timer if not disabled
if (!settings.disableInactivityCheck) {
this.startInactivityCheckTimer();
}
}
/**
* Generate a unique connection ID
*/
public generateConnectionId(): string {
return Math.random().toString(36).substring(2, 15) +
Math.random().toString(36).substring(2, 15);
}
/**
* Create and track a new connection
*/
public createConnection(socket: plugins.net.Socket): IConnectionRecord | null {
// Enforce connection limit
if (this.connectionRecords.size >= this.maxConnections) {
logger.log('warn', `Connection limit reached (${this.maxConnections}). Rejecting new connection.`, {
currentConnections: this.connectionRecords.size,
maxConnections: this.maxConnections,
component: 'connection-manager'
});
socket.destroy();
return null;
}
const connectionId = this.generateConnectionId();
const remoteIP = socket.remoteAddress || '';
const localPort = socket.localPort || 0;
const now = Date.now();
const record: IConnectionRecord = {
id: connectionId,
incoming: socket,
outgoing: null,
incomingStartTime: now,
lastActivity: now,
connectionClosed: false,
pendingData: [],
pendingDataSize: 0,
bytesReceived: 0,
bytesSent: 0,
remoteIP,
localPort,
isTLS: false,
tlsHandshakeComplete: false,
hasReceivedInitialData: false,
hasKeepAlive: false,
incomingTerminationReason: null,
outgoingTerminationReason: null,
usingNetworkProxy: false,
isBrowserConnection: false,
domainSwitches: 0
};
this.trackConnection(connectionId, record);
return record;
}
/**
* Track an existing connection
*/
public trackConnection(connectionId: string, record: IConnectionRecord): void {
this.connectionRecords.set(connectionId, record);
this.securityManager.trackConnectionByIP(record.remoteIP, connectionId);
// Schedule inactivity check
if (!this.settings.disableInactivityCheck) {
this.scheduleInactivityCheck(connectionId, record);
}
}
/**
* Schedule next inactivity check for a connection
*/
private scheduleInactivityCheck(connectionId: string, record: IConnectionRecord): void {
let timeout = this.settings.inactivityTimeout!;
if (record.hasKeepAlive) {
if (this.settings.keepAliveTreatment === 'immortal') {
// Don't schedule check for immortal connections
return;
} else if (this.settings.keepAliveTreatment === 'extended') {
const multiplier = this.settings.keepAliveInactivityMultiplier || 6;
timeout = timeout * multiplier;
}
}
const checkTime = Date.now() + timeout;
this.nextInactivityCheck.set(connectionId, checkTime);
}
/**
* Start the inactivity check timer
*/
private startInactivityCheckTimer(): void {
// Check every 30 seconds for connections that need inactivity check
this.setInterval(() => {
this.performOptimizedInactivityCheck();
}, 30000);
// Note: LifecycleComponent's setInterval already calls unref()
}
/**
* Get a connection by ID
*/
public getConnection(connectionId: string): IConnectionRecord | undefined {
return this.connectionRecords.get(connectionId);
}
/**
* Get all active connections
*/
public getConnections(): Map<string, IConnectionRecord> {
return this.connectionRecords;
}
/**
* Get count of active connections
*/
public getConnectionCount(): number {
return this.connectionRecords.size;
}
/**
* Initiates cleanup once for a connection
*/
public initiateCleanupOnce(record: IConnectionRecord, reason: string = 'normal'): void {
if (this.settings.enableDetailedLogging) {
logger.log('info', `Connection cleanup initiated`, {
connectionId: record.id,
remoteIP: record.remoteIP,
reason,
component: 'connection-manager'
});
}
if (record.incomingTerminationReason == null) {
record.incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
}
// Add to cleanup queue for batched processing
this.queueCleanup(record.id);
}
/**
* Queue a connection for cleanup
*/
private queueCleanup(connectionId: string): void {
this.cleanupQueue.add(connectionId);
// Process immediately if queue is getting large
if (this.cleanupQueue.size >= this.cleanupBatchSize) {
this.processCleanupQueue();
} else if (!this.cleanupTimer) {
// Otherwise, schedule batch processing
this.cleanupTimer = this.setTimeout(() => {
this.processCleanupQueue();
}, 100);
}
}
/**
* Process the cleanup queue in batches
*/
private processCleanupQueue(): void {
if (this.cleanupTimer) {
this.clearTimeout(this.cleanupTimer);
this.cleanupTimer = null;
}
const toCleanup = Array.from(this.cleanupQueue).slice(0, this.cleanupBatchSize);
this.cleanupQueue.clear();
for (const connectionId of toCleanup) {
const record = this.connectionRecords.get(connectionId);
if (record) {
this.cleanupConnection(record, record.incomingTerminationReason || 'normal');
}
}
// If there are more in queue, schedule next batch
if (this.cleanupQueue.size > 0) {
this.cleanupTimer = this.setTimeout(() => {
this.processCleanupQueue();
}, 10);
}
}
/**
* Clean up a connection record
*/
public cleanupConnection(record: IConnectionRecord, reason: string = 'normal'): void {
if (!record.connectionClosed) {
record.connectionClosed = true;
// Remove from inactivity check
this.nextInactivityCheck.delete(record.id);
// Track connection termination
this.securityManager.removeConnectionByIP(record.remoteIP, record.id);
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
record.cleanupTimer = undefined;
}
// Calculate metrics once
const duration = Date.now() - record.incomingStartTime;
const logData = {
connectionId: record.id,
remoteIP: record.remoteIP,
localPort: record.localPort,
reason,
duration: plugins.prettyMs(duration),
bytes: { in: record.bytesReceived, out: record.bytesSent },
tls: record.isTLS,
keepAlive: record.hasKeepAlive,
usingNetworkProxy: record.usingNetworkProxy,
domainSwitches: record.domainSwitches || 0,
component: 'connection-manager'
};
// Remove all data handlers to make sure we clean up properly
if (record.incoming) {
try {
record.incoming.removeAllListeners('data');
record.renegotiationHandler = undefined;
} catch (err) {
logger.log('error', `Error removing data handlers: ${err}`, {
connectionId: record.id,
error: err,
component: 'connection-manager'
});
}
}
// Handle socket cleanup without delay
this.cleanupSocketImmediate(record, 'incoming', record.incoming);
if (record.outgoing) {
this.cleanupSocketImmediate(record, 'outgoing', record.outgoing);
}
// Clear pendingData to avoid memory leaks
record.pendingData = [];
record.pendingDataSize = 0;
// Remove the record from the tracking map
this.connectionRecords.delete(record.id);
// Log connection details
if (this.settings.enableDetailedLogging) {
logger.log('info',
`Connection terminated: ${record.remoteIP}:${record.localPort} (${reason}) - ` +
`${plugins.prettyMs(duration)}, IN: ${record.bytesReceived}B, OUT: ${record.bytesSent}B`,
logData
);
} else {
logger.log('info',
`Connection terminated: ${record.remoteIP} (${reason}). Active: ${this.connectionRecords.size}`,
{
connectionId: record.id,
remoteIP: record.remoteIP,
reason,
activeConnections: this.connectionRecords.size,
component: 'connection-manager'
}
);
}
}
}
/**
* Helper method to clean up a socket immediately
*/
private cleanupSocketImmediate(record: IConnectionRecord, side: 'incoming' | 'outgoing', socket: plugins.net.Socket): void {
try {
if (!socket.destroyed) {
socket.destroy();
}
} catch (err) {
logger.log('error', `Error destroying ${side} socket: ${err}`, {
connectionId: record.id,
side,
error: err,
component: 'connection-manager'
});
}
}
/**
* Creates a generic error handler for incoming or outgoing sockets
*/
public handleError(side: 'incoming' | 'outgoing', record: IConnectionRecord) {
return (err: Error) => {
const code = (err as any).code;
let reason = 'error';
const now = Date.now();
const connectionDuration = now - record.incomingStartTime;
const lastActivityAge = now - record.lastActivity;
// Update activity tracking
if (side === 'incoming') {
record.lastActivity = now;
this.scheduleInactivityCheck(record.id, record);
}
const errorData = {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
error: err.message,
duration: plugins.prettyMs(connectionDuration),
lastActivity: plugins.prettyMs(lastActivityAge),
component: 'connection-manager'
};
switch (code) {
case 'ECONNRESET':
reason = 'econnreset';
logger.log('warn', `ECONNRESET on ${side}: ${record.remoteIP}`, errorData);
break;
case 'ETIMEDOUT':
reason = 'etimedout';
logger.log('warn', `ETIMEDOUT on ${side}: ${record.remoteIP}`, errorData);
break;
default:
logger.log('error', `Error on ${side}: ${record.remoteIP} - ${err.message}`, errorData);
}
if (side === 'incoming' && record.incomingTerminationReason == null) {
record.incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
} else if (side === 'outgoing' && record.outgoingTerminationReason == null) {
record.outgoingTerminationReason = reason;
this.incrementTerminationStat('outgoing', reason);
}
this.initiateCleanupOnce(record, reason);
};
}
/**
* Creates a generic close handler for incoming or outgoing sockets
*/
public handleClose(side: 'incoming' | 'outgoing', record: IConnectionRecord) {
return () => {
if (this.settings.enableDetailedLogging) {
logger.log('info', `Connection closed on ${side} side`, {
connectionId: record.id,
side,
remoteIP: record.remoteIP,
component: 'connection-manager'
});
}
if (side === 'incoming' && record.incomingTerminationReason == null) {
record.incomingTerminationReason = 'normal';
this.incrementTerminationStat('incoming', 'normal');
} else if (side === 'outgoing' && record.outgoingTerminationReason == null) {
record.outgoingTerminationReason = 'normal';
this.incrementTerminationStat('outgoing', 'normal');
record.outgoingClosedTime = Date.now();
}
this.initiateCleanupOnce(record, 'closed_' + side);
};
}
/**
* Increment termination statistics
*/
public incrementTerminationStat(side: 'incoming' | 'outgoing', reason: string): void {
this.terminationStats[side][reason] = (this.terminationStats[side][reason] || 0) + 1;
}
/**
* Get termination statistics
*/
public getTerminationStats(): { incoming: Record<string, number>; outgoing: Record<string, number> } {
return this.terminationStats;
}
/**
* Optimized inactivity check - only checks connections that are due
*/
private performOptimizedInactivityCheck(): void {
const now = Date.now();
const connectionsToCheck: string[] = [];
// Find connections that need checking
for (const [connectionId, checkTime] of this.nextInactivityCheck) {
if (checkTime <= now) {
connectionsToCheck.push(connectionId);
}
}
// Process only connections that need checking
for (const connectionId of connectionsToCheck) {
const record = this.connectionRecords.get(connectionId);
if (!record || record.connectionClosed) {
this.nextInactivityCheck.delete(connectionId);
continue;
}
const inactivityTime = now - record.lastActivity;
// Use extended timeout for extended-treatment keep-alive connections
let effectiveTimeout = this.settings.inactivityTimeout!;
if (record.hasKeepAlive && this.settings.keepAliveTreatment === 'extended') {
const multiplier = this.settings.keepAliveInactivityMultiplier || 6;
effectiveTimeout = effectiveTimeout * multiplier;
}
if (inactivityTime > effectiveTimeout) {
// For keep-alive connections, issue a warning first
if (record.hasKeepAlive && !record.inactivityWarningIssued) {
logger.log('warn', `Keep-alive connection inactive: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
inactiveFor: plugins.prettyMs(inactivityTime),
component: 'connection-manager'
});
record.inactivityWarningIssued = true;
// Reschedule check for 10 minutes later
this.nextInactivityCheck.set(connectionId, now + 600000);
// Try to stimulate activity with a probe packet
if (record.outgoing && !record.outgoing.destroyed) {
try {
record.outgoing.write(Buffer.alloc(0));
} catch (err) {
logger.log('error', `Error sending probe packet: ${err}`, {
connectionId,
error: err,
component: 'connection-manager'
});
}
}
} else {
// Close the connection
logger.log('warn', `Closing inactive connection: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
inactiveFor: plugins.prettyMs(inactivityTime),
hasKeepAlive: record.hasKeepAlive,
component: 'connection-manager'
});
this.cleanupConnection(record, 'inactivity');
}
} else {
// Reschedule next check
this.scheduleInactivityCheck(connectionId, record);
}
// Parity check: if outgoing socket closed and incoming remains active
if (
record.outgoingClosedTime &&
!record.incoming.destroyed &&
!record.connectionClosed &&
now - record.outgoingClosedTime > 120000
) {
logger.log('warn', `Parity check failed: ${record.remoteIP}`, {
connectionId,
remoteIP: record.remoteIP,
timeElapsed: plugins.prettyMs(now - record.outgoingClosedTime),
component: 'connection-manager'
});
this.cleanupConnection(record, 'parity_check');
}
}
}
/**
* Legacy method for backward compatibility
*/
public performInactivityCheck(): void {
this.performOptimizedInactivityCheck();
}
/**
* Clear all connections (for shutdown)
*/
public async clearConnections(): Promise<void> {
// Delegate to LifecycleComponent's cleanup
await this.cleanup();
}
/**
* Override LifecycleComponent's onCleanup method
*/
protected async onCleanup(): Promise<void> {
// Process connections in batches to avoid blocking
const connections = Array.from(this.connectionRecords.values());
const batchSize = 100;
let index = 0;
const processBatch = () => {
const batch = connections.slice(index, index + batchSize);
for (const record of batch) {
try {
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
record.cleanupTimer = undefined;
}
// Immediate destruction
if (record.incoming) {
record.incoming.removeAllListeners();
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
}
if (record.outgoing) {
record.outgoing.removeAllListeners();
if (!record.outgoing.destroyed) {
record.outgoing.destroy();
}
}
} catch (err) {
logger.log('error', `Error during connection cleanup: ${err}`, {
connectionId: record.id,
error: err,
component: 'connection-manager'
});
}
}
index += batchSize;
// Continue with next batch if needed
if (index < connections.length) {
setImmediate(processBatch);
} else {
// Clear all maps
this.connectionRecords.clear();
this.nextInactivityCheck.clear();
this.cleanupQueue.clear();
this.terminationStats = { incoming: {}, outgoing: {} };
}
};
// Start batch processing
setImmediate(processBatch);
}
}