fix(PortProxy): Enhanced connection setup to handle pending data buffering before establishing outgoing connection

This commit is contained in:
Philipp Kunz 2025-03-05 14:33:09 +00:00
parent 6532e6f0e0
commit bd9292bf47
3 changed files with 95 additions and 45 deletions

View File

@ -1,5 +1,12 @@
# Changelog
## 2025-03-05 - 3.23.1 - fix(PortProxy)
Enhanced connection setup to handle pending data buffering before establishing outgoing connection
- Introduced pending data buffering to address issues with data reception before outgoing connection is fully established.
- Removed immediate data piping in favor of buffering to ensure complete initial data transfer.
- Added temporary data handler to collect incoming data during connection setup for precise activity tracking.
## 2025-03-03 - 3.23.0 - feat(documentation)
Updated documentation with architecture flow diagrams.

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '3.23.0',
version: '3.23.1',
description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, and dynamic routing with authentication options.'
}

View File

@ -99,6 +99,7 @@ interface IConnectionRecord {
connectionClosed: boolean; // Flag to prevent multiple cleanup attempts
cleanupTimer?: NodeJS.Timeout; // Timer for max lifetime/inactivity
lastActivity: number; // Last activity timestamp for inactivity detection
pendingData: Buffer[]; // Buffer to hold data during connection setup
}
// Helper: Check if a port falls within any of the given port ranges
@ -257,7 +258,8 @@ export class PortProxy {
outgoing: null,
incomingStartTime: Date.now(),
lastActivity: Date.now(),
connectionClosed: false
connectionClosed: false,
pendingData: [] // Initialize buffer for pending data
};
this.connectionRecords.set(connectionId, connectionRecord);
@ -391,29 +393,32 @@ export class PortProxy {
connectionOptions.localAddress = remoteIP.replace('::ffff:', '');
}
// Create the target socket and immediately set up data piping
// Temporary handler to collect data during connection setup
const tempDataHandler = (chunk: Buffer) => {
connectionRecord.pendingData.push(Buffer.from(chunk));
this.updateActivity(connectionRecord);
};
// Add the temp handler to capture all incoming data during connection setup
socket.on('data', tempDataHandler);
// Add initial chunk to pending data if present
if (initialChunk) {
connectionRecord.pendingData.push(Buffer.from(initialChunk));
}
// Create the target socket but don't set up piping immediately
const targetSocket = plugins.net.connect(connectionOptions);
connectionRecord.outgoing = targetSocket;
connectionRecord.outgoingStartTime = Date.now();
// Set up the pipe immediately to ensure data flows without delay
if (initialChunk) {
socket.unshift(initialChunk);
}
socket.pipe(targetSocket);
targetSocket.pipe(socket);
console.log(
`Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` +
`${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}`
);
// Add appropriate handlers for connection management
// Setup error handlers immediately
socket.on('error', handleError('incoming'));
targetSocket.on('error', handleError('outgoing'));
socket.on('close', handleClose('incoming'));
targetSocket.on('close', handleClose('outgoing'));
// Handle timeouts
socket.on('timeout', () => {
console.log(`Timeout on incoming side from ${remoteIP}`);
if (incomingTerminationReason === null) {
@ -435,13 +440,72 @@ export class PortProxy {
socket.setTimeout(120000);
targetSocket.setTimeout(120000);
// Update activity for both sockets
socket.on('data', () => {
connectionRecord.lastActivity = Date.now();
});
targetSocket.on('data', () => {
connectionRecord.lastActivity = Date.now();
// Wait for the outgoing connection to be ready before setting up piping
targetSocket.once('connect', () => {
// Remove temporary data handler
socket.removeListener('data', tempDataHandler);
// Flush all pending data to target
if (connectionRecord.pendingData.length > 0) {
const combinedData = Buffer.concat(connectionRecord.pendingData);
targetSocket.write(combinedData, (err) => {
if (err) {
console.log(`Error writing pending data to target: ${err.message}`);
return initiateCleanupOnce('write_error');
}
// Now set up piping for future data
socket.pipe(targetSocket);
targetSocket.pipe(socket);
console.log(
`Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` +
`${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}`
);
});
} else {
// No pending data, so just set up piping
socket.pipe(targetSocket);
targetSocket.pipe(socket);
console.log(
`Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` +
`${serverName ? ` (SNI: ${serverName})` : forcedDomain ? ` (Port-based for domain: ${forcedDomain.domains.join(', ')})` : ''}`
);
}
// Clear the buffer now that we've processed it
connectionRecord.pendingData = [];
// Set up activity tracking
socket.on('data', () => {
connectionRecord.lastActivity = Date.now();
});
targetSocket.on('data', () => {
connectionRecord.lastActivity = Date.now();
});
// Add the renegotiation listener (we don't need setImmediate here anymore
// since we're already in the connect callback)
if (serverName) {
socket.on('data', (renegChunk: Buffer) => {
if (renegChunk.length > 0 && renegChunk.readUInt8(0) === 22) {
try {
// Try to extract SNI from potential renegotiation
const newSNI = extractSNI(renegChunk);
if (newSNI && newSNI !== connectionRecord.lockedDomain) {
console.log(`Rehandshake detected with different SNI: ${newSNI} vs locked ${connectionRecord.lockedDomain}. Terminating connection.`);
initiateCleanupOnce('sni_mismatch');
} else if (newSNI) {
console.log(`Rehandshake detected with same SNI: ${newSNI}. Allowing.`);
}
} catch (err) {
console.log(`Error processing potential renegotiation: ${err}. Allowing connection to continue.`);
}
}
});
}
});
// Initialize a cleanup timer for max connection lifetime
@ -514,27 +578,6 @@ export class PortProxy {
connectionRecord.lockedDomain = serverName;
console.log(`Received connection from ${remoteIP} with SNI: ${serverName}`);
// Delay adding the renegotiation listener until the next tick,
// so the initial ClientHello is not reprocessed.
setImmediate(() => {
socket.on('data', (renegChunk: Buffer) => {
if (renegChunk.length > 0 && renegChunk.readUInt8(0) === 22) {
try {
// Try to extract SNI from potential renegotiation
const newSNI = extractSNI(renegChunk);
if (newSNI && newSNI !== connectionRecord.lockedDomain) {
console.log(`Rehandshake detected with different SNI: ${newSNI} vs locked ${connectionRecord.lockedDomain}. Terminating connection.`);
initiateCleanupOnce('sni_mismatch');
} else if (newSNI) {
console.log(`Rehandshake detected with same SNI: ${newSNI}. Allowing.`);
}
} catch (err) {
console.log(`Error processing potential renegotiation: ${err}. Allowing connection to continue.`);
}
}
});
});
setupConnection(serverName, chunk);
});
} else {