feat(core): Enhance core functionalities and test coverage for NetworkProxy and PortProxy

This commit is contained in:
2025-03-05 17:06:51 +00:00
parent 48c5ea3b1d
commit f6cc665f12
8 changed files with 1335 additions and 482 deletions

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartproxy',
version: '3.23.1',
version: '3.24.0',
description: 'A powerful proxy package that effectively handles high traffic, with features such as SSL/TLS support, port proxying, WebSocket handling, and dynamic routing with authentication options.'
}

File diff suppressed because it is too large Load Diff

View File

@ -23,6 +23,13 @@ export interface IPortProxySettings extends plugins.tls.TlsOptions {
globalPortRanges: Array<{ from: number; to: number }>; // Global allowed port ranges
forwardAllGlobalRanges?: boolean; // When true, forwards all connections on global port ranges to the global targetIP
gracefulShutdownTimeout?: number; // (ms) maximum time to wait for connections to close during shutdown
// Socket optimization settings
noDelay?: boolean; // Disable Nagle's algorithm (default: true)
keepAlive?: boolean; // Enable TCP keepalive (default: true)
keepAliveInitialDelay?: number; // Initial delay before sending keepalive probes (ms)
maxPendingDataSize?: number; // Maximum bytes to buffer during connection setup
initialDataTimeout?: number; // Timeout for initial data/SNI (ms)
}
/**
@ -100,6 +107,7 @@ interface IConnectionRecord {
cleanupTimer?: NodeJS.Timeout; // Timer for max lifetime/inactivity
lastActivity: number; // Last activity timestamp for inactivity detection
pendingData: Buffer[]; // Buffer to hold data during connection setup
pendingDataSize: number; // Track total size of pending data
}
// Helper: Check if a port falls within any of the given port ranges
@ -161,6 +169,11 @@ export class PortProxy {
targetIP: settingsArg.targetIP || 'localhost',
maxConnectionLifetime: settingsArg.maxConnectionLifetime || 600000,
gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30000,
noDelay: settingsArg.noDelay !== undefined ? settingsArg.noDelay : true,
keepAlive: settingsArg.keepAlive !== undefined ? settingsArg.keepAlive : true,
keepAliveInitialDelay: settingsArg.keepAliveInitialDelay || 60000, // 1 minute
maxPendingDataSize: settingsArg.maxPendingDataSize || 1024 * 1024, // 1MB
initialDataTimeout: settingsArg.initialDataTimeout || 5000 // 5 seconds
};
}
@ -187,16 +200,29 @@ export class PortProxy {
if (!record.incoming.destroyed) {
// Try graceful shutdown first, then force destroy after a short timeout
record.incoming.end();
setTimeout(() => {
if (record && !record.incoming.destroyed) {
record.incoming.destroy();
const incomingTimeout = setTimeout(() => {
try {
if (record && !record.incoming.destroyed) {
record.incoming.destroy();
}
} catch (err) {
console.log(`Error destroying incoming socket: ${err}`);
}
}, 1000);
// Ensure the timeout doesn't block Node from exiting
if (incomingTimeout.unref) {
incomingTimeout.unref();
}
}
} catch (err) {
console.log(`Error closing incoming socket: ${err}`);
if (!record.incoming.destroyed) {
record.incoming.destroy();
try {
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
} catch (destroyErr) {
console.log(`Error destroying incoming socket: ${destroyErr}`);
}
}
@ -204,19 +230,36 @@ export class PortProxy {
if (record.outgoing && !record.outgoing.destroyed) {
// Try graceful shutdown first, then force destroy after a short timeout
record.outgoing.end();
setTimeout(() => {
if (record && record.outgoing && !record.outgoing.destroyed) {
record.outgoing.destroy();
const outgoingTimeout = setTimeout(() => {
try {
if (record && record.outgoing && !record.outgoing.destroyed) {
record.outgoing.destroy();
}
} catch (err) {
console.log(`Error destroying outgoing socket: ${err}`);
}
}, 1000);
// Ensure the timeout doesn't block Node from exiting
if (outgoingTimeout.unref) {
outgoingTimeout.unref();
}
}
} catch (err) {
console.log(`Error closing outgoing socket: ${err}`);
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.destroy();
try {
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.destroy();
}
} catch (destroyErr) {
console.log(`Error destroying outgoing socket: ${destroyErr}`);
}
}
// Clear pendingData to avoid memory leaks
record.pendingData = [];
record.pendingDataSize = 0;
// Remove the record from the tracking map
this.connectionRecords.delete(record.id);
@ -240,6 +283,11 @@ export class PortProxy {
}
public async start() {
// Don't start if already shutting down
if (this.isShuttingDown) {
console.log("Cannot start PortProxy while it's shutting down");
return;
}
// Define a unified connection handler for all listening ports.
const connectionHandler = (socket: plugins.net.Socket) => {
if (this.isShuttingDown) {
@ -251,6 +299,10 @@ export class PortProxy {
const remoteIP = socket.remoteAddress || '';
const localPort = socket.localPort; // The port on which this connection was accepted.
// Apply socket optimizations
socket.setNoDelay(this.settings.noDelay);
socket.setKeepAlive(this.settings.keepAlive, this.settings.keepAliveInitialDelay);
const connectionId = generateConnectionId();
const connectionRecord: IConnectionRecord = {
id: connectionId,
@ -259,7 +311,8 @@ export class PortProxy {
incomingStartTime: Date.now(),
lastActivity: Date.now(),
connectionClosed: false,
pendingData: [] // Initialize buffer for pending data
pendingData: [], // Initialize buffer for pending data
pendingDataSize: 0 // Initialize buffer size counter
};
this.connectionRecords.set(connectionId, connectionRecord);
@ -296,11 +349,15 @@ export class PortProxy {
if (this.settings.sniEnabled) {
initialTimeout = setTimeout(() => {
if (!initialDataReceived) {
console.log(`Initial data timeout for ${remoteIP}`);
console.log(`Initial data timeout (${this.settings.initialDataTimeout}ms) for connection from ${remoteIP} on port ${localPort}`);
if (incomingTerminationReason === null) {
incomingTerminationReason = 'initial_timeout';
this.incrementTerminationStat('incoming', 'initial_timeout');
}
socket.end();
cleanupOnce();
}
}, 5000);
}, this.settings.initialDataTimeout || 5000);
} else {
initialDataReceived = true;
}
@ -393,9 +450,23 @@ export class PortProxy {
connectionOptions.localAddress = remoteIP.replace('::ffff:', '');
}
// Pause the incoming socket to prevent buffer overflows
socket.pause();
// Temporary handler to collect data during connection setup
const tempDataHandler = (chunk: Buffer) => {
// Check if adding this chunk would exceed the buffer limit
const newSize = connectionRecord.pendingDataSize + chunk.length;
if (this.settings.maxPendingDataSize && newSize > this.settings.maxPendingDataSize) {
console.log(`Buffer limit exceeded for connection from ${remoteIP}: ${newSize} bytes > ${this.settings.maxPendingDataSize} bytes`);
socket.end(); // Gracefully close the socket
return initiateCleanupOnce('buffer_limit_exceeded');
}
// Buffer the chunk and update the size counter
connectionRecord.pendingData.push(Buffer.from(chunk));
connectionRecord.pendingDataSize = newSize;
this.updateActivity(connectionRecord);
};
@ -405,6 +476,7 @@ export class PortProxy {
// Add initial chunk to pending data if present
if (initialChunk) {
connectionRecord.pendingData.push(Buffer.from(initialChunk));
connectionRecord.pendingDataSize = initialChunk.length;
}
// Create the target socket but don't set up piping immediately
@ -412,11 +484,47 @@ export class PortProxy {
connectionRecord.outgoing = targetSocket;
connectionRecord.outgoingStartTime = Date.now();
// Setup error handlers immediately
socket.on('error', handleError('incoming'));
targetSocket.on('error', handleError('outgoing'));
socket.on('close', handleClose('incoming'));
// Apply socket optimizations
targetSocket.setNoDelay(this.settings.noDelay);
targetSocket.setKeepAlive(this.settings.keepAlive, this.settings.keepAliveInitialDelay);
// Setup specific error handler for connection phase
targetSocket.once('error', (err) => {
// This handler runs only once during the initial connection phase
const code = (err as any).code;
console.log(`Connection setup error to ${targetHost}:${connectionOptions.port}: ${err.message} (${code})`);
// Resume the incoming socket to prevent it from hanging
socket.resume();
if (code === 'ECONNREFUSED') {
console.log(`Target ${targetHost}:${connectionOptions.port} refused connection`);
} else if (code === 'ETIMEDOUT') {
console.log(`Connection to ${targetHost}:${connectionOptions.port} timed out`);
} else if (code === 'ECONNRESET') {
console.log(`Connection to ${targetHost}:${connectionOptions.port} was reset`);
} else if (code === 'EHOSTUNREACH') {
console.log(`Host ${targetHost} is unreachable`);
}
// Clear any existing error handler after connection phase
targetSocket.removeAllListeners('error');
// Re-add the normal error handler for established connections
targetSocket.on('error', handleError('outgoing'));
if (outgoingTerminationReason === null) {
outgoingTerminationReason = 'connection_failed';
this.incrementTerminationStat('outgoing', 'connection_failed');
}
// Clean up the connection
initiateCleanupOnce(`connection_failed_${code}`);
});
// Setup close handler
targetSocket.on('close', handleClose('outgoing'));
socket.on('close', handleClose('incoming'));
// Handle timeouts
socket.on('timeout', () => {
@ -442,6 +550,12 @@ export class PortProxy {
// Wait for the outgoing connection to be ready before setting up piping
targetSocket.once('connect', () => {
// Clear the initial connection error handler
targetSocket.removeAllListeners('error');
// Add the normal error handler for established connections
targetSocket.on('error', handleError('outgoing'));
// Remove temporary data handler
socket.removeListener('data', tempDataHandler);
@ -454,9 +568,10 @@ export class PortProxy {
return initiateCleanupOnce('write_error');
}
// Now set up piping for future data
// Now set up piping for future data and resume the socket
socket.pipe(targetSocket);
targetSocket.pipe(socket);
socket.resume(); // Resume the socket after piping is established
console.log(
`Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` +
@ -467,6 +582,7 @@ export class PortProxy {
// No pending data, so just set up piping
socket.pipe(targetSocket);
targetSocket.pipe(socket);
socket.resume(); // Resume the socket after piping is established
console.log(
`Connection established: ${remoteIP} -> ${targetHost}:${connectionOptions.port}` +
@ -476,6 +592,7 @@ export class PortProxy {
// Clear the buffer now that we've processed it
connectionRecord.pendingData = [];
connectionRecord.pendingDataSize = 0;
// Set up activity tracking
socket.on('data', () => {
@ -620,6 +737,8 @@ export class PortProxy {
// Log active connection count, longest running durations, and run parity checks every 10 seconds.
this.connectionLogger = setInterval(() => {
// Immediately return if shutting down
if (this.isShuttingDown) return;
if (this.isShuttingDown) return;
const now = Date.now();
@ -675,7 +794,16 @@ export class PortProxy {
const closeServerPromises: Promise<void>[] = this.netServers.map(
server =>
new Promise<void>((resolve) => {
server.close(() => resolve());
if (!server.listening) {
resolve();
return;
}
server.close((err) => {
if (err) {
console.log(`Error closing server: ${err.message}`);
}
resolve();
});
})
);
@ -689,47 +817,77 @@ export class PortProxy {
await Promise.all(closeServerPromises);
console.log("All servers closed. Cleaning up active connections...");
// Clean up active connections
// Force destroy all active connections immediately
const connectionIds = [...this.connectionRecords.keys()];
console.log(`Cleaning up ${connectionIds.length} active connections...`);
// First pass: End all connections gracefully
for (const id of connectionIds) {
const record = this.connectionRecords.get(id);
if (record && !record.connectionClosed) {
this.cleanupConnection(record, 'shutdown');
if (record) {
try {
// Clear any timers
if (record.cleanupTimer) {
clearTimeout(record.cleanupTimer);
record.cleanupTimer = undefined;
}
// End sockets gracefully
if (record.incoming && !record.incoming.destroyed) {
record.incoming.end();
}
if (record.outgoing && !record.outgoing.destroyed) {
record.outgoing.end();
}
} catch (err) {
console.log(`Error during graceful connection end for ${id}: ${err}`);
}
}
}
// Wait for graceful shutdown or timeout
const shutdownTimeout = this.settings.gracefulShutdownTimeout || 30000;
await new Promise<void>((resolve) => {
const checkInterval = setInterval(() => {
if (this.connectionRecords.size === 0) {
clearInterval(checkInterval);
resolve(); // lets resolve here as early as we reach 0 remaining connections
}
}, 1000);
// Force resolve after timeout
setTimeout(() => {
clearInterval(checkInterval);
if (this.connectionRecords.size > 0) {
console.log(`Forcing shutdown with ${this.connectionRecords.size} connections still active`);
// Force destroy any remaining connections
for (const record of this.connectionRecords.values()) {
// Short delay to allow graceful ends to process
await new Promise(resolve => setTimeout(resolve, 100));
// Second pass: Force destroy everything
for (const id of connectionIds) {
const record = this.connectionRecords.get(id);
if (record) {
try {
// Remove all listeners to prevent memory leaks
if (record.incoming) {
record.incoming.removeAllListeners();
if (!record.incoming.destroyed) {
record.incoming.destroy();
}
if (record.outgoing && !record.outgoing.destroyed) {
}
if (record.outgoing) {
record.outgoing.removeAllListeners();
if (!record.outgoing.destroyed) {
record.outgoing.destroy();
}
}
this.connectionRecords.clear();
} catch (err) {
console.log(`Error during forced connection destruction for ${id}: ${err}`);
}
resolve();
}, shutdownTimeout);
});
}
}
// Clear the connection records map
this.connectionRecords.clear();
// Clear the domain target indices map to prevent memory leaks
this.domainTargetIndices.clear();
// Clear any servers array
this.netServers = [];
// Reset termination stats
this.terminationStats = {
incoming: {},
outgoing: {}
};
console.log("PortProxy shutdown complete.");
}