fix(portproxy): Refactored connection cleanup logic in PortProxy

This commit is contained in:
Philipp Kunz 2025-03-03 02:03:24 +00:00
parent 12de96a7d5
commit 131d9d326e
3 changed files with 22 additions and 123 deletions

View File

@ -1,5 +1,12 @@
# Changelog
## 2025-03-03 - 3.22.2 - fix(portproxy)
Refactored connection cleanup logic in PortProxy
- Simplified the connection cleanup logic by removing redundant methods.
- Consolidated the cleanup initiation and execution into a single cleanup method.
- Improved error handling by ensuring connections are closed appropriately.
## 2025-03-03 - 3.22.1 - fix(PortProxy)
Fix connection timeout and IP validation handling for PortProxy

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '3.22.1',
version: '3.22.2',
description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, and dynamic routing with authentication options.'
}

View File

@ -98,7 +98,6 @@ interface IConnectionRecord {
lockedDomain?: string; // Field to lock this connection to the initial SNI
connectionClosed: boolean;
cleanupTimer?: NodeJS.Timeout; // Timer to force cleanup after max lifetime/inactivity
cleanupInitiated: boolean; // Flag to track if cleanup has been initiated but not completed
id: string; // Unique identifier for the connection
lastActivity: number; // Timestamp of last activity on either socket
}
@ -174,77 +173,26 @@ export class PortProxy {
}
/**
* Initiates the cleanup process for a connection.
* Sets the flag to prevent duplicate cleanup attempts and schedules actual cleanup.
* Cleans up a connection record if not already cleaned up.
* Destroys both incoming and outgoing sockets, clears timers, and removes the record.
* Logs the cleanup event.
*/
private initiateCleanup(record: IConnectionRecord, reason: string = 'normal'): void {
if (record.cleanupInitiated) return;
record.cleanupInitiated = true;
const remoteIP = record.incoming.remoteAddress || 'unknown';
console.log(`Initiating cleanup for connection ${record.id} from ${remoteIP} (reason: ${reason})`);
// Execute cleanup immediately to prevent lingering connections
this.executeCleanup(record);
}
/**
* Executes the actual cleanup of a connection.
* Destroys sockets, clears timers, and removes the record.
*/
private executeCleanup(record: IConnectionRecord): void {
if (record.connectionClosed) return;
record.connectionClosed = true;
const remoteIP = record.incoming.remoteAddress || 'unknown';
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
record.cleanupTimer = undefined;
}
// End the sockets first to allow for graceful closure
try {
if (!record.incoming.destroyed) {
record.incoming.end();
// Set a safety timeout to force destroy if end doesn't complete
setTimeout(() => {
if (!record.incoming.destroyed) {
console.log(`Forcing destruction of incoming socket for ${remoteIP}`);
record.incoming.destroy();
}
}, 1000);
private cleanupConnection(record: IConnectionRecord, reason: string = 'normal'): void {
if (!record.connectionClosed) {
record.connectionClosed = true;
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
}
} catch (err) {
console.error(`Error ending incoming socket for ${remoteIP}:`, err);
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
}
try {
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.end();
// Set a safety timeout to force destroy if end doesn't complete
setTimeout(() => {
if (record.outgoing && !record.outgoing.destroyed) {
console.log(`Forcing destruction of outgoing socket for ${remoteIP}`);
record.outgoing.destroy();
}
}, 1000);
}
} catch (err) {
console.error(`Error ending outgoing socket for ${remoteIP}:`, err);
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.destroy();
}
}
// Remove the record after a delay to ensure all events have propagated
setTimeout(() => {
this.connectionRecords.delete(record.id);
console.log(`Connection ${record.id} from ${remoteIP} fully cleaned up. Active connections: ${this.connectionRecords.size}`);
}, 2000);
const remoteIP = record.incoming.remoteAddress || 'unknown';
console.log(`Connection from ${remoteIP} terminated (${reason}). Active connections: ${this.connectionRecords.size}`);
}
}
private getTargetIP(domainConfig: IDomainConfig): string {
@ -257,25 +205,8 @@ export class PortProxy {
return this.settings.targetIP!;
}
/**
* Updates the last activity timestamp for a connection record
*/
private updateActivity(record: IConnectionRecord): void {
record.lastActivity = Date.now();
// Reset the inactivity timer if one is set
if (this.settings.maxConnectionLifetime && record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
// Set a new cleanup timer
record.cleanupTimer = setTimeout(() => {
const now = Date.now();
const inactivityTime = now - record.lastActivity;
const remoteIP = record.incoming.remoteAddress || 'unknown';
console.log(`Connection ${record.id} from ${remoteIP} exceeded max lifetime or inactivity period (${inactivityTime}ms), forcing cleanup.`);
this.initiateCleanup(record, 'timeout');
}, this.settings.maxConnectionLifetime);
}
}
public async start() {
@ -313,7 +244,6 @@ export class PortProxy {
this.initiateCleanup(connectionRecord, reason);
};
// Helper to reject an incoming connection.
const rejectIncomingConnection = (reason: string, logMessage: string) => {
console.log(logMessage);
socket.end();
@ -321,7 +251,7 @@ export class PortProxy {
incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
}
initiateCleanupOnce(reason);
cleanupOnce();
};
// IMPORTANT: We won't set any initial timeout for a chained proxy scenario
@ -369,22 +299,6 @@ export class PortProxy {
? `(Immediate) Incoming socket error from ${remoteIP}: ${err.message}`
: `(Premature) Incoming socket error from ${remoteIP} before data received: ${err.message}`;
console.log(errorMessage);
// Clear the initial timeout if it exists
if (initialTimeout) {
clearTimeout(initialTimeout);
initialTimeout = null;
}
// For premature errors, we need to handle them explicitly
// since the standard error handlers might not be set up yet
if (!initialDataReceived) {
if (incomingTerminationReason === null) {
incomingTerminationReason = 'premature_error';
this.incrementTerminationStat('incoming', 'premature_error');
}
initiateCleanupOnce('premature_error');
}
});
const handleError = (side: 'incoming' | 'outgoing') => (err: Error) => {
@ -393,13 +307,9 @@ export class PortProxy {
if (code === 'ECONNRESET') {
reason = 'econnreset';
console.log(`ECONNRESET on ${side} side from ${remoteIP}: ${err.message}`);
} else if (code === 'ECONNREFUSED') {
reason = 'econnrefused';
console.log(`ECONNREFUSED on ${side} side from ${remoteIP}: ${err.message}`);
} else {
console.log(`Error on ${side} side from ${remoteIP}: ${err.message}`);
}
if (side === 'incoming' && incomingTerminationReason === null) {
incomingTerminationReason = reason;
this.incrementTerminationStat('incoming', reason);
@ -407,13 +317,11 @@ export class PortProxy {
outgoingTerminationReason = reason;
this.incrementTerminationStat('outgoing', reason);
}
initiateCleanupOnce(reason);
cleanupOnce();
};
const handleClose = (side: 'incoming' | 'outgoing') => () => {
console.log(`Connection closed on ${side} side from ${remoteIP}`);
if (side === 'incoming' && incomingTerminationReason === null) {
incomingTerminationReason = 'normal';
this.incrementTerminationStat('incoming', 'normal');
@ -422,24 +330,8 @@ export class PortProxy {
this.incrementTerminationStat('outgoing', 'normal');
// Record the time when outgoing socket closed.
connectionRecord.outgoingClosedTime = Date.now();
// If incoming is still active but outgoing closed, set a shorter timeout
if (!connectionRecord.incoming.destroyed) {
console.log(`Outgoing socket closed but incoming still active for ${remoteIP}. Setting cleanup timeout.`);
setTimeout(() => {
if (!connectionRecord.connectionClosed && !connectionRecord.incoming.destroyed) {
console.log(`Incoming socket still active ${Date.now() - connectionRecord.outgoingClosedTime!}ms after outgoing closed for ${remoteIP}. Cleaning up.`);
initiateCleanupOnce('outgoing_closed_timeout');
}
}, 10000); // 10 second timeout instead of waiting for the next parity check
}
}
// If both sides are closed/destroyed, clean up
if ((side === 'incoming' && connectionRecord.outgoing?.destroyed) ||
(side === 'outgoing' && connectionRecord.incoming.destroyed)) {
initiateCleanupOnce('both_closed');
}
cleanupOnce();
};
/**