feat(PortProxy): Enhancements to connection management in PortProxy
This commit is contained in:
		
							
								
								
									
										10
									
								
								changelog.md
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								changelog.md
									
									
									
									
									
								
							| @@ -1,5 +1,15 @@ | ||||
| # Changelog | ||||
|  | ||||
| ## 2025-03-03 - 3.21.0 - feat(PortProxy) | ||||
| Enhancements to connection management in PortProxy | ||||
|  | ||||
| - Introduced a unique ID for each connection record for improved tracking. | ||||
| - Enhanced cleanup mechanism for connections with dual states: initiated and executed. | ||||
| - Implemented shutdown process handling to ensure graceful connection closure. | ||||
| - Added logging for better tracing of connection activities and states. | ||||
| - Improved connection setup with explicit timeouts and data flow management. | ||||
| - Integrated inactivity and parity checks to monitor connection health. | ||||
|  | ||||
| ## 2025-03-01 - 3.20.2 - fix(PortProxy) | ||||
| Enhance connection cleanup handling in PortProxy | ||||
|  | ||||
|   | ||||
| @@ -3,6 +3,6 @@ | ||||
|  */ | ||||
| export const commitinfo = { | ||||
|   name: '@push.rocks/smartproxy', | ||||
|   version: '3.20.2', | ||||
|   version: '3.21.0', | ||||
|   description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, and dynamic routing with authentication options.' | ||||
| } | ||||
|   | ||||
| @@ -22,6 +22,7 @@ export interface IPortProxySettings extends plugins.tls.TlsOptions { | ||||
|   maxConnectionLifetime?: number; // (ms) force cleanup of long-lived connections | ||||
|   globalPortRanges: Array<{ from: number; to: number }>; // Global allowed port ranges | ||||
|   forwardAllGlobalRanges?: boolean; // When true, forwards all connections on global port ranges to the global targetIP | ||||
|   gracefulShutdownTimeout?: number; // (ms) maximum time to wait for connections to close during shutdown | ||||
| } | ||||
|  | ||||
| /** | ||||
| @@ -93,9 +94,12 @@ interface IConnectionRecord { | ||||
|   incomingStartTime: number; | ||||
|   outgoingStartTime?: number; | ||||
|   outgoingClosedTime?: number; | ||||
|   lockedDomain?: string; // New field to lock this connection to the initial SNI | ||||
|   lockedDomain?: string; // Field to lock this connection to the initial SNI | ||||
|   connectionClosed: boolean; | ||||
|   cleanupTimer?: NodeJS.Timeout; // Timer to force cleanup after max lifetime/inactivity | ||||
|   cleanupInitiated: boolean; // Flag to track if cleanup has been initiated but not completed | ||||
|   id: string; // Unique identifier for the connection | ||||
|   lastActivity: number; // Timestamp of last activity on either socket | ||||
| } | ||||
|  | ||||
| // Helper: Check if a port falls within any of the given port ranges. | ||||
| @@ -128,12 +132,18 @@ const isGlobIPAllowed = (ip: string, allowed: string[], blocked: string[] = []): | ||||
|   return isAllowed(ip, allowed); | ||||
| }; | ||||
|  | ||||
| // Helper: Generate a unique ID for a connection | ||||
| const generateConnectionId = (): string => { | ||||
|   return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); | ||||
| }; | ||||
|  | ||||
| export class PortProxy { | ||||
|   private netServers: plugins.net.Server[] = []; | ||||
|   settings: IPortProxySettings; | ||||
|   // Unified record tracking each connection pair. | ||||
|   private connectionRecords: Set<IConnectionRecord> = new Set(); | ||||
|   private connectionRecords: Map<string, IConnectionRecord> = new Map(); | ||||
|   private connectionLogger: NodeJS.Timeout | null = null; | ||||
|   private isShuttingDown: boolean = false; | ||||
|  | ||||
|   // Map to track round robin indices for each domain config. | ||||
|   private domainTargetIndices: Map<IDomainConfig, number> = new Map(); | ||||
| @@ -151,6 +161,7 @@ export class PortProxy { | ||||
|       ...settingsArg, | ||||
|       targetIP: settingsArg.targetIP || 'localhost', | ||||
|       maxConnectionLifetime: settingsArg.maxConnectionLifetime || 600000, | ||||
|       gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30000, | ||||
|     }; | ||||
|   } | ||||
|  | ||||
| @@ -159,30 +170,77 @@ export class PortProxy { | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Cleans up a connection record if not already cleaned up. | ||||
|    * Destroys both incoming and outgoing sockets, clears timers, and removes the record. | ||||
|    * Logs the cleanup event. | ||||
|    * Initiates the cleanup process for a connection. | ||||
|    * Sets the flag to prevent duplicate cleanup attempts and schedules actual cleanup. | ||||
|    */ | ||||
|   private cleanupConnection(record: IConnectionRecord, special: boolean = false): void { | ||||
|     if (!record.connectionClosed) { | ||||
|   private initiateCleanup(record: IConnectionRecord, reason: string = 'normal'): void { | ||||
|     if (record.cleanupInitiated) return; | ||||
|      | ||||
|     record.cleanupInitiated = true; | ||||
|     const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|     console.log(`Initiating cleanup for connection ${record.id} from ${remoteIP} (reason: ${reason})`); | ||||
|      | ||||
|     // Execute cleanup immediately to prevent lingering connections | ||||
|     this.executeCleanup(record); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Executes the actual cleanup of a connection. | ||||
|    * Destroys sockets, clears timers, and removes the record. | ||||
|    */ | ||||
|   private executeCleanup(record: IConnectionRecord): void { | ||||
|     if (record.connectionClosed) return; | ||||
|      | ||||
|     record.connectionClosed = true; | ||||
|     const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|      | ||||
|     if (record.cleanupTimer) { | ||||
|       clearTimeout(record.cleanupTimer); | ||||
|       record.cleanupTimer = undefined; | ||||
|     } | ||||
|      | ||||
|     // End the sockets first to allow for graceful closure | ||||
|     try { | ||||
|       if (!record.incoming.destroyed) { | ||||
|         record.incoming.end(); | ||||
|         // Set a safety timeout to force destroy if end doesn't complete | ||||
|         setTimeout(() => { | ||||
|           if (!record.incoming.destroyed) { | ||||
|             console.log(`Forcing destruction of incoming socket for ${remoteIP}`); | ||||
|             record.incoming.destroy(); | ||||
|           } | ||||
|         }, 1000); | ||||
|       } | ||||
|     } catch (err) { | ||||
|       console.error(`Error ending incoming socket for ${remoteIP}:`, err); | ||||
|       if (!record.incoming.destroyed) { | ||||
|         record.incoming.destroy(); | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     try { | ||||
|       if (record.outgoing && !record.outgoing.destroyed) { | ||||
|         record.outgoing.end(); | ||||
|         // Set a safety timeout to force destroy if end doesn't complete | ||||
|         setTimeout(() => { | ||||
|           if (record.outgoing && !record.outgoing.destroyed) { | ||||
|             console.log(`Forcing destruction of outgoing socket for ${remoteIP}`); | ||||
|             record.outgoing.destroy(); | ||||
|           } | ||||
|         }, 1000); | ||||
|       } | ||||
|     } catch (err) { | ||||
|       console.error(`Error ending outgoing socket for ${remoteIP}:`, err); | ||||
|       if (record.outgoing && !record.outgoing.destroyed) { | ||||
|         record.outgoing.destroy(); | ||||
|       } | ||||
|       this.connectionRecords.delete(record); | ||||
|       const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|       if (special) { | ||||
|         console.log(`Special parity cleanup: Connection from ${remoteIP} cleaned up due to duration difference.`); | ||||
|       } else { | ||||
|         console.log(`Connection from ${remoteIP} terminated. Active connections: ${this.connectionRecords.size}`); | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     // Remove the record after a delay to ensure all events have propagated | ||||
|     setTimeout(() => { | ||||
|       this.connectionRecords.delete(record.id); | ||||
|       console.log(`Connection ${record.id} from ${remoteIP} fully cleaned up. Active connections: ${this.connectionRecords.size}`); | ||||
|     }, 2000); | ||||
|   } | ||||
|  | ||||
|   private getTargetIP(domainConfig: IDomainConfig): string { | ||||
| @@ -195,27 +253,60 @@ export class PortProxy { | ||||
|     return this.settings.targetIP!; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Updates the last activity timestamp for a connection record | ||||
|    */ | ||||
|   private updateActivity(record: IConnectionRecord): void { | ||||
|     record.lastActivity = Date.now(); | ||||
|      | ||||
|     // Reset the inactivity timer if one is set | ||||
|     if (this.settings.maxConnectionLifetime && record.cleanupTimer) { | ||||
|       clearTimeout(record.cleanupTimer); | ||||
|        | ||||
|       // Set a new cleanup timer | ||||
|       record.cleanupTimer = setTimeout(() => { | ||||
|         const now = Date.now(); | ||||
|         const inactivityTime = now - record.lastActivity; | ||||
|         const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|         console.log(`Connection ${record.id} from ${remoteIP} exceeded max lifetime or inactivity period (${inactivityTime}ms), forcing cleanup.`); | ||||
|         this.initiateCleanup(record, 'timeout'); | ||||
|       }, this.settings.maxConnectionLifetime); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   public async start() { | ||||
|     // Define a unified connection handler for all listening ports. | ||||
|     const connectionHandler = (socket: plugins.net.Socket) => { | ||||
|       if (this.isShuttingDown) { | ||||
|         socket.end(); | ||||
|         socket.destroy(); | ||||
|         return; | ||||
|       } | ||||
|  | ||||
|       const remoteIP = socket.remoteAddress || ''; | ||||
|       const localPort = socket.localPort; // The port on which this connection was accepted. | ||||
|        | ||||
|       const connectionId = generateConnectionId(); | ||||
|       const connectionRecord: IConnectionRecord = { | ||||
|         id: connectionId, | ||||
|         incoming: socket, | ||||
|         outgoing: null, | ||||
|         incomingStartTime: Date.now(), | ||||
|         lastActivity: Date.now(), | ||||
|         connectionClosed: false, | ||||
|         cleanupInitiated: false | ||||
|       }; | ||||
|       this.connectionRecords.add(connectionRecord); | ||||
|       console.log(`New connection from ${remoteIP} on port ${localPort}. Active connections: ${this.connectionRecords.size}`); | ||||
|  | ||||
|       this.connectionRecords.set(connectionId, connectionRecord); | ||||
|       console.log(`New connection ${connectionId} from ${remoteIP} on port ${localPort}. Active connections: ${this.connectionRecords.size}`); | ||||
|  | ||||
|       let initialDataReceived = false; | ||||
|       let incomingTerminationReason: string | null = null; | ||||
|       let outgoingTerminationReason: string | null = null; | ||||
|  | ||||
|       // Local cleanup function that delegates to the class method. | ||||
|       const cleanupOnce = () => { | ||||
|         this.cleanupConnection(connectionRecord); | ||||
|       const initiateCleanupOnce = (reason: string = 'normal') => { | ||||
|         this.initiateCleanup(connectionRecord, reason); | ||||
|       }; | ||||
|  | ||||
|       // Helper to reject an incoming connection. | ||||
| @@ -226,14 +317,31 @@ export class PortProxy { | ||||
|           incomingTerminationReason = reason; | ||||
|           this.incrementTerminationStat('incoming', reason); | ||||
|         } | ||||
|         cleanupOnce(); | ||||
|         initiateCleanupOnce(reason); | ||||
|       }; | ||||
|  | ||||
|       // Set an initial timeout immediately | ||||
|       const initialTimeout = setTimeout(() => { | ||||
|         if (!initialDataReceived) { | ||||
|           console.log(`Initial connection timeout for ${remoteIP} (no data received)`); | ||||
|           if (incomingTerminationReason === null) { | ||||
|             incomingTerminationReason = 'initial_timeout'; | ||||
|             this.incrementTerminationStat('incoming', 'initial_timeout'); | ||||
|           } | ||||
|           initiateCleanupOnce('initial_timeout'); | ||||
|         } | ||||
|       }, 5000); | ||||
|  | ||||
|       socket.on('error', (err: Error) => { | ||||
|         const errorMessage = initialDataReceived | ||||
|           ? `(Immediate) Incoming socket error from ${remoteIP}: ${err.message}` | ||||
|           : `(Premature) Incoming socket error from ${remoteIP} before data received: ${err.message}`; | ||||
|         console.log(errorMessage); | ||||
|          | ||||
|         // Clear the initial timeout if it exists | ||||
|         if (initialTimeout) { | ||||
|           clearTimeout(initialTimeout); | ||||
|         } | ||||
|       }); | ||||
|  | ||||
|       const handleError = (side: 'incoming' | 'outgoing') => (err: Error) => { | ||||
| @@ -242,9 +350,13 @@ export class PortProxy { | ||||
|         if (code === 'ECONNRESET') { | ||||
|           reason = 'econnreset'; | ||||
|           console.log(`ECONNRESET on ${side} side from ${remoteIP}: ${err.message}`); | ||||
|         } else if (code === 'ECONNREFUSED') { | ||||
|           reason = 'econnrefused'; | ||||
|           console.log(`ECONNREFUSED on ${side} side from ${remoteIP}: ${err.message}`); | ||||
|         } else { | ||||
|           console.log(`Error on ${side} side from ${remoteIP}: ${err.message}`); | ||||
|         } | ||||
|          | ||||
|         if (side === 'incoming' && incomingTerminationReason === null) { | ||||
|           incomingTerminationReason = reason; | ||||
|           this.incrementTerminationStat('incoming', reason); | ||||
| @@ -252,11 +364,13 @@ export class PortProxy { | ||||
|           outgoingTerminationReason = reason; | ||||
|           this.incrementTerminationStat('outgoing', reason); | ||||
|         } | ||||
|         cleanupOnce(); | ||||
|          | ||||
|         initiateCleanupOnce(reason); | ||||
|       }; | ||||
|  | ||||
|       const handleClose = (side: 'incoming' | 'outgoing') => () => { | ||||
|         console.log(`Connection closed on ${side} side from ${remoteIP}`); | ||||
|          | ||||
|         if (side === 'incoming' && incomingTerminationReason === null) { | ||||
|           incomingTerminationReason = 'normal'; | ||||
|           this.incrementTerminationStat('incoming', 'normal'); | ||||
| @@ -265,8 +379,24 @@ export class PortProxy { | ||||
|           this.incrementTerminationStat('outgoing', 'normal'); | ||||
|           // Record the time when outgoing socket closed. | ||||
|           connectionRecord.outgoingClosedTime = Date.now(); | ||||
|            | ||||
|           // If incoming is still active but outgoing closed, set a shorter timeout | ||||
|           if (!connectionRecord.incoming.destroyed) { | ||||
|             console.log(`Outgoing socket closed but incoming still active for ${remoteIP}. Setting cleanup timeout.`); | ||||
|             setTimeout(() => { | ||||
|               if (!connectionRecord.connectionClosed && !connectionRecord.incoming.destroyed) { | ||||
|                 console.log(`Incoming socket still active ${Date.now() - connectionRecord.outgoingClosedTime!}ms after outgoing closed for ${remoteIP}. Cleaning up.`); | ||||
|                 initiateCleanupOnce('outgoing_closed_timeout'); | ||||
|               } | ||||
|             }, 10000); // 10 second timeout instead of waiting for the next parity check | ||||
|           } | ||||
|         } | ||||
|          | ||||
|         // If both sides are closed/destroyed, clean up | ||||
|         if ((side === 'incoming' && connectionRecord.outgoing?.destroyed) ||  | ||||
|             (side === 'outgoing' && connectionRecord.incoming.destroyed)) { | ||||
|           initiateCleanupOnce('both_closed'); | ||||
|         } | ||||
|         cleanupOnce(); | ||||
|       }; | ||||
|  | ||||
|       /** | ||||
| @@ -274,9 +404,14 @@ export class PortProxy { | ||||
|        * @param serverName - The SNI hostname (unused when forcedDomain is provided). | ||||
|        * @param initialChunk - Optional initial data chunk. | ||||
|        * @param forcedDomain - If provided, overrides SNI/domain lookup (used for port-based routing). | ||||
|        * @param overridePort - If provided, use this port for the outgoing connection (typically the same as the incoming port). | ||||
|        * @param overridePort - If provided, use this port for the outgoing connection. | ||||
|        */ | ||||
|       const setupConnection = (serverName: string, initialChunk?: Buffer, forcedDomain?: IDomainConfig, overridePort?: number) => { | ||||
|         // Clear the initial timeout since we've received data | ||||
|         if (initialTimeout) { | ||||
|           clearTimeout(initialTimeout); | ||||
|         } | ||||
|          | ||||
|         // If a forcedDomain is provided (port-based routing), use it; otherwise, use SNI-based lookup. | ||||
|         const domainConfig = forcedDomain | ||||
|           ? forcedDomain | ||||
| @@ -297,10 +432,13 @@ export class PortProxy { | ||||
|           if (!isGlobIPAllowed(remoteIP, effectiveAllowedIPs, effectiveBlockedIPs)) { | ||||
|             return rejectIncomingConnection('rejected', `Connection rejected: IP ${remoteIP} not allowed for domain ${domainConfig.domains.join(', ')}`); | ||||
|           } | ||||
|         } else if (this.settings.defaultAllowedIPs) { | ||||
|         } else if (this.settings.defaultAllowedIPs && this.settings.defaultAllowedIPs.length > 0) { | ||||
|           if (!isGlobIPAllowed(remoteIP, this.settings.defaultAllowedIPs, this.settings.defaultBlockedIPs || [])) { | ||||
|             return rejectIncomingConnection('rejected', `Connection rejected: IP ${remoteIP} not allowed by default allowed list`); | ||||
|           } | ||||
|         } else { | ||||
|           // No domain config and no default allowed IPs - reject the connection | ||||
|           return rejectIncomingConnection('no_config', `Connection rejected: No matching domain configuration or default allowed IPs for ${remoteIP}`); | ||||
|         } | ||||
|  | ||||
|         const targetHost = domainConfig ? this.getTargetIP(domainConfig) : this.settings.targetIP!; | ||||
| @@ -312,88 +450,127 @@ export class PortProxy { | ||||
|           connectionOptions.localAddress = remoteIP.replace('::ffff:', ''); | ||||
|         } | ||||
|  | ||||
|         // Add explicit connection timeout and error handling | ||||
|         let connectionTimeout: NodeJS.Timeout | null = null; | ||||
|         let connectionSucceeded = false; | ||||
|          | ||||
|         // Set connection timeout | ||||
|         connectionTimeout = setTimeout(() => { | ||||
|           if (!connectionSucceeded) { | ||||
|             console.log(`Connection timeout connecting to ${targetHost}:${connectionOptions.port} for ${remoteIP}`); | ||||
|             if (outgoingTerminationReason === null) { | ||||
|               outgoingTerminationReason = 'connection_timeout'; | ||||
|               this.incrementTerminationStat('outgoing', 'connection_timeout'); | ||||
|             } | ||||
|             initiateCleanupOnce('connection_timeout'); | ||||
|           } | ||||
|         }, 5000); | ||||
|  | ||||
|         console.log(`Attempting to connect to ${targetHost}:${connectionOptions.port} for client ${remoteIP}...`); | ||||
|          | ||||
|         // Create the target socket | ||||
|         const targetSocket = plugins.net.connect(connectionOptions); | ||||
|         connectionRecord.outgoing = targetSocket; | ||||
|         connectionRecord.outgoingStartTime = Date.now(); | ||||
|          | ||||
|         // Handle successful connection | ||||
|         targetSocket.once('connect', () => { | ||||
|           connectionSucceeded = true; | ||||
|           if (connectionTimeout) { | ||||
|             clearTimeout(connectionTimeout); | ||||
|             connectionTimeout = null; | ||||
|           } | ||||
|            | ||||
|           connectionRecord.outgoingStartTime = Date.now(); | ||||
|           console.log( | ||||
|             `Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` + | ||||
|             `${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}` | ||||
|           ); | ||||
|  | ||||
|           // Setup data flow after confirmed connection | ||||
|           setupDataFlow(targetSocket, initialChunk); | ||||
|         }); | ||||
|  | ||||
|         // Handle connection errors early | ||||
|         targetSocket.once('error', (err) => { | ||||
|           if (!connectionSucceeded) { | ||||
|             // This is an initial connection error | ||||
|             console.log(`Failed to connect to ${targetHost}:${connectionOptions.port} for ${remoteIP}: ${err.message}`); | ||||
|             if (connectionTimeout) { | ||||
|               clearTimeout(connectionTimeout); | ||||
|               connectionTimeout = null; | ||||
|             } | ||||
|             if (outgoingTerminationReason === null) { | ||||
|               outgoingTerminationReason = 'connection_failed'; | ||||
|               this.incrementTerminationStat('outgoing', 'connection_failed'); | ||||
|             } | ||||
|             initiateCleanupOnce('connection_failed'); | ||||
|           } | ||||
|           // Other errors will be handled by the main error handler | ||||
|         }); | ||||
|       }; | ||||
|  | ||||
|       /** | ||||
|        * Sets up the data flow between sockets after successful connection | ||||
|        */ | ||||
|       const setupDataFlow = (targetSocket: plugins.net.Socket, initialChunk?: Buffer) => { | ||||
|         if (initialChunk) { | ||||
|           socket.unshift(initialChunk); | ||||
|         } | ||||
|          | ||||
|         // Set appropriate timeouts for both sockets | ||||
|         socket.setTimeout(120000); | ||||
|         targetSocket.setTimeout(120000); | ||||
|          | ||||
|         // Set up the pipe in both directions | ||||
|         socket.pipe(targetSocket); | ||||
|         targetSocket.pipe(socket); | ||||
|  | ||||
|         // Attach error and close handlers. | ||||
|         // Attach error and close handlers | ||||
|         socket.on('error', handleError('incoming')); | ||||
|         targetSocket.on('error', handleError('outgoing')); | ||||
|         socket.on('close', handleClose('incoming')); | ||||
|         targetSocket.on('close', handleClose('outgoing')); | ||||
|          | ||||
|         // Handle timeout events | ||||
|         socket.on('timeout', () => { | ||||
|           console.log(`Timeout on incoming side from ${remoteIP}`); | ||||
|           if (incomingTerminationReason === null) { | ||||
|             incomingTerminationReason = 'timeout'; | ||||
|             this.incrementTerminationStat('incoming', 'timeout'); | ||||
|           } | ||||
|           cleanupOnce(); | ||||
|           initiateCleanupOnce('timeout'); | ||||
|         }); | ||||
|          | ||||
|         targetSocket.on('timeout', () => { | ||||
|           console.log(`Timeout on outgoing side from ${remoteIP}`); | ||||
|           if (outgoingTerminationReason === null) { | ||||
|             outgoingTerminationReason = 'timeout'; | ||||
|             this.incrementTerminationStat('outgoing', 'timeout'); | ||||
|           } | ||||
|           cleanupOnce(); | ||||
|           initiateCleanupOnce('timeout'); | ||||
|         }); | ||||
|          | ||||
|         socket.on('end', handleClose('incoming')); | ||||
|         targetSocket.on('end', handleClose('outgoing')); | ||||
|  | ||||
|         // Initialize a cleanup timer for max connection lifetime. | ||||
|         // Track activity for both sockets to reset inactivity timers | ||||
|         socket.on('data', (data) => { | ||||
|           this.updateActivity(connectionRecord); | ||||
|         }); | ||||
|          | ||||
|         targetSocket.on('data', (data) => { | ||||
|           this.updateActivity(connectionRecord); | ||||
|         }); | ||||
|  | ||||
|         // Initialize a cleanup timer for max connection lifetime | ||||
|         if (this.settings.maxConnectionLifetime) { | ||||
|           // Flags to track if data was seen from each side. | ||||
|           let incomingActive = false; | ||||
|           let outgoingActive = false; | ||||
|           const resetCleanupTimer = () => { | ||||
|             if (this.settings.maxConnectionLifetime) { | ||||
|               if (connectionRecord.cleanupTimer) { | ||||
|                 clearTimeout(connectionRecord.cleanupTimer); | ||||
|               } | ||||
|           connectionRecord.cleanupTimer = setTimeout(() => { | ||||
|                 console.log(`Connection from ${remoteIP} exceeded max lifetime with inactivity (${this.settings.maxConnectionLifetime}ms), forcing cleanup.`); | ||||
|                 cleanupOnce(); | ||||
|             console.log(`Connection from ${remoteIP} exceeded max lifetime (${this.settings.maxConnectionLifetime}ms), forcing cleanup.`); | ||||
|             initiateCleanupOnce('max_lifetime'); | ||||
|           }, this.settings.maxConnectionLifetime); | ||||
|         } | ||||
|       }; | ||||
|  | ||||
|           resetCleanupTimer(); | ||||
|  | ||||
|           // Only reset the timer if outgoing socket is still active. | ||||
|           socket.on('data', () => { | ||||
|             incomingActive = true; | ||||
|             // Check if outgoing has not been closed before resetting timer. | ||||
|             if (!connectionRecord.outgoingClosedTime && incomingActive && outgoingActive) { | ||||
|               resetCleanupTimer(); | ||||
|               incomingActive = false; | ||||
|               outgoingActive = false; | ||||
|             } | ||||
|           }); | ||||
|           targetSocket.on('data', () => { | ||||
|             // If outgoing is closed, do not set outgoingActive. | ||||
|             if (connectionRecord.outgoingClosedTime) return; | ||||
|             outgoingActive = true; | ||||
|             if (incomingActive && outgoingActive) { | ||||
|               resetCleanupTimer(); | ||||
|               incomingActive = false; | ||||
|               outgoingActive = false; | ||||
|             } | ||||
|           }); | ||||
|         } | ||||
|       }; | ||||
|  | ||||
|       // --- PORT RANGE-BASED HANDLING --- | ||||
|       // Only apply port-based rules if the incoming port is within one of the global port ranges. | ||||
|       if (this.settings.globalPortRanges && isPortInRanges(localPort, this.settings.globalPortRanges)) { | ||||
| @@ -401,6 +578,7 @@ export class PortProxy { | ||||
|           if (this.settings.defaultAllowedIPs && !isAllowed(remoteIP, this.settings.defaultAllowedIPs)) { | ||||
|             console.log(`Connection from ${remoteIP} rejected: IP ${remoteIP} not allowed in global default allowed list.`); | ||||
|             socket.end(); | ||||
|             initiateCleanupOnce('rejected'); | ||||
|             return; | ||||
|           } | ||||
|           console.log(`Port-based connection from ${remoteIP} on port ${localPort} forwarded to global target IP ${this.settings.targetIP}.`); | ||||
| @@ -429,6 +607,7 @@ export class PortProxy { | ||||
|             if (!isGlobIPAllowed(remoteIP, effectiveAllowedIPs, effectiveBlockedIPs)) { | ||||
|               console.log(`Connection from ${remoteIP} rejected: IP not allowed for domain ${forcedDomain.domains.join(', ')} on port ${localPort}.`); | ||||
|               socket.end(); | ||||
|               initiateCleanupOnce('rejected'); | ||||
|               return; | ||||
|             } | ||||
|             console.log(`Port-based connection from ${remoteIP} on port ${localPort} matched domain ${forcedDomain.domains.join(', ')}.`); | ||||
| @@ -441,32 +620,36 @@ export class PortProxy { | ||||
|  | ||||
|       // --- FALLBACK: SNI-BASED HANDLING (or default when SNI is disabled) --- | ||||
|       if (this.settings.sniEnabled) { | ||||
|         socket.setTimeout(5000, () => { | ||||
|           console.log(`Initial data timeout for ${remoteIP}`); | ||||
|           socket.end(); | ||||
|           cleanupOnce(); | ||||
|         }); | ||||
|         initialDataReceived = false; | ||||
|  | ||||
|         socket.once('data', (chunk: Buffer) => { | ||||
|           socket.setTimeout(0); | ||||
|           initialDataReceived = true; | ||||
|           const serverName = extractSNI(chunk) || ''; | ||||
|           // Lock the connection to the negotiated SNI. | ||||
|           connectionRecord.lockedDomain = serverName; | ||||
|           console.log(`Received connection from ${remoteIP} with SNI: ${serverName}`); | ||||
|            | ||||
|           // Delay adding the renegotiation listener until the next tick, | ||||
|           // so the initial ClientHello is not reprocessed. | ||||
|           setImmediate(() => { | ||||
|             socket.on('data', (renegChunk: Buffer) => { | ||||
|               if (renegChunk.length > 0 && renegChunk.readUInt8(0) === 22) { | ||||
|                 try { | ||||
|                   // Try to extract SNI from potential renegotiation | ||||
|                   const newSNI = extractSNI(renegChunk); | ||||
|                   if (newSNI && newSNI !== connectionRecord.lockedDomain) { | ||||
|                     console.log(`Rehandshake detected with different SNI: ${newSNI} vs locked ${connectionRecord.lockedDomain}. Terminating connection.`); | ||||
|                   cleanupOnce(); | ||||
|                     initiateCleanupOnce('sni_mismatch'); | ||||
|                   } else if (newSNI) { | ||||
|                     console.log(`Rehandshake detected with same SNI: ${newSNI}. Allowing.`); | ||||
|                   } | ||||
|                 } catch (err) { | ||||
|                   console.log(`Error processing potential renegotiation: ${err}. Allowing connection to continue.`); | ||||
|                 } | ||||
|               } | ||||
|             }); | ||||
|           }); | ||||
|            | ||||
|           setupConnection(serverName, chunk); | ||||
|         }); | ||||
|       } else { | ||||
| @@ -507,23 +690,48 @@ export class PortProxy { | ||||
|       this.netServers.push(server); | ||||
|     } | ||||
|  | ||||
|     // Log active connection count, longest running durations, and run parity checks every 10 seconds. | ||||
|     // Log active connection count, run parity checks, and check for connection issues every 10 seconds. | ||||
|     this.connectionLogger = setInterval(() => { | ||||
|       if (this.isShuttingDown) return; | ||||
|        | ||||
|       const now = Date.now(); | ||||
|       let maxIncoming = 0; | ||||
|       let maxOutgoing = 0; | ||||
|       for (const record of this.connectionRecords) { | ||||
|        | ||||
|       // Create a copy of the keys to avoid modification during iteration | ||||
|       const connectionIds = [...this.connectionRecords.keys()]; | ||||
|        | ||||
|       for (const id of connectionIds) { | ||||
|         const record = this.connectionRecords.get(id); | ||||
|         if (!record) continue; | ||||
|          | ||||
|         maxIncoming = Math.max(maxIncoming, now - record.incomingStartTime); | ||||
|         if (record.outgoingStartTime) { | ||||
|           maxOutgoing = Math.max(maxOutgoing, now - record.outgoingStartTime); | ||||
|         } | ||||
|         // Parity check: if outgoing socket closed and incoming remains active for >1 minute, trigger special cleanup. | ||||
|         if (record.outgoingClosedTime && !record.incoming.destroyed && (now - record.outgoingClosedTime > 60000)) { | ||||
|          | ||||
|         // Parity check: if outgoing socket closed and incoming remains active for >30 seconds, trigger cleanup | ||||
|         if (record.outgoingClosedTime &&  | ||||
|             !record.incoming.destroyed &&  | ||||
|             !record.connectionClosed &&  | ||||
|             !record.cleanupInitiated &&  | ||||
|             (now - record.outgoingClosedTime > 30000)) { | ||||
|           const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|           console.log(`Parity check triggered: Incoming socket for ${remoteIP} has been active >1 minute after outgoing closed.`); | ||||
|           this.cleanupConnection(record, true); | ||||
|           console.log(`Parity check triggered: Incoming socket for ${remoteIP} has been active >30s after outgoing closed.`); | ||||
|           this.initiateCleanup(record, 'parity_check'); | ||||
|         } | ||||
|          | ||||
|         // Inactivity check: if no activity for a long time but sockets still open | ||||
|         const inactivityTime = now - record.lastActivity; | ||||
|         if (inactivityTime > 180000 && // 3 minutes | ||||
|             !record.connectionClosed &&  | ||||
|             !record.cleanupInitiated) { | ||||
|           const remoteIP = record.incoming.remoteAddress || 'unknown'; | ||||
|           console.log(`Inactivity check triggered: No activity on connection from ${remoteIP} for ${plugins.prettyMs(inactivityTime)}.`); | ||||
|           this.initiateCleanup(record, 'inactivity'); | ||||
|         } | ||||
|       } | ||||
|        | ||||
|       console.log( | ||||
|         `(Interval Log) Active connections: ${this.connectionRecords.size}. ` + | ||||
|         `Longest running incoming: ${plugins.prettyMs(maxIncoming)}, outgoing: ${plugins.prettyMs(maxOutgoing)}. ` + | ||||
| @@ -534,17 +742,69 @@ export class PortProxy { | ||||
|   } | ||||
|  | ||||
|   public async stop() { | ||||
|     // Close all servers. | ||||
|     const closePromises: Promise<void>[] = this.netServers.map( | ||||
|     console.log("PortProxy shutting down..."); | ||||
|     this.isShuttingDown = true; | ||||
|      | ||||
|     // Stop accepting new connections | ||||
|     const closeServerPromises: Promise<void>[] = this.netServers.map( | ||||
|       server => | ||||
|         new Promise<void>((resolve) => { | ||||
|           server.close(() => resolve()); | ||||
|         }) | ||||
|     ); | ||||
|      | ||||
|     // Stop the connection logger | ||||
|     if (this.connectionLogger) { | ||||
|       clearInterval(this.connectionLogger); | ||||
|       this.connectionLogger = null; | ||||
|     } | ||||
|     await Promise.all(closePromises); | ||||
|  | ||||
|     // Wait for servers to close | ||||
|     await Promise.all(closeServerPromises); | ||||
|     console.log("All servers closed. Cleaning up active connections..."); | ||||
|      | ||||
|     // Gracefully close active connections | ||||
|     const connectionIds = [...this.connectionRecords.keys()]; | ||||
|     console.log(`Cleaning up ${connectionIds.length} active connections...`); | ||||
|      | ||||
|     for (const id of connectionIds) { | ||||
|       const record = this.connectionRecords.get(id); | ||||
|       if (record && !record.connectionClosed && !record.cleanupInitiated) { | ||||
|         this.initiateCleanup(record, 'shutdown'); | ||||
|       } | ||||
|     } | ||||
|      | ||||
|     // Wait for graceful shutdown or timeout | ||||
|     const shutdownTimeout = this.settings.gracefulShutdownTimeout || 30000; | ||||
|     await new Promise<void>((resolve) => { | ||||
|       const checkInterval = setInterval(() => { | ||||
|         if (this.connectionRecords.size === 0) { | ||||
|           clearInterval(checkInterval); | ||||
|           resolve(); | ||||
|         } | ||||
|       }, 1000); | ||||
|        | ||||
|       // Force resolve after timeout | ||||
|       setTimeout(() => { | ||||
|         clearInterval(checkInterval); | ||||
|         if (this.connectionRecords.size > 0) { | ||||
|           console.log(`Forcing shutdown with ${this.connectionRecords.size} connections still active`); | ||||
|            | ||||
|           // Force destroy any remaining connections | ||||
|           for (const record of this.connectionRecords.values()) { | ||||
|             if (!record.incoming.destroyed) { | ||||
|               record.incoming.destroy(); | ||||
|             } | ||||
|             if (record.outgoing && !record.outgoing.destroyed) { | ||||
|               record.outgoing.destroy(); | ||||
|             } | ||||
|           } | ||||
|           this.connectionRecords.clear(); | ||||
|         } | ||||
|         resolve(); | ||||
|       }, shutdownTimeout); | ||||
|     }); | ||||
|      | ||||
|     console.log("PortProxy shutdown complete."); | ||||
|   } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user