From ced9b5b27bb873668722efd1aa2b3b66ebfa5323 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Mon, 3 Mar 2025 02:14:21 +0000 Subject: [PATCH] fix(core): Improve connection management and error handling in PortProxy --- changelog.md | 9 ++ ts/00_commitinfo_data.ts | 2 +- ts/classes.portproxy.ts | 271 +++++++++++++++------------------------ 3 files changed, 116 insertions(+), 166 deletions(-) diff --git a/changelog.md b/changelog.md index 255726a..22771af 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2025-03-03 - 3.22.3 - fix(core) +Improve connection management and error handling in PortProxy + +- Refactored connection cleanup to handle errors more gracefully. +- Introduced comprehensive comments for better code understanding. +- Revised SNI data timeout logic for connection handling. +- Enhanced logging and error reporting during connection management. +- Improved inactivity checks and parity checks for existing connections. + ## 2025-03-03 - 3.22.2 - fix(portproxy) Refactored connection cleanup logic in PortProxy diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 33a36f5..9c56d96 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '3.22.2', + version: '3.22.3', description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, and dynamic routing with authentication options.' } diff --git a/ts/classes.portproxy.ts b/ts/classes.portproxy.ts index e8d9ed1..59bb9ca 100644 --- a/ts/classes.portproxy.ts +++ b/ts/classes.portproxy.ts @@ -23,7 +23,6 @@ export interface IPortProxySettings extends plugins.tls.TlsOptions { globalPortRanges: Array<{ from: number; to: number }>; // Global allowed port ranges forwardAllGlobalRanges?: boolean; // When true, forwards all connections on global port ranges to the global targetIP gracefulShutdownTimeout?: number; // (ms) maximum time to wait for connections to close during shutdown - initialDataTimeout?: number; // (ms) timeout for receiving initial data, useful for chained proxies } /** @@ -90,24 +89,24 @@ function extractSNI(buffer: Buffer): string | undefined { } interface IConnectionRecord { + id: string; // Unique connection identifier incoming: plugins.net.Socket; outgoing: plugins.net.Socket | null; incomingStartTime: number; outgoingStartTime?: number; outgoingClosedTime?: number; - lockedDomain?: string; // Field to lock this connection to the initial SNI - connectionClosed: boolean; - cleanupTimer?: NodeJS.Timeout; // Timer to force cleanup after max lifetime/inactivity - id: string; // Unique identifier for the connection - lastActivity: number; // Timestamp of last activity on either socket + lockedDomain?: string; // Used to lock this connection to the initial SNI + connectionClosed: boolean; // Flag to prevent multiple cleanup attempts + cleanupTimer?: NodeJS.Timeout; // Timer for max lifetime/inactivity + lastActivity: number; // Last activity timestamp for inactivity detection } -// Helper: Check if a port falls within any of the given port ranges. +// Helper: Check if a port falls within any of the given port ranges const isPortInRanges = (port: number, ranges: Array<{ from: number; to: number }>): boolean => { return ranges.some(range => port >= range.from && port <= range.to); }; -// Helper: Check if a given IP matches any of the glob patterns. +// Helper: Check if a given IP matches any of the glob patterns const isAllowed = (ip: string, patterns: string[]): boolean => { const normalizeIP = (ip: string): string[] => { if (ip.startsWith('::ffff:')) { @@ -126,13 +125,13 @@ const isAllowed = (ip: string, patterns: string[]): boolean => { ); }; -// Helper: Check if an IP is allowed considering allowed and blocked glob patterns. +// Helper: Check if an IP is allowed considering allowed and blocked glob patterns const isGlobIPAllowed = (ip: string, allowed: string[], blocked: string[] = []): boolean => { if (blocked.length > 0 && isAllowed(ip, blocked)) return false; return isAllowed(ip, allowed); }; -// Helper: Generate a unique ID for a connection +// Helper: Generate a unique connection ID const generateConnectionId = (): string => { return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); }; @@ -140,12 +139,11 @@ const generateConnectionId = (): string => { export class PortProxy { private netServers: plugins.net.Server[] = []; settings: IPortProxySettings; - // Unified record tracking each connection pair. private connectionRecords: Map = new Map(); private connectionLogger: NodeJS.Timeout | null = null; private isShuttingDown: boolean = false; - // Map to track round robin indices for each domain config. + // Map to track round robin indices for each domain config private domainTargetIndices: Map = new Map(); private terminationStats: { @@ -163,9 +161,6 @@ export class PortProxy { maxConnectionLifetime: settingsArg.maxConnectionLifetime || 600000, gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30000, }; - - // Debug logging for constructor settings - console.log(`PortProxy initialized with targetIP: ${this.settings.targetIP}, toPort: ${this.settings.toPort}, fromPort: ${this.settings.fromPort}, sniEnabled: ${this.settings.sniEnabled}`); } private incrementTerminationStat(side: 'incoming' | 'outgoing', reason: string): void { @@ -173,28 +168,66 @@ export class PortProxy { } /** - * Cleans up a connection record if not already cleaned up. + * Cleans up a connection record. * Destroys both incoming and outgoing sockets, clears timers, and removes the record. - * Logs the cleanup event. + * @param record - The connection record to clean up + * @param reason - Optional reason for cleanup (for logging) */ private cleanupConnection(record: IConnectionRecord, reason: string = 'normal'): void { if (!record.connectionClosed) { record.connectionClosed = true; + if (record.cleanupTimer) { clearTimeout(record.cleanupTimer); + record.cleanupTimer = undefined; } - if (!record.incoming.destroyed) { - record.incoming.destroy(); + + try { + if (!record.incoming.destroyed) { + // Try graceful shutdown first, then force destroy after a short timeout + record.incoming.end(); + setTimeout(() => { + if (record && !record.incoming.destroyed) { + record.incoming.destroy(); + } + }, 1000); + } + } catch (err) { + console.log(`Error closing incoming socket: ${err}`); + if (!record.incoming.destroyed) { + record.incoming.destroy(); + } } - if (record.outgoing && !record.outgoing.destroyed) { - record.outgoing.destroy(); + + try { + if (record.outgoing && !record.outgoing.destroyed) { + // Try graceful shutdown first, then force destroy after a short timeout + record.outgoing.end(); + setTimeout(() => { + if (record && record.outgoing && !record.outgoing.destroyed) { + record.outgoing.destroy(); + } + }, 1000); + } + } catch (err) { + console.log(`Error closing outgoing socket: ${err}`); + if (record.outgoing && !record.outgoing.destroyed) { + record.outgoing.destroy(); + } } + + // Remove the record from the tracking map this.connectionRecords.delete(record.id); + const remoteIP = record.incoming.remoteAddress || 'unknown'; console.log(`Connection from ${remoteIP} terminated (${reason}). Active connections: ${this.connectionRecords.size}`); } } + private updateActivity(record: IConnectionRecord): void { + record.lastActivity = Date.now(); + } + private getTargetIP(domainConfig: IDomainConfig): string { if (domainConfig.targetIPs && domainConfig.targetIPs.length > 0) { const currentIndex = this.domainTargetIndices.get(domainConfig) || 0; @@ -205,10 +238,6 @@ export class PortProxy { return this.settings.targetIP!; } - private updateActivity(record: IConnectionRecord): void { - record.lastActivity = Date.now(); - } - public async start() { // Define a unified connection handler for all listening ports. const connectionHandler = (socket: plugins.net.Socket) => { @@ -228,22 +257,28 @@ export class PortProxy { outgoing: null, incomingStartTime: Date.now(), lastActivity: Date.now(), - connectionClosed: false, - cleanupInitiated: false + connectionClosed: false }; - this.connectionRecords.set(connectionId, connectionRecord); - console.log(`New connection ${connectionId} from ${remoteIP} on port ${localPort}. Active connections: ${this.connectionRecords.size}`); + + console.log(`New connection from ${remoteIP} on port ${localPort}. Active connections: ${this.connectionRecords.size}`); let initialDataReceived = false; let incomingTerminationReason: string | null = null; let outgoingTerminationReason: string | null = null; - // Local cleanup function that delegates to the class method. + // Local function for cleanupOnce + const cleanupOnce = () => { + this.cleanupConnection(connectionRecord); + }; + + // Define initiateCleanupOnce for compatibility with potential future improvements const initiateCleanupOnce = (reason: string = 'normal') => { - this.initiateCleanup(connectionRecord, reason); + console.log(`Connection cleanup initiated for ${remoteIP} (${reason})`); + cleanupOnce(); }; + // Helper to reject an incoming connection const rejectIncomingConnection = (reason: string, logMessage: string) => { console.log(logMessage); socket.end(); @@ -254,40 +289,14 @@ export class PortProxy { cleanupOnce(); }; - // IMPORTANT: We won't set any initial timeout for a chained proxy scenario - // The code below is commented out to restore original behavior - /* - let initialTimeout: NodeJS.Timeout | null = null; - const initialTimeoutMs = this.settings.initialDataTimeout || - (this.settings.sniEnabled ? 15000 : 0); - - if (initialTimeoutMs > 0) { - console.log(`Setting initial data timeout of ${initialTimeoutMs}ms for connection from ${remoteIP}`); - initialTimeout = setTimeout(() => { - if (!initialDataReceived) { - console.log(`Initial connection timeout for ${remoteIP} (no data received after ${initialTimeoutMs}ms)`); - if (incomingTerminationReason === null) { - incomingTerminationReason = 'initial_timeout'; - this.incrementTerminationStat('incoming', 'initial_timeout'); - } - initiateCleanupOnce('initial_timeout'); - } - }, initialTimeoutMs); - } else { - console.log(`No initial timeout set for connection from ${remoteIP} (likely chained proxy)`); - initialDataReceived = true; - } - */ - - // Original behavior: only set timeout if SNI is enabled, and use a fixed 5 second timeout + // Set an initial timeout for SNI data if needed let initialTimeout: NodeJS.Timeout | null = null; if (this.settings.sniEnabled) { - console.log(`Setting 5 second initial timeout for SNI extraction from ${remoteIP}`); initialTimeout = setTimeout(() => { if (!initialDataReceived) { console.log(`Initial data timeout for ${remoteIP}`); socket.end(); - initiateCleanupOnce('initial_timeout'); + cleanupOnce(); } }, 5000); } else { @@ -295,10 +304,7 @@ export class PortProxy { } socket.on('error', (err: Error) => { - const errorMessage = initialDataReceived - ? `(Immediate) Incoming socket error from ${remoteIP}: ${err.message}` - : `(Premature) Incoming socket error from ${remoteIP} before data received: ${err.message}`; - console.log(errorMessage); + console.log(`Incoming socket error from ${remoteIP}: ${err.message}`); }); const handleError = (side: 'incoming' | 'outgoing') => (err: Error) => { @@ -317,7 +323,7 @@ export class PortProxy { outgoingTerminationReason = reason; this.incrementTerminationStat('outgoing', reason); } - cleanupOnce(); + initiateCleanupOnce(reason); }; const handleClose = (side: 'incoming' | 'outgoing') => () => { @@ -331,7 +337,7 @@ export class PortProxy { // Record the time when outgoing socket closed. connectionRecord.outgoingClosedTime = Date.now(); } - cleanupOnce(); + initiateCleanupOnce('closed_' + side); }; /** @@ -345,6 +351,7 @@ export class PortProxy { // Clear the initial timeout since we've received data if (initialTimeout) { clearTimeout(initialTimeout); + initialTimeout = null; } // If a forcedDomain is provided (port-based routing), use it; otherwise, use SNI-based lookup. @@ -354,9 +361,7 @@ export class PortProxy { config.domains.some(d => plugins.minimatch(serverName, d)) ) : undefined); - // Effective IP check: merge allowed IPs with default allowed, and remove blocked IPs. - // Use original domain configuration and IP validation logic - // This restores the behavior that was working before + // IP validation is skipped if allowedIPs is empty if (domainConfig) { const effectiveAllowedIPs: string[] = [ ...domainConfig.allowedIPs, @@ -367,7 +372,7 @@ export class PortProxy { ...(this.settings.defaultBlockedIPs || []) ]; - // Special case: if allowedIPs is empty, skip IP validation for backward compatibility + // Skip IP validation if allowedIPs is empty if (domainConfig.allowedIPs.length > 0 && !isGlobIPAllowed(remoteIP, effectiveAllowedIPs, effectiveBlockedIPs)) { return rejectIncomingConnection('rejected', `Connection rejected: IP ${remoteIP} not allowed for domain ${domainConfig.domains.join(', ')}`); } @@ -375,8 +380,7 @@ export class PortProxy { if (!isGlobIPAllowed(remoteIP, this.settings.defaultAllowedIPs, this.settings.defaultBlockedIPs || [])) { return rejectIncomingConnection('rejected', `Connection rejected: IP ${remoteIP} not allowed by default allowed list`); } - } - // If no IP validation rules, allow the connection (original behavior) + } const targetHost = domainConfig ? this.getTargetIP(domainConfig) : this.settings.targetIP!; const connectionOptions: plugins.net.NetConnectOpts = { @@ -387,116 +391,57 @@ export class PortProxy { connectionOptions.localAddress = remoteIP.replace('::ffff:', ''); } - // Add explicit connection timeout and error handling - let connectionTimeout: NodeJS.Timeout | null = null; - let connectionSucceeded = false; - - // Set connection timeout - longer for chained proxies - connectionTimeout = setTimeout(() => { - if (!connectionSucceeded) { - console.log(`Connection timeout connecting to ${targetHost}:${connectionOptions.port} for ${remoteIP}`); - if (outgoingTerminationReason === null) { - outgoingTerminationReason = 'connection_timeout'; - this.incrementTerminationStat('outgoing', 'connection_timeout'); - } - initiateCleanupOnce('connection_timeout'); - } - }, 10000); // Increased from 5s to 10s to accommodate chained proxies - - console.log(`Attempting to connect to ${targetHost}:${connectionOptions.port} for client ${remoteIP}...`); - - // Create the target socket + // Create the target socket and immediately set up data piping const targetSocket = plugins.net.connect(connectionOptions); connectionRecord.outgoing = targetSocket; + connectionRecord.outgoingStartTime = Date.now(); - // Handle successful connection - targetSocket.once('connect', () => { - connectionSucceeded = true; - if (connectionTimeout) { - clearTimeout(connectionTimeout); - connectionTimeout = null; - } - - connectionRecord.outgoingStartTime = Date.now(); - console.log( - `Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` + - `${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}` - ); - - // Setup data flow after confirmed connection - setupDataFlow(targetSocket, initialChunk); - }); - - // Handle connection errors early - targetSocket.once('error', (err) => { - if (!connectionSucceeded) { - // This is an initial connection error - console.log(`Failed to connect to ${targetHost}:${connectionOptions.port} for ${remoteIP}: ${err.message}`); - if (connectionTimeout) { - clearTimeout(connectionTimeout); - connectionTimeout = null; - } - if (outgoingTerminationReason === null) { - outgoingTerminationReason = 'connection_failed'; - this.incrementTerminationStat('outgoing', 'connection_failed'); - } - initiateCleanupOnce('connection_failed'); - } - // Other errors will be handled by the main error handler - }); - }; - - /** - * Sets up the data flow between sockets after successful connection - */ - const setupDataFlow = (targetSocket: plugins.net.Socket, initialChunk?: Buffer) => { + // Set up the pipe immediately to ensure data flows without delay if (initialChunk) { socket.unshift(initialChunk); } - // Set appropriate timeouts for both sockets - socket.setTimeout(120000); - targetSocket.setTimeout(120000); - - // Set up the pipe in both directions socket.pipe(targetSocket); targetSocket.pipe(socket); + + console.log( + `Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` + + `${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}` + ); - // Attach error and close handlers + // Add appropriate handlers for connection management socket.on('error', handleError('incoming')); targetSocket.on('error', handleError('outgoing')); socket.on('close', handleClose('incoming')); targetSocket.on('close', handleClose('outgoing')); - - // Handle timeout events socket.on('timeout', () => { console.log(`Timeout on incoming side from ${remoteIP}`); if (incomingTerminationReason === null) { incomingTerminationReason = 'timeout'; this.incrementTerminationStat('incoming', 'timeout'); } - initiateCleanupOnce('timeout'); + initiateCleanupOnce('timeout_incoming'); }); - targetSocket.on('timeout', () => { console.log(`Timeout on outgoing side from ${remoteIP}`); if (outgoingTerminationReason === null) { outgoingTerminationReason = 'timeout'; this.incrementTerminationStat('outgoing', 'timeout'); } - initiateCleanupOnce('timeout'); + initiateCleanupOnce('timeout_outgoing'); }); - - socket.on('end', handleClose('incoming')); - targetSocket.on('end', handleClose('outgoing')); - // Track activity for both sockets to reset inactivity timers - socket.on('data', (data) => { - this.updateActivity(connectionRecord); + // Set appropriate timeouts + socket.setTimeout(120000); + targetSocket.setTimeout(120000); + + // Update activity for both sockets + socket.on('data', () => { + connectionRecord.lastActivity = Date.now(); }); - targetSocket.on('data', (data) => { - this.updateActivity(connectionRecord); + targetSocket.on('data', () => { + connectionRecord.lastActivity = Date.now(); }); // Initialize a cleanup timer for max connection lifetime @@ -515,7 +460,6 @@ export class PortProxy { if (this.settings.defaultAllowedIPs && !isAllowed(remoteIP, this.settings.defaultAllowedIPs)) { console.log(`Connection from ${remoteIP} rejected: IP ${remoteIP} not allowed in global default allowed list.`); socket.end(); - initiateCleanupOnce('rejected'); return; } console.log(`Port-based connection from ${remoteIP} on port ${localPort} forwarded to global target IP ${this.settings.targetIP}.`); @@ -544,7 +488,6 @@ export class PortProxy { if (!isGlobIPAllowed(remoteIP, effectiveAllowedIPs, effectiveBlockedIPs)) { console.log(`Connection from ${remoteIP} rejected: IP not allowed for domain ${forcedDomain.domains.join(', ')} on port ${localPort}.`); socket.end(); - initiateCleanupOnce('rejected'); return; } console.log(`Port-based connection from ${remoteIP} on port ${localPort} matched domain ${forcedDomain.domains.join(', ')}.`); @@ -632,7 +575,7 @@ export class PortProxy { this.netServers.push(server); } - // Log active connection count, run parity checks, and check for connection issues every 10 seconds. + // Log active connection count, longest running durations, and run parity checks every 10 seconds. this.connectionLogger = setInterval(() => { if (this.isShuttingDown) return; @@ -652,25 +595,23 @@ export class PortProxy { maxOutgoing = Math.max(maxOutgoing, now - record.outgoingStartTime); } - // Parity check: if outgoing socket closed and incoming remains active for >30 seconds, trigger cleanup + // Parity check: if outgoing socket closed and incoming remains active if (record.outgoingClosedTime && !record.incoming.destroyed && !record.connectionClosed && - !record.cleanupInitiated && (now - record.outgoingClosedTime > 30000)) { const remoteIP = record.incoming.remoteAddress || 'unknown'; - console.log(`Parity check triggered: Incoming socket for ${remoteIP} has been active >30s after outgoing closed.`); - this.initiateCleanup(record, 'parity_check'); + console.log(`Parity check: Incoming socket for ${remoteIP} still active ${plugins.prettyMs(now - record.outgoingClosedTime)} after outgoing closed.`); + this.cleanupConnection(record, 'parity_check'); } - // Inactivity check: if no activity for a long time but sockets still open + // Inactivity check const inactivityTime = now - record.lastActivity; if (inactivityTime > 180000 && // 3 minutes - !record.connectionClosed && - !record.cleanupInitiated) { + !record.connectionClosed) { const remoteIP = record.incoming.remoteAddress || 'unknown'; - console.log(`Inactivity check triggered: No activity on connection from ${remoteIP} for ${plugins.prettyMs(inactivityTime)}.`); - this.initiateCleanup(record, 'inactivity'); + console.log(`Inactivity check: No activity on connection from ${remoteIP} for ${plugins.prettyMs(inactivityTime)}.`); + this.cleanupConnection(record, 'inactivity'); } } @@ -705,14 +646,14 @@ export class PortProxy { await Promise.all(closeServerPromises); console.log("All servers closed. Cleaning up active connections..."); - // Gracefully close active connections + // Clean up active connections const connectionIds = [...this.connectionRecords.keys()]; console.log(`Cleaning up ${connectionIds.length} active connections...`); for (const id of connectionIds) { const record = this.connectionRecords.get(id); - if (record && !record.connectionClosed && !record.cleanupInitiated) { - this.initiateCleanup(record, 'shutdown'); + if (record && !record.connectionClosed) { + this.cleanupConnection(record, 'shutdown'); } }