From 43378becd209418e5f813950b3899785d334d347 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Tue, 11 Mar 2025 17:05:15 +0000 Subject: [PATCH] fix(PortProxy): Improve buffering and data handling during connection setup in PortProxy to prevent data loss --- changelog.md | 7 ++ ts/00_commitinfo_data.ts | 2 +- ts/classes.portproxy.ts | 226 +++++++++++++++++++++++++-------------- 3 files changed, 152 insertions(+), 83 deletions(-) diff --git a/changelog.md b/changelog.md index bbaaeae..7dba950 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2025-03-11 - 3.37.2 - fix(PortProxy) +Improve buffering and data handling during connection setup in PortProxy to prevent data loss + +- Added a safeDataHandler and processDataQueue to buffer incoming data reliably during the TLS handshake phase +- Introduced a queue with pause/resume logic to avoid exceeding maxPendingDataSize and ensure all pending data is flushed before piping begins +- Refactored the piping setup to install the renegotiation handler only after proper data flushing + ## 2025-03-11 - 3.37.1 - fix(PortProxy/SNI) Refactor SNI extraction in PortProxy to use the dedicated SniHandler class diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index f822ee8..019face 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartproxy', - version: '3.37.1', + version: '3.37.2', description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, dynamic routing with authentication options, and automatic ACME certificate management.' } diff --git a/ts/classes.portproxy.ts b/ts/classes.portproxy.ts index 3e6e129..5a3e574 100644 --- a/ts/classes.portproxy.ts +++ b/ts/classes.portproxy.ts @@ -566,44 +566,104 @@ export class PortProxy { connectionOptions.localAddress = record.remoteIP.replace('::ffff:', ''); } + // Create a safe queue for incoming data using a Buffer array + // We'll use this to ensure we don't lose data during handler transitions + const dataQueue: Buffer[] = []; + let queueSize = 0; + let processingQueue = false; + let drainPending = false; + + // Flag to track if we've switched to the final piping mechanism + // Once this is true, we no longer buffer data in dataQueue + let pipingEstablished = false; + // Pause the incoming socket to prevent buffer overflows + // This ensures we control the flow of data until piping is set up socket.pause(); - // Temporary handler to collect data during connection setup - const tempDataHandler = (chunk: Buffer) => { - // Track bytes received - record.bytesReceived += chunk.length; + // Function to safely process the data queue without losing events + const processDataQueue = () => { + if (processingQueue || dataQueue.length === 0 || pipingEstablished) return; + + processingQueue = true; + + try { + // Process all queued chunks with the current active handler + while (dataQueue.length > 0) { + const chunk = dataQueue.shift()!; + queueSize -= chunk.length; + + // Once piping is established, we shouldn't get here, + // but just in case, pass to the outgoing socket directly + if (pipingEstablished && record.outgoing) { + record.outgoing.write(chunk); + continue; + } + + // Track bytes received + record.bytesReceived += chunk.length; - // Check for TLS handshake - if (!record.isTLS && SniHandler.isTlsHandshake(chunk)) { - record.isTLS = true; + // Check for TLS handshake + if (!record.isTLS && SniHandler.isTlsHandshake(chunk)) { + record.isTLS = true; - if (this.settings.enableTlsDebugLogging) { - console.log( - `[${connectionId}] TLS handshake detected in tempDataHandler, ${chunk.length} bytes` - ); + if (this.settings.enableTlsDebugLogging) { + console.log( + `[${connectionId}] TLS handshake detected in tempDataHandler, ${chunk.length} bytes` + ); + } + } + + // Check if adding this chunk would exceed the buffer limit + const newSize = record.pendingDataSize + chunk.length; + + if (this.settings.maxPendingDataSize && newSize > this.settings.maxPendingDataSize) { + console.log( + `[${connectionId}] Buffer limit exceeded for connection from ${record.remoteIP}: ${newSize} bytes > ${this.settings.maxPendingDataSize} bytes` + ); + socket.end(); // Gracefully close the socket + this.initiateCleanupOnce(record, 'buffer_limit_exceeded'); + return; + } + + // Buffer the chunk and update the size counter + record.pendingData.push(Buffer.from(chunk)); + record.pendingDataSize = newSize; + this.updateActivity(record); + } + } finally { + processingQueue = false; + + // If there's a pending drain and we've processed everything, + // signal we're ready for more data if we haven't established piping yet + if (drainPending && dataQueue.length === 0 && !pipingEstablished) { + drainPending = false; + socket.resume(); } } - - // Check if adding this chunk would exceed the buffer limit - const newSize = record.pendingDataSize + chunk.length; - - if (this.settings.maxPendingDataSize && newSize > this.settings.maxPendingDataSize) { - console.log( - `[${connectionId}] Buffer limit exceeded for connection from ${record.remoteIP}: ${newSize} bytes > ${this.settings.maxPendingDataSize} bytes` - ); - socket.end(); // Gracefully close the socket - return this.initiateCleanupOnce(record, 'buffer_limit_exceeded'); - } - - // Buffer the chunk and update the size counter - record.pendingData.push(Buffer.from(chunk)); - record.pendingDataSize = newSize; - this.updateActivity(record); }; - // Add the temp handler to capture all incoming data during connection setup - socket.on('data', tempDataHandler); + // Unified data handler that safely queues incoming data + const safeDataHandler = (chunk: Buffer) => { + // If piping is already established, just let the pipe handle it + if (pipingEstablished) return; + + // Add to our queue for orderly processing + dataQueue.push(Buffer.from(chunk)); // Make a copy to be safe + queueSize += chunk.length; + + // If queue is getting large, pause socket until we catch up + if (this.settings.maxPendingDataSize && queueSize > this.settings.maxPendingDataSize * 0.8) { + socket.pause(); + drainPending = true; + } + + // Process the queue + processDataQueue(); + }; + + // Add our safe data handler + socket.on('data', safeDataHandler); // Add initial chunk to pending data if present if (initialChunk) { @@ -776,56 +836,32 @@ export class PortProxy { // Add the normal error handler for established connections targetSocket.on('error', this.handleError('outgoing', record)); - // Remove temporary data handler - socket.removeListener('data', tempDataHandler); - - // Flush all pending data to target - if (record.pendingData.length > 0) { - const combinedData = Buffer.concat(record.pendingData); - targetSocket.write(combinedData, (err) => { - if (err) { - console.log(`[${connectionId}] Error writing pending data to target: ${err.message}`); - return this.initiateCleanupOnce(record, 'write_error'); - } - - // Now set up piping for future data and resume the socket - socket.pipe(targetSocket); - targetSocket.pipe(socket); - socket.resume(); // Resume the socket after piping is established - - if (this.settings.enableDetailedLogging) { - console.log( - `[${connectionId}] Connection established: ${record.remoteIP} -> ${targetHost}:${connectionOptions.port}` + - `${ - serverName - ? ` (SNI: ${serverName})` - : domainConfig - ? ` (Port-based for domain: ${domainConfig.domains.join(', ')})` - : '' - }` + - ` TLS: ${record.isTLS ? 'Yes' : 'No'}, Keep-Alive: ${ - record.hasKeepAlive ? 'Yes' : 'No' - }` - ); - } else { - console.log( - `Connection established: ${record.remoteIP} -> ${targetHost}:${connectionOptions.port}` + - `${ - serverName - ? ` (SNI: ${serverName})` - : domainConfig - ? ` (Port-based for domain: ${domainConfig.domains.join(', ')})` - : '' - }` - ); - } - }); - } else { - // No pending data, so just set up piping + // Process any remaining data in the queue before switching to piping + processDataQueue(); + + // Setup function to establish piping - we'll use this after flushing data + const setupPiping = () => { + // Mark that we're switching to piping mode + pipingEstablished = true; + + // Setup piping in both directions socket.pipe(targetSocket); targetSocket.pipe(socket); - socket.resume(); // Resume the socket after piping is established - + + // Resume the socket to ensure data flows + socket.resume(); + + // Process any data that might be queued in the interim + if (dataQueue.length > 0) { + // Write any remaining queued data directly to the target socket + for (const chunk of dataQueue) { + targetSocket.write(chunk); + } + // Clear the queue + dataQueue.length = 0; + queueSize = 0; + } + if (this.settings.enableDetailedLogging) { console.log( `[${connectionId}] Connection established: ${record.remoteIP} -> ${targetHost}:${connectionOptions.port}` + @@ -852,6 +888,23 @@ export class PortProxy { }` ); } + }; + + // Flush all pending data to target + if (record.pendingData.length > 0) { + const combinedData = Buffer.concat(record.pendingData); + targetSocket.write(combinedData, (err) => { + if (err) { + console.log(`[${connectionId}] Error writing pending data to target: ${err.message}`); + return this.initiateCleanupOnce(record, 'write_error'); + } + + // Establish piping now that we've flushed the buffered data + setupPiping(); + }); + } else { + // No pending data, just establish piping immediately + setupPiping(); } // Clear the buffer now that we've processed it @@ -859,6 +912,7 @@ export class PortProxy { record.pendingDataSize = 0; // Add the renegotiation handler for SNI validation with strict domain enforcement + // This will be called after we've established piping if (serverName) { // Define a handler for checking renegotiation with improved detection const renegotiationHandler = (renegChunk: Buffer) => { @@ -895,8 +949,13 @@ export class PortProxy { // Store the handler in the connection record so we can remove it during cleanup record.renegotiationHandler = renegotiationHandler; - // Add the listener + // The renegotiation handler is added when piping is established + // Making it part of setupPiping ensures proper sequencing of event handlers socket.on('data', renegotiationHandler); + + if (this.settings.enableDetailedLogging) { + console.log(`[${connectionId}] TLS renegotiation handler installed for SNI domain: ${serverName}`); + } } // Set connection timeout with simpler logic @@ -1056,13 +1115,16 @@ export class PortProxy { const bytesReceived = record.bytesReceived; const bytesSent = record.bytesSent; - // Remove the renegotiation handler if present - if (record.renegotiationHandler && record.incoming) { + // Remove all data handlers (both standard and renegotiation) to make sure we clean up properly + if (record.incoming) { try { - record.incoming.removeListener('data', record.renegotiationHandler); + // Remove our safe data handler + record.incoming.removeAllListeners('data'); + + // Reset the handler references record.renegotiationHandler = undefined; } catch (err) { - console.log(`[${record.id}] Error removing renegotiation handler: ${err}`); + console.log(`[${record.id}] Error removing data handlers: ${err}`); } }