import * as plugins from './plugins.js'; import type { IConnectionRecord, IPortProxySettings } from './classes.pp.interfaces.js'; import { SecurityManager } from './classes.pp.securitymanager.js'; import { TimeoutManager } from './classes.pp.timeoutmanager.js'; /** * Manages connection lifecycle, tracking, and cleanup */ export class ConnectionManager { private connectionRecords: Map = new Map(); private terminationStats: { incoming: Record; outgoing: Record; } = { incoming: {}, outgoing: {} }; constructor( private settings: IPortProxySettings, private securityManager: SecurityManager, private timeoutManager: TimeoutManager ) {} /** * Generate a unique connection ID */ public generateConnectionId(): string { return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); } /** * Create and track a new connection */ public createConnection(socket: plugins.net.Socket): IConnectionRecord { const connectionId = this.generateConnectionId(); const remoteIP = socket.remoteAddress || ''; const localPort = socket.localPort || 0; const record: IConnectionRecord = { id: connectionId, incoming: socket, outgoing: null, incomingStartTime: Date.now(), lastActivity: Date.now(), connectionClosed: false, pendingData: [], pendingDataSize: 0, bytesReceived: 0, bytesSent: 0, remoteIP, localPort, isTLS: false, tlsHandshakeComplete: false, hasReceivedInitialData: false, hasKeepAlive: false, incomingTerminationReason: null, outgoingTerminationReason: null, usingNetworkProxy: false, isBrowserConnection: false, domainSwitches: 0 }; this.trackConnection(connectionId, record); return record; } /** * Track an existing connection */ public trackConnection(connectionId: string, record: IConnectionRecord): void { this.connectionRecords.set(connectionId, record); this.securityManager.trackConnectionByIP(record.remoteIP, connectionId); } /** * Get a connection by ID */ public getConnection(connectionId: string): IConnectionRecord | undefined { return this.connectionRecords.get(connectionId); } /** * Get all active connections */ public getConnections(): Map { return this.connectionRecords; } /** * Get count of active connections */ public getConnectionCount(): number { return this.connectionRecords.size; } /** * Initiates cleanup once for a connection */ public initiateCleanupOnce(record: IConnectionRecord, reason: string = 'normal'): void { if (this.settings.enableDetailedLogging) { console.log(`[${record.id}] Connection cleanup initiated for ${record.remoteIP} (${reason})`); } if ( record.incomingTerminationReason === null || record.incomingTerminationReason === undefined ) { record.incomingTerminationReason = reason; this.incrementTerminationStat('incoming', reason); } this.cleanupConnection(record, reason); } /** * Clean up a connection record */ public cleanupConnection(record: IConnectionRecord, reason: string = 'normal'): void { if (!record.connectionClosed) { record.connectionClosed = true; // Track connection termination this.securityManager.removeConnectionByIP(record.remoteIP, record.id); if (record.cleanupTimer) { clearTimeout(record.cleanupTimer); record.cleanupTimer = undefined; } // Detailed logging data const duration = Date.now() - record.incomingStartTime; const bytesReceived = record.bytesReceived; const bytesSent = record.bytesSent; // Remove all data handlers to make sure we clean up properly if (record.incoming) { try { // Remove our safe data handler record.incoming.removeAllListeners('data'); // Reset the handler references record.renegotiationHandler = undefined; } catch (err) { console.log(`[${record.id}] Error removing data handlers: ${err}`); } } // Handle incoming socket this.cleanupSocket(record, 'incoming', record.incoming); // Handle outgoing socket if (record.outgoing) { this.cleanupSocket(record, 'outgoing', record.outgoing); } // Clear pendingData to avoid memory leaks record.pendingData = []; record.pendingDataSize = 0; // Remove the record from the tracking map this.connectionRecords.delete(record.id); // Log connection details if (this.settings.enableDetailedLogging) { console.log( `[${record.id}] Connection from ${record.remoteIP} on port ${record.localPort} terminated (${reason}).` + ` Duration: ${plugins.prettyMs(duration)}, Bytes IN: ${bytesReceived}, OUT: ${bytesSent}, ` + `TLS: ${record.isTLS ? 'Yes' : 'No'}, Keep-Alive: ${record.hasKeepAlive ? 'Yes' : 'No'}` + `${record.usingNetworkProxy ? ', Using NetworkProxy' : ''}` + `${record.domainSwitches ? `, Domain switches: ${record.domainSwitches}` : ''}` ); } else { console.log( `[${record.id}] Connection from ${record.remoteIP} terminated (${reason}). Active connections: ${this.connectionRecords.size}` ); } } } /** * Helper method to clean up a socket */ private cleanupSocket(record: IConnectionRecord, side: 'incoming' | 'outgoing', socket: plugins.net.Socket): void { try { if (!socket.destroyed) { // Try graceful shutdown first, then force destroy after a short timeout socket.end(); const socketTimeout = setTimeout(() => { try { if (!socket.destroyed) { socket.destroy(); } } catch (err) { console.log(`[${record.id}] Error destroying ${side} socket: ${err}`); } }, 1000); // Ensure the timeout doesn't block Node from exiting if (socketTimeout.unref) { socketTimeout.unref(); } } } catch (err) { console.log(`[${record.id}] Error closing ${side} socket: ${err}`); try { if (!socket.destroyed) { socket.destroy(); } } catch (destroyErr) { console.log(`[${record.id}] Error destroying ${side} socket: ${destroyErr}`); } } } /** * Creates a generic error handler for incoming or outgoing sockets */ public handleError(side: 'incoming' | 'outgoing', record: IConnectionRecord) { return (err: Error) => { const code = (err as any).code; let reason = 'error'; const now = Date.now(); const connectionDuration = now - record.incomingStartTime; const lastActivityAge = now - record.lastActivity; if (code === 'ECONNRESET') { reason = 'econnreset'; console.log( `[${record.id}] ECONNRESET on ${side} side from ${record.remoteIP}: ${err.message}. ` + `Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)} ago` ); } else if (code === 'ETIMEDOUT') { reason = 'etimedout'; console.log( `[${record.id}] ETIMEDOUT on ${side} side from ${record.remoteIP}: ${err.message}. ` + `Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)} ago` ); } else { console.log( `[${record.id}] Error on ${side} side from ${record.remoteIP}: ${err.message}. ` + `Duration: ${plugins.prettyMs(connectionDuration)}, Last activity: ${plugins.prettyMs(lastActivityAge)} ago` ); } if (side === 'incoming' && record.incomingTerminationReason === null) { record.incomingTerminationReason = reason; this.incrementTerminationStat('incoming', reason); } else if (side === 'outgoing' && record.outgoingTerminationReason === null) { record.outgoingTerminationReason = reason; this.incrementTerminationStat('outgoing', reason); } this.initiateCleanupOnce(record, reason); }; } /** * Creates a generic close handler for incoming or outgoing sockets */ public handleClose(side: 'incoming' | 'outgoing', record: IConnectionRecord) { return () => { if (this.settings.enableDetailedLogging) { console.log(`[${record.id}] Connection closed on ${side} side from ${record.remoteIP}`); } if (side === 'incoming' && record.incomingTerminationReason === null) { record.incomingTerminationReason = 'normal'; this.incrementTerminationStat('incoming', 'normal'); } else if (side === 'outgoing' && record.outgoingTerminationReason === null) { record.outgoingTerminationReason = 'normal'; this.incrementTerminationStat('outgoing', 'normal'); // Record the time when outgoing socket closed. record.outgoingClosedTime = Date.now(); } this.initiateCleanupOnce(record, 'closed_' + side); }; } /** * Increment termination statistics */ public incrementTerminationStat(side: 'incoming' | 'outgoing', reason: string): void { this.terminationStats[side][reason] = (this.terminationStats[side][reason] || 0) + 1; } /** * Get termination statistics */ public getTerminationStats(): { incoming: Record; outgoing: Record } { return this.terminationStats; } /** * Check for stalled/inactive connections */ public performInactivityCheck(): void { const now = Date.now(); const connectionIds = [...this.connectionRecords.keys()]; for (const id of connectionIds) { const record = this.connectionRecords.get(id); if (!record) continue; // Skip inactivity check if disabled or for immortal keep-alive connections if ( this.settings.disableInactivityCheck || (record.hasKeepAlive && this.settings.keepAliveTreatment === 'immortal') ) { continue; } const inactivityTime = now - record.lastActivity; // Use extended timeout for extended-treatment keep-alive connections let effectiveTimeout = this.settings.inactivityTimeout!; if (record.hasKeepAlive && this.settings.keepAliveTreatment === 'extended') { const multiplier = this.settings.keepAliveInactivityMultiplier || 6; effectiveTimeout = effectiveTimeout * multiplier; } if (inactivityTime > effectiveTimeout && !record.connectionClosed) { // For keep-alive connections, issue a warning first if (record.hasKeepAlive && !record.inactivityWarningIssued) { console.log( `[${id}] Warning: Keep-alive connection from ${record.remoteIP} inactive for ${ plugins.prettyMs(inactivityTime) }. Will close in 10 minutes if no activity.` ); // Set warning flag and add grace period record.inactivityWarningIssued = true; record.lastActivity = now - (effectiveTimeout - 600000); // Try to stimulate activity with a probe packet if (record.outgoing && !record.outgoing.destroyed) { try { record.outgoing.write(Buffer.alloc(0)); if (this.settings.enableDetailedLogging) { console.log(`[${id}] Sent probe packet to test keep-alive connection`); } } catch (err) { console.log(`[${id}] Error sending probe packet: ${err}`); } } } else { // For non-keep-alive or after warning, close the connection console.log( `[${id}] Inactivity check: No activity on connection from ${record.remoteIP} ` + `for ${plugins.prettyMs(inactivityTime)}.` + (record.hasKeepAlive ? ' Despite keep-alive being enabled.' : '') ); this.cleanupConnection(record, 'inactivity'); } } else if (inactivityTime <= effectiveTimeout && record.inactivityWarningIssued) { // If activity detected after warning, clear the warning if (this.settings.enableDetailedLogging) { console.log( `[${id}] Connection activity detected after inactivity warning, resetting warning` ); } record.inactivityWarningIssued = false; } // Parity check: if outgoing socket closed and incoming remains active if ( record.outgoingClosedTime && !record.incoming.destroyed && !record.connectionClosed && now - record.outgoingClosedTime > 120000 ) { console.log( `[${id}] Parity check: Incoming socket for ${record.remoteIP} still active ${ plugins.prettyMs(now - record.outgoingClosedTime) } after outgoing closed.` ); this.cleanupConnection(record, 'parity_check'); } } } /** * Clear all connections (for shutdown) */ public clearConnections(): void { // Create a copy of the keys to avoid modification during iteration const connectionIds = [...this.connectionRecords.keys()]; // First pass: End all connections gracefully for (const id of connectionIds) { const record = this.connectionRecords.get(id); if (record) { try { // Clear any timers if (record.cleanupTimer) { clearTimeout(record.cleanupTimer); record.cleanupTimer = undefined; } // End sockets gracefully if (record.incoming && !record.incoming.destroyed) { record.incoming.end(); } if (record.outgoing && !record.outgoing.destroyed) { record.outgoing.end(); } } catch (err) { console.log(`Error during graceful connection end for ${id}: ${err}`); } } } // Short delay to allow graceful ends to process setTimeout(() => { // Second pass: Force destroy everything for (const id of connectionIds) { const record = this.connectionRecords.get(id); if (record) { try { // Remove all listeners to prevent memory leaks if (record.incoming) { record.incoming.removeAllListeners(); if (!record.incoming.destroyed) { record.incoming.destroy(); } } if (record.outgoing) { record.outgoing.removeAllListeners(); if (!record.outgoing.destroyed) { record.outgoing.destroy(); } } } catch (err) { console.log(`Error during forced connection destruction for ${id}: ${err}`); } } } // Clear all maps this.connectionRecords.clear(); this.terminationStats = { incoming: {}, outgoing: {} }; }, 100); } }