fix(PortProxy): Improve buffering and data handling during connection setup in PortProxy to prevent data loss

This commit is contained in:
Philipp Kunz 2025-03-11 17:05:15 +00:00
parent 5ba8eb778f
commit 43378becd2
3 changed files with 152 additions and 83 deletions

View File

@ -1,5 +1,12 @@
# Changelog
## 2025-03-11 - 3.37.2 - fix(PortProxy)
Improve buffering and data handling during connection setup in PortProxy to prevent data loss
- Added a safeDataHandler and processDataQueue to buffer incoming data reliably during the TLS handshake phase
- Introduced a queue with pause/resume logic to avoid exceeding maxPendingDataSize and ensure all pending data is flushed before piping begins
- Refactored the piping setup to install the renegotiation handler only after proper data flushing
## 2025-03-11 - 3.37.1 - fix(PortProxy/SNI)
Refactor SNI extraction in PortProxy to use the dedicated SniHandler class

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '3.37.1',
version: '3.37.2',
description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, dynamic routing with authentication options, and automatic ACME certificate management.'
}

View File

@ -566,44 +566,104 @@ export class PortProxy {
connectionOptions.localAddress = record.remoteIP.replace('::ffff:', '');
}
// Create a safe queue for incoming data using a Buffer array
// We'll use this to ensure we don't lose data during handler transitions
const dataQueue: Buffer[] = [];
let queueSize = 0;
let processingQueue = false;
let drainPending = false;
// Flag to track if we've switched to the final piping mechanism
// Once this is true, we no longer buffer data in dataQueue
let pipingEstablished = false;
// Pause the incoming socket to prevent buffer overflows
// This ensures we control the flow of data until piping is set up
socket.pause();
// Temporary handler to collect data during connection setup
const tempDataHandler = (chunk: Buffer) => {
// Track bytes received
record.bytesReceived += chunk.length;
// Function to safely process the data queue without losing events
const processDataQueue = () => {
if (processingQueue || dataQueue.length === 0 || pipingEstablished) return;
// Check for TLS handshake
if (!record.isTLS && SniHandler.isTlsHandshake(chunk)) {
record.isTLS = true;
processingQueue = true;
if (this.settings.enableTlsDebugLogging) {
console.log(
`[${connectionId}] TLS handshake detected in tempDataHandler, ${chunk.length} bytes`
);
try {
// Process all queued chunks with the current active handler
while (dataQueue.length > 0) {
const chunk = dataQueue.shift()!;
queueSize -= chunk.length;
// Once piping is established, we shouldn't get here,
// but just in case, pass to the outgoing socket directly
if (pipingEstablished && record.outgoing) {
record.outgoing.write(chunk);
continue;
}
// Track bytes received
record.bytesReceived += chunk.length;
// Check for TLS handshake
if (!record.isTLS && SniHandler.isTlsHandshake(chunk)) {
record.isTLS = true;
if (this.settings.enableTlsDebugLogging) {
console.log(
`[${connectionId}] TLS handshake detected in tempDataHandler, ${chunk.length} bytes`
);
}
}
// Check if adding this chunk would exceed the buffer limit
const newSize = record.pendingDataSize + chunk.length;
if (this.settings.maxPendingDataSize && newSize > this.settings.maxPendingDataSize) {
console.log(
`[${connectionId}] Buffer limit exceeded for connection from ${record.remoteIP}: ${newSize} bytes > ${this.settings.maxPendingDataSize} bytes`
);
socket.end(); // Gracefully close the socket
this.initiateCleanupOnce(record, 'buffer_limit_exceeded');
return;
}
// Buffer the chunk and update the size counter
record.pendingData.push(Buffer.from(chunk));
record.pendingDataSize = newSize;
this.updateActivity(record);
}
} finally {
processingQueue = false;
// If there's a pending drain and we've processed everything,
// signal we're ready for more data if we haven't established piping yet
if (drainPending && dataQueue.length === 0 && !pipingEstablished) {
drainPending = false;
socket.resume();
}
}
// Check if adding this chunk would exceed the buffer limit
const newSize = record.pendingDataSize + chunk.length;
if (this.settings.maxPendingDataSize && newSize > this.settings.maxPendingDataSize) {
console.log(
`[${connectionId}] Buffer limit exceeded for connection from ${record.remoteIP}: ${newSize} bytes > ${this.settings.maxPendingDataSize} bytes`
);
socket.end(); // Gracefully close the socket
return this.initiateCleanupOnce(record, 'buffer_limit_exceeded');
}
// Buffer the chunk and update the size counter
record.pendingData.push(Buffer.from(chunk));
record.pendingDataSize = newSize;
this.updateActivity(record);
};
// Add the temp handler to capture all incoming data during connection setup
socket.on('data', tempDataHandler);
// Unified data handler that safely queues incoming data
const safeDataHandler = (chunk: Buffer) => {
// If piping is already established, just let the pipe handle it
if (pipingEstablished) return;
// Add to our queue for orderly processing
dataQueue.push(Buffer.from(chunk)); // Make a copy to be safe
queueSize += chunk.length;
// If queue is getting large, pause socket until we catch up
if (this.settings.maxPendingDataSize && queueSize > this.settings.maxPendingDataSize * 0.8) {
socket.pause();
drainPending = true;
}
// Process the queue
processDataQueue();
};
// Add our safe data handler
socket.on('data', safeDataHandler);
// Add initial chunk to pending data if present
if (initialChunk) {
@ -776,55 +836,31 @@ export class PortProxy {
// Add the normal error handler for established connections
targetSocket.on('error', this.handleError('outgoing', record));
// Remove temporary data handler
socket.removeListener('data', tempDataHandler);
// Process any remaining data in the queue before switching to piping
processDataQueue();
// Flush all pending data to target
if (record.pendingData.length > 0) {
const combinedData = Buffer.concat(record.pendingData);
targetSocket.write(combinedData, (err) => {
if (err) {
console.log(`[${connectionId}] Error writing pending data to target: ${err.message}`);
return this.initiateCleanupOnce(record, 'write_error');
}
// Setup function to establish piping - we'll use this after flushing data
const setupPiping = () => {
// Mark that we're switching to piping mode
pipingEstablished = true;
// Now set up piping for future data and resume the socket
socket.pipe(targetSocket);
targetSocket.pipe(socket);
socket.resume(); // Resume the socket after piping is established
if (this.settings.enableDetailedLogging) {
console.log(
`[${connectionId}] Connection established: ${record.remoteIP} -> ${targetHost}:${connectionOptions.port}` +
`${
serverName
? ` (SNI: ${serverName})`
: domainConfig
? ` (Port-based for domain: ${domainConfig.domains.join(', ')})`
: ''
}` +
` TLS: ${record.isTLS ? 'Yes' : 'No'}, Keep-Alive: ${
record.hasKeepAlive ? 'Yes' : 'No'
}`
);
} else {
console.log(
`Connection established: ${record.remoteIP} -> ${targetHost}:${connectionOptions.port}` +
`${
serverName
? ` (SNI: ${serverName})`
: domainConfig
? ` (Port-based for domain: ${domainConfig.domains.join(', ')})`
: ''
}`
);
}
});
} else {
// No pending data, so just set up piping
// Setup piping in both directions
socket.pipe(targetSocket);
targetSocket.pipe(socket);
socket.resume(); // Resume the socket after piping is established
// Resume the socket to ensure data flows
socket.resume();
// Process any data that might be queued in the interim
if (dataQueue.length > 0) {
// Write any remaining queued data directly to the target socket
for (const chunk of dataQueue) {
targetSocket.write(chunk);
}
// Clear the queue
dataQueue.length = 0;
queueSize = 0;
}
if (this.settings.enableDetailedLogging) {
console.log(
@ -852,6 +888,23 @@ export class PortProxy {
}`
);
}
};
// Flush all pending data to target
if (record.pendingData.length > 0) {
const combinedData = Buffer.concat(record.pendingData);
targetSocket.write(combinedData, (err) => {
if (err) {
console.log(`[${connectionId}] Error writing pending data to target: ${err.message}`);
return this.initiateCleanupOnce(record, 'write_error');
}
// Establish piping now that we've flushed the buffered data
setupPiping();
});
} else {
// No pending data, just establish piping immediately
setupPiping();
}
// Clear the buffer now that we've processed it
@ -859,6 +912,7 @@ export class PortProxy {
record.pendingDataSize = 0;
// Add the renegotiation handler for SNI validation with strict domain enforcement
// This will be called after we've established piping
if (serverName) {
// Define a handler for checking renegotiation with improved detection
const renegotiationHandler = (renegChunk: Buffer) => {
@ -895,8 +949,13 @@ export class PortProxy {
// Store the handler in the connection record so we can remove it during cleanup
record.renegotiationHandler = renegotiationHandler;
// Add the listener
// The renegotiation handler is added when piping is established
// Making it part of setupPiping ensures proper sequencing of event handlers
socket.on('data', renegotiationHandler);
if (this.settings.enableDetailedLogging) {
console.log(`[${connectionId}] TLS renegotiation handler installed for SNI domain: ${serverName}`);
}
}
// Set connection timeout with simpler logic
@ -1056,13 +1115,16 @@ export class PortProxy {
const bytesReceived = record.bytesReceived;
const bytesSent = record.bytesSent;
// Remove the renegotiation handler if present
if (record.renegotiationHandler && record.incoming) {
// Remove all data handlers (both standard and renegotiation) to make sure we clean up properly
if (record.incoming) {
try {
record.incoming.removeListener('data', record.renegotiationHandler);
// Remove our safe data handler
record.incoming.removeAllListeners('data');
// Reset the handler references
record.renegotiationHandler = undefined;
} catch (err) {
console.log(`[${record.id}] Error removing renegotiation handler: ${err}`);
console.log(`[${record.id}] Error removing data handlers: ${err}`);
}
}