1254 lines
41 KiB
TypeScript
1254 lines
41 KiB
TypeScript
import * as plugins from './plugins.js';
|
|
import { ProxyRouter } from './classes.router.js';
|
|
import * as fs from 'fs';
|
|
import * as path from 'path';
|
|
import { fileURLToPath } from 'url';
|
|
|
|
export interface INetworkProxyOptions {
|
|
port: number;
|
|
maxConnections?: number;
|
|
keepAliveTimeout?: number;
|
|
headersTimeout?: number;
|
|
logLevel?: 'error' | 'warn' | 'info' | 'debug';
|
|
cors?: {
|
|
allowOrigin?: string;
|
|
allowMethods?: string;
|
|
allowHeaders?: string;
|
|
maxAge?: number;
|
|
};
|
|
|
|
// New settings for PortProxy integration
|
|
connectionPoolSize?: number; // Maximum connections to maintain in the pool to each backend
|
|
portProxyIntegration?: boolean; // Flag to indicate this proxy is used by PortProxy
|
|
}
|
|
|
|
interface IWebSocketWithHeartbeat extends plugins.wsDefault {
|
|
lastPong: number;
|
|
isAlive: boolean;
|
|
}
|
|
|
|
export class NetworkProxy {
|
|
// Configuration
|
|
public options: INetworkProxyOptions;
|
|
public proxyConfigs: plugins.tsclass.network.IReverseProxyConfig[] = [];
|
|
public defaultHeaders: { [key: string]: string } = {};
|
|
|
|
// Server instances
|
|
public httpsServer: plugins.https.Server;
|
|
public wsServer: plugins.ws.WebSocketServer;
|
|
|
|
// State tracking
|
|
public router = new ProxyRouter();
|
|
public socketMap = new plugins.lik.ObjectMap<plugins.net.Socket>();
|
|
public activeContexts: Set<string> = new Set();
|
|
public connectedClients: number = 0;
|
|
public startTime: number = 0;
|
|
public requestsServed: number = 0;
|
|
public failedRequests: number = 0;
|
|
|
|
// New tracking for PortProxy integration
|
|
private portProxyConnections: number = 0;
|
|
private tlsTerminatedConnections: number = 0;
|
|
|
|
// Timers and intervals
|
|
private heartbeatInterval: NodeJS.Timeout;
|
|
private metricsInterval: NodeJS.Timeout;
|
|
private connectionPoolCleanupInterval: NodeJS.Timeout;
|
|
|
|
// Certificates
|
|
private defaultCertificates: { key: string; cert: string };
|
|
private certificateCache: Map<string, { key: string; cert: string; expires?: Date }> = new Map();
|
|
|
|
// New connection pool for backend connections
|
|
private connectionPool: Map<string, Array<{
|
|
socket: plugins.net.Socket;
|
|
lastUsed: number;
|
|
isIdle: boolean;
|
|
}>> = new Map();
|
|
|
|
/**
|
|
* Creates a new NetworkProxy instance
|
|
*/
|
|
constructor(optionsArg: INetworkProxyOptions) {
|
|
// Set default options
|
|
this.options = {
|
|
port: optionsArg.port,
|
|
maxConnections: optionsArg.maxConnections || 10000,
|
|
keepAliveTimeout: optionsArg.keepAliveTimeout || 120000, // 2 minutes
|
|
headersTimeout: optionsArg.headersTimeout || 60000, // 1 minute
|
|
logLevel: optionsArg.logLevel || 'info',
|
|
cors: optionsArg.cors || {
|
|
allowOrigin: '*',
|
|
allowMethods: 'GET, POST, PUT, DELETE, OPTIONS',
|
|
allowHeaders: 'Content-Type, Authorization',
|
|
maxAge: 86400
|
|
},
|
|
// New defaults for PortProxy integration
|
|
connectionPoolSize: optionsArg.connectionPoolSize || 50,
|
|
portProxyIntegration: optionsArg.portProxyIntegration || false
|
|
};
|
|
|
|
this.loadDefaultCertificates();
|
|
}
|
|
|
|
/**
|
|
* Loads default certificates from the filesystem
|
|
*/
|
|
private loadDefaultCertificates(): void {
|
|
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
|
const certPath = path.join(__dirname, '..', 'assets', 'certs');
|
|
|
|
try {
|
|
this.defaultCertificates = {
|
|
key: fs.readFileSync(path.join(certPath, 'key.pem'), 'utf8'),
|
|
cert: fs.readFileSync(path.join(certPath, 'cert.pem'), 'utf8')
|
|
};
|
|
this.log('info', 'Default certificates loaded successfully');
|
|
} catch (error) {
|
|
this.log('error', 'Error loading default certificates', error);
|
|
|
|
// Generate self-signed fallback certificates
|
|
try {
|
|
// This is a placeholder for actual certificate generation code
|
|
// In a real implementation, you would use a library like selfsigned to generate certs
|
|
this.defaultCertificates = {
|
|
key: "FALLBACK_KEY_CONTENT",
|
|
cert: "FALLBACK_CERT_CONTENT"
|
|
};
|
|
this.log('warn', 'Using fallback self-signed certificates');
|
|
} catch (fallbackError) {
|
|
this.log('error', 'Failed to generate fallback certificates', fallbackError);
|
|
throw new Error('Could not load or generate SSL certificates');
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns the port number this NetworkProxy is listening on
|
|
* Useful for PortProxy to determine where to forward connections
|
|
*/
|
|
public getListeningPort(): number {
|
|
return this.options.port;
|
|
}
|
|
|
|
/**
|
|
* Updates the server capacity settings
|
|
* @param maxConnections Maximum number of simultaneous connections
|
|
* @param keepAliveTimeout Keep-alive timeout in milliseconds
|
|
* @param connectionPoolSize Size of the connection pool per backend
|
|
*/
|
|
public updateCapacity(maxConnections?: number, keepAliveTimeout?: number, connectionPoolSize?: number): void {
|
|
if (maxConnections !== undefined) {
|
|
this.options.maxConnections = maxConnections;
|
|
this.log('info', `Updated max connections to ${maxConnections}`);
|
|
}
|
|
|
|
if (keepAliveTimeout !== undefined) {
|
|
this.options.keepAliveTimeout = keepAliveTimeout;
|
|
|
|
if (this.httpsServer) {
|
|
this.httpsServer.keepAliveTimeout = keepAliveTimeout;
|
|
this.log('info', `Updated keep-alive timeout to ${keepAliveTimeout}ms`);
|
|
}
|
|
}
|
|
|
|
if (connectionPoolSize !== undefined) {
|
|
this.options.connectionPoolSize = connectionPoolSize;
|
|
this.log('info', `Updated connection pool size to ${connectionPoolSize}`);
|
|
|
|
// Cleanup excess connections in the pool if the size was reduced
|
|
this.cleanupConnectionPool();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns current server metrics
|
|
* Useful for PortProxy to determine which NetworkProxy to use for load balancing
|
|
*/
|
|
public getMetrics(): any {
|
|
return {
|
|
activeConnections: this.connectedClients,
|
|
totalRequests: this.requestsServed,
|
|
failedRequests: this.failedRequests,
|
|
portProxyConnections: this.portProxyConnections,
|
|
tlsTerminatedConnections: this.tlsTerminatedConnections,
|
|
connectionPoolSize: Array.from(this.connectionPool.entries()).reduce((acc, [host, connections]) => {
|
|
acc[host] = connections.length;
|
|
return acc;
|
|
}, {} as Record<string, number>),
|
|
uptime: Math.floor((Date.now() - this.startTime) / 1000),
|
|
memoryUsage: process.memoryUsage(),
|
|
activeWebSockets: this.wsServer?.clients.size || 0
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Cleanup the connection pool by removing idle connections
|
|
* or reducing pool size if it exceeds the configured maximum
|
|
*/
|
|
private cleanupConnectionPool(): void {
|
|
const now = Date.now();
|
|
const idleTimeout = this.options.keepAliveTimeout || 120000; // 2 minutes default
|
|
|
|
for (const [host, connections] of this.connectionPool.entries()) {
|
|
// Sort by last used time (oldest first)
|
|
connections.sort((a, b) => a.lastUsed - b.lastUsed);
|
|
|
|
// Remove idle connections older than the idle timeout
|
|
let removed = 0;
|
|
while (connections.length > 0) {
|
|
const connection = connections[0];
|
|
|
|
// Remove if idle and exceeds timeout, or if pool is too large
|
|
if ((connection.isIdle && now - connection.lastUsed > idleTimeout) ||
|
|
connections.length > this.options.connectionPoolSize!) {
|
|
|
|
try {
|
|
if (!connection.socket.destroyed) {
|
|
connection.socket.end();
|
|
connection.socket.destroy();
|
|
}
|
|
} catch (err) {
|
|
this.log('error', `Error destroying pooled connection to ${host}`, err);
|
|
}
|
|
|
|
connections.shift(); // Remove from pool
|
|
removed++;
|
|
} else {
|
|
break; // Stop removing if we've reached active or recent connections
|
|
}
|
|
}
|
|
|
|
if (removed > 0) {
|
|
this.log('debug', `Removed ${removed} idle connections from pool for ${host}, ${connections.length} remaining`);
|
|
}
|
|
|
|
// Update the pool with the remaining connections
|
|
if (connections.length === 0) {
|
|
this.connectionPool.delete(host);
|
|
} else {
|
|
this.connectionPool.set(host, connections);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get a connection from the pool or create a new one
|
|
*/
|
|
private getConnectionFromPool(host: string, port: number): Promise<plugins.net.Socket> {
|
|
return new Promise((resolve, reject) => {
|
|
const poolKey = `${host}:${port}`;
|
|
const connectionList = this.connectionPool.get(poolKey) || [];
|
|
|
|
// Look for an idle connection
|
|
const idleConnectionIndex = connectionList.findIndex(c => c.isIdle);
|
|
|
|
if (idleConnectionIndex >= 0) {
|
|
// Get existing connection from pool
|
|
const connection = connectionList[idleConnectionIndex];
|
|
connection.isIdle = false;
|
|
connection.lastUsed = Date.now();
|
|
this.log('debug', `Reusing connection from pool for ${poolKey}`);
|
|
|
|
// Update the pool
|
|
this.connectionPool.set(poolKey, connectionList);
|
|
|
|
resolve(connection.socket);
|
|
return;
|
|
}
|
|
|
|
// No idle connection available, create a new one if pool isn't full
|
|
if (connectionList.length < this.options.connectionPoolSize!) {
|
|
this.log('debug', `Creating new connection to ${host}:${port}`);
|
|
|
|
try {
|
|
const socket = plugins.net.connect({
|
|
host,
|
|
port,
|
|
keepAlive: true,
|
|
keepAliveInitialDelay: 30000 // 30 seconds
|
|
});
|
|
|
|
socket.once('connect', () => {
|
|
// Add to connection pool
|
|
const connection = {
|
|
socket,
|
|
lastUsed: Date.now(),
|
|
isIdle: false
|
|
};
|
|
|
|
connectionList.push(connection);
|
|
this.connectionPool.set(poolKey, connectionList);
|
|
|
|
// Setup cleanup when the connection is closed
|
|
socket.once('close', () => {
|
|
const idx = connectionList.findIndex(c => c.socket === socket);
|
|
if (idx >= 0) {
|
|
connectionList.splice(idx, 1);
|
|
this.connectionPool.set(poolKey, connectionList);
|
|
this.log('debug', `Removed closed connection from pool for ${poolKey}`);
|
|
}
|
|
});
|
|
|
|
resolve(socket);
|
|
});
|
|
|
|
socket.once('error', (err) => {
|
|
this.log('error', `Error creating connection to ${host}:${port}`, err);
|
|
reject(err);
|
|
});
|
|
} catch (err) {
|
|
this.log('error', `Failed to create connection to ${host}:${port}`, err);
|
|
reject(err);
|
|
}
|
|
} else {
|
|
// Pool is full, wait for an idle connection or reject
|
|
this.log('warn', `Connection pool for ${poolKey} is full (${connectionList.length})`);
|
|
reject(new Error(`Connection pool for ${poolKey} is full`));
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Return a connection to the pool for reuse
|
|
*/
|
|
private returnConnectionToPool(socket: plugins.net.Socket, host: string, port: number): void {
|
|
const poolKey = `${host}:${port}`;
|
|
const connectionList = this.connectionPool.get(poolKey) || [];
|
|
|
|
// Find this connection in the pool
|
|
const connectionIndex = connectionList.findIndex(c => c.socket === socket);
|
|
|
|
if (connectionIndex >= 0) {
|
|
// Mark as idle and update last used time
|
|
connectionList[connectionIndex].isIdle = true;
|
|
connectionList[connectionIndex].lastUsed = Date.now();
|
|
|
|
this.log('debug', `Returned connection to pool for ${poolKey}`);
|
|
} else {
|
|
this.log('warn', `Attempted to return unknown connection to pool for ${poolKey}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Starts the proxy server
|
|
*/
|
|
public async start(): Promise<void> {
|
|
this.startTime = Date.now();
|
|
|
|
// Create the HTTPS server
|
|
this.httpsServer = plugins.https.createServer(
|
|
{
|
|
key: this.defaultCertificates.key,
|
|
cert: this.defaultCertificates.cert
|
|
},
|
|
(req, res) => this.handleRequest(req, res)
|
|
);
|
|
|
|
// Configure server timeouts
|
|
this.httpsServer.keepAliveTimeout = this.options.keepAliveTimeout;
|
|
this.httpsServer.headersTimeout = this.options.headersTimeout;
|
|
|
|
// Setup connection tracking
|
|
this.setupConnectionTracking();
|
|
|
|
// Setup WebSocket support
|
|
this.setupWebsocketSupport();
|
|
|
|
// Start metrics collection
|
|
this.setupMetricsCollection();
|
|
|
|
// Setup connection pool cleanup interval
|
|
this.setupConnectionPoolCleanup();
|
|
|
|
// Start the server
|
|
return new Promise((resolve) => {
|
|
this.httpsServer.listen(this.options.port, () => {
|
|
this.log('info', `NetworkProxy started on port ${this.options.port}`);
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Sets up tracking of TCP connections
|
|
*/
|
|
private setupConnectionTracking(): void {
|
|
this.httpsServer.on('connection', (connection: plugins.net.Socket) => {
|
|
// Check if max connections reached
|
|
if (this.socketMap.getArray().length >= this.options.maxConnections) {
|
|
this.log('warn', `Max connections (${this.options.maxConnections}) reached, rejecting new connection`);
|
|
connection.destroy();
|
|
return;
|
|
}
|
|
|
|
// Add connection to tracking
|
|
this.socketMap.add(connection);
|
|
this.connectedClients = this.socketMap.getArray().length;
|
|
|
|
// Check for connection from PortProxy by inspecting the source port
|
|
// This is a heuristic - in a production environment you might use a more robust method
|
|
const localPort = connection.localPort;
|
|
const remotePort = connection.remotePort;
|
|
|
|
// If this connection is from a PortProxy (usually indicated by it coming from localhost)
|
|
if (this.options.portProxyIntegration && connection.remoteAddress?.includes('127.0.0.1')) {
|
|
this.portProxyConnections++;
|
|
this.log('debug', `New connection from PortProxy (local: ${localPort}, remote: ${remotePort})`);
|
|
} else {
|
|
this.log('debug', `New direct connection (local: ${localPort}, remote: ${remotePort})`);
|
|
}
|
|
|
|
// Setup connection cleanup handlers
|
|
const cleanupConnection = () => {
|
|
if (this.socketMap.checkForObject(connection)) {
|
|
this.socketMap.remove(connection);
|
|
this.connectedClients = this.socketMap.getArray().length;
|
|
|
|
// If this was a PortProxy connection, decrement the counter
|
|
if (this.options.portProxyIntegration && connection.remoteAddress?.includes('127.0.0.1')) {
|
|
this.portProxyConnections--;
|
|
}
|
|
|
|
this.log('debug', `Connection closed. ${this.connectedClients} connections remaining`);
|
|
}
|
|
};
|
|
|
|
connection.on('close', cleanupConnection);
|
|
connection.on('error', (err) => {
|
|
this.log('debug', 'Connection error', err);
|
|
cleanupConnection();
|
|
});
|
|
connection.on('end', cleanupConnection);
|
|
connection.on('timeout', () => {
|
|
this.log('debug', 'Connection timeout');
|
|
cleanupConnection();
|
|
});
|
|
});
|
|
|
|
// Track TLS handshake completions
|
|
this.httpsServer.on('secureConnection', (tlsSocket) => {
|
|
this.tlsTerminatedConnections++;
|
|
this.log('debug', 'TLS handshake completed, connection secured');
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Sets up WebSocket support
|
|
*/
|
|
private setupWebsocketSupport(): void {
|
|
// Create WebSocket server
|
|
this.wsServer = new plugins.ws.WebSocketServer({
|
|
server: this.httpsServer,
|
|
// Add WebSocket specific timeout
|
|
clientTracking: true
|
|
});
|
|
|
|
// Handle WebSocket connections
|
|
this.wsServer.on('connection', (wsIncoming: IWebSocketWithHeartbeat, reqArg: plugins.http.IncomingMessage) => {
|
|
this.handleWebSocketConnection(wsIncoming, reqArg);
|
|
});
|
|
|
|
// Set up the heartbeat interval (check every 30 seconds, terminate after 2 minutes of inactivity)
|
|
this.heartbeatInterval = setInterval(() => {
|
|
if (this.wsServer.clients.size === 0) {
|
|
return; // Skip if no active connections
|
|
}
|
|
|
|
this.log('debug', `WebSocket heartbeat check for ${this.wsServer.clients.size} clients`);
|
|
this.wsServer.clients.forEach((ws: plugins.wsDefault) => {
|
|
const wsWithHeartbeat = ws as IWebSocketWithHeartbeat;
|
|
|
|
if (wsWithHeartbeat.isAlive === false) {
|
|
this.log('debug', 'Terminating inactive WebSocket connection');
|
|
return wsWithHeartbeat.terminate();
|
|
}
|
|
|
|
wsWithHeartbeat.isAlive = false;
|
|
wsWithHeartbeat.ping();
|
|
});
|
|
}, 30000);
|
|
}
|
|
|
|
/**
|
|
* Sets up metrics collection
|
|
*/
|
|
private setupMetricsCollection(): void {
|
|
this.metricsInterval = setInterval(() => {
|
|
const uptime = Math.floor((Date.now() - this.startTime) / 1000);
|
|
const metrics = {
|
|
uptime,
|
|
activeConnections: this.connectedClients,
|
|
totalRequests: this.requestsServed,
|
|
failedRequests: this.failedRequests,
|
|
portProxyConnections: this.portProxyConnections,
|
|
tlsTerminatedConnections: this.tlsTerminatedConnections,
|
|
activeWebSockets: this.wsServer?.clients.size || 0,
|
|
memoryUsage: process.memoryUsage(),
|
|
activeContexts: Array.from(this.activeContexts),
|
|
connectionPool: Object.fromEntries(
|
|
Array.from(this.connectionPool.entries()).map(([host, connections]) => [
|
|
host,
|
|
{
|
|
total: connections.length,
|
|
idle: connections.filter(c => c.isIdle).length
|
|
}
|
|
])
|
|
)
|
|
};
|
|
|
|
this.log('debug', 'Proxy metrics', metrics);
|
|
}, 60000); // Log metrics every minute
|
|
}
|
|
|
|
/**
|
|
* Sets up connection pool cleanup
|
|
*/
|
|
private setupConnectionPoolCleanup(): void {
|
|
// Clean up idle connections every minute
|
|
this.connectionPoolCleanupInterval = setInterval(() => {
|
|
this.cleanupConnectionPool();
|
|
}, 60000); // 1 minute
|
|
}
|
|
|
|
/**
|
|
* Handles an incoming WebSocket connection
|
|
*/
|
|
private handleWebSocketConnection(wsIncoming: IWebSocketWithHeartbeat, reqArg: plugins.http.IncomingMessage): void {
|
|
const wsPath = reqArg.url;
|
|
const wsHost = reqArg.headers.host;
|
|
|
|
this.log('info', `WebSocket connection for ${wsHost}${wsPath}`);
|
|
|
|
// Setup heartbeat tracking
|
|
wsIncoming.isAlive = true;
|
|
wsIncoming.lastPong = Date.now();
|
|
wsIncoming.on('pong', () => {
|
|
wsIncoming.isAlive = true;
|
|
wsIncoming.lastPong = Date.now();
|
|
});
|
|
|
|
// Get the destination configuration
|
|
const wsDestinationConfig = this.router.routeReq(reqArg);
|
|
if (!wsDestinationConfig) {
|
|
this.log('warn', `No route found for WebSocket ${wsHost}${wsPath}`);
|
|
wsIncoming.terminate();
|
|
return;
|
|
}
|
|
|
|
// Check authentication if required
|
|
if (wsDestinationConfig.authentication) {
|
|
try {
|
|
if (!this.authenticateRequest(reqArg, wsDestinationConfig)) {
|
|
this.log('warn', `WebSocket authentication failed for ${wsHost}${wsPath}`);
|
|
wsIncoming.terminate();
|
|
return;
|
|
}
|
|
} catch (error) {
|
|
this.log('error', 'WebSocket authentication error', error);
|
|
wsIncoming.terminate();
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Setup outgoing WebSocket connection
|
|
let wsOutgoing: plugins.wsDefault;
|
|
const outGoingDeferred = plugins.smartpromise.defer();
|
|
|
|
try {
|
|
const wsTarget = `ws://${wsDestinationConfig.destinationIp}:${wsDestinationConfig.destinationPort}${reqArg.url}`;
|
|
this.log('debug', `Proxying WebSocket to ${wsTarget}`);
|
|
|
|
wsOutgoing = new plugins.wsDefault(wsTarget);
|
|
|
|
wsOutgoing.on('open', () => {
|
|
this.log('debug', 'Outgoing WebSocket connection established');
|
|
outGoingDeferred.resolve();
|
|
});
|
|
|
|
wsOutgoing.on('error', (error) => {
|
|
this.log('error', 'Outgoing WebSocket error', error);
|
|
outGoingDeferred.reject(error);
|
|
if (wsIncoming.readyState === wsIncoming.OPEN) {
|
|
wsIncoming.terminate();
|
|
}
|
|
});
|
|
} catch (err) {
|
|
this.log('error', 'Failed to create outgoing WebSocket connection', err);
|
|
wsIncoming.terminate();
|
|
return;
|
|
}
|
|
|
|
// Handle message forwarding from client to backend
|
|
wsIncoming.on('message', async (message, isBinary) => {
|
|
try {
|
|
// Wait for outgoing connection to be ready
|
|
await outGoingDeferred.promise;
|
|
|
|
// Only forward if both connections are still open
|
|
if (wsOutgoing.readyState === wsOutgoing.OPEN) {
|
|
wsOutgoing.send(message, { binary: isBinary });
|
|
}
|
|
} catch (error) {
|
|
this.log('error', 'Error forwarding WebSocket message to backend', error);
|
|
}
|
|
});
|
|
|
|
// Handle message forwarding from backend to client
|
|
wsOutgoing.on('message', (message, isBinary) => {
|
|
try {
|
|
// Only forward if the incoming connection is still open
|
|
if (wsIncoming.readyState === wsIncoming.OPEN) {
|
|
wsIncoming.send(message, { binary: isBinary });
|
|
}
|
|
} catch (error) {
|
|
this.log('error', 'Error forwarding WebSocket message to client', error);
|
|
}
|
|
});
|
|
|
|
// Clean up connections when either side closes
|
|
wsIncoming.on('close', (code, reason) => {
|
|
this.log('debug', `Incoming WebSocket closed: ${code} - ${reason}`);
|
|
if (wsOutgoing && wsOutgoing.readyState !== wsOutgoing.CLOSED) {
|
|
try {
|
|
// Validate close code (must be 1000-4999) or use 1000 as default
|
|
const validCode = (code >= 1000 && code <= 4999) ? code : 1000;
|
|
wsOutgoing.close(validCode, reason.toString() || '');
|
|
} catch (error) {
|
|
this.log('error', 'Error closing outgoing WebSocket', error);
|
|
wsOutgoing.terminate();
|
|
}
|
|
}
|
|
});
|
|
|
|
wsOutgoing.on('close', (code, reason) => {
|
|
this.log('debug', `Outgoing WebSocket closed: ${code} - ${reason}`);
|
|
if (wsIncoming && wsIncoming.readyState !== wsIncoming.CLOSED) {
|
|
try {
|
|
// Validate close code (must be 1000-4999) or use 1000 as default
|
|
const validCode = (code >= 1000 && code <= 4999) ? code : 1000;
|
|
wsIncoming.close(validCode, reason.toString() || '');
|
|
} catch (error) {
|
|
this.log('error', 'Error closing incoming WebSocket', error);
|
|
wsIncoming.terminate();
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handles an HTTP/HTTPS request
|
|
*/
|
|
private async handleRequest(
|
|
originRequest: plugins.http.IncomingMessage,
|
|
originResponse: plugins.http.ServerResponse
|
|
): Promise<void> {
|
|
this.requestsServed++;
|
|
const startTime = Date.now();
|
|
const reqId = `req_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`;
|
|
|
|
try {
|
|
const reqPath = plugins.url.parse(originRequest.url).path;
|
|
this.log('info', `[${reqId}] ${originRequest.method} ${originRequest.headers.host}${reqPath}`);
|
|
|
|
// Handle preflight OPTIONS requests for CORS
|
|
if (originRequest.method === 'OPTIONS' && this.options.cors) {
|
|
this.handleCorsRequest(originRequest, originResponse);
|
|
return;
|
|
}
|
|
|
|
// Get destination configuration
|
|
const destinationConfig = this.router.routeReq(originRequest);
|
|
if (!destinationConfig) {
|
|
this.log('warn', `[${reqId}] No route found for ${originRequest.headers.host}`);
|
|
this.sendErrorResponse(originResponse, 404, 'Not Found: No matching route');
|
|
this.failedRequests++;
|
|
return;
|
|
}
|
|
|
|
// Handle authentication if configured
|
|
if (destinationConfig.authentication) {
|
|
try {
|
|
if (!this.authenticateRequest(originRequest, destinationConfig)) {
|
|
this.sendErrorResponse(originResponse, 401, 'Unauthorized', {
|
|
'WWW-Authenticate': 'Basic realm="Access to the proxy site", charset="UTF-8"'
|
|
});
|
|
this.failedRequests++;
|
|
return;
|
|
}
|
|
} catch (error) {
|
|
this.log('error', `[${reqId}] Authentication error`, error);
|
|
this.sendErrorResponse(originResponse, 500, 'Internal Server Error: Authentication failed');
|
|
this.failedRequests++;
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Determine if we should use connection pooling
|
|
const useConnectionPool = this.options.portProxyIntegration &&
|
|
originRequest.socket.remoteAddress?.includes('127.0.0.1');
|
|
|
|
// Construct destination URL
|
|
const destinationUrl = `http://${destinationConfig.destinationIp}:${destinationConfig.destinationPort}${originRequest.url}`;
|
|
|
|
if (useConnectionPool) {
|
|
this.log('debug', `[${reqId}] Proxying to ${destinationUrl} (using connection pool)`);
|
|
await this.forwardRequestUsingConnectionPool(
|
|
reqId,
|
|
originRequest,
|
|
originResponse,
|
|
destinationConfig.destinationIp,
|
|
destinationConfig.destinationPort,
|
|
originRequest.url
|
|
);
|
|
} else {
|
|
this.log('debug', `[${reqId}] Proxying to ${destinationUrl}`);
|
|
await this.forwardRequest(reqId, originRequest, originResponse, destinationUrl);
|
|
}
|
|
|
|
const processingTime = Date.now() - startTime;
|
|
this.log('debug', `[${reqId}] Request completed in ${processingTime}ms`);
|
|
} catch (error) {
|
|
this.log('error', `[${reqId}] Unhandled error in request handler`, error);
|
|
try {
|
|
this.sendErrorResponse(originResponse, 502, 'Bad Gateway: Server error');
|
|
} catch (responseError) {
|
|
this.log('error', `[${reqId}] Failed to send error response`, responseError);
|
|
}
|
|
this.failedRequests++;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handles a CORS preflight request
|
|
*/
|
|
private handleCorsRequest(
|
|
req: plugins.http.IncomingMessage,
|
|
res: plugins.http.ServerResponse
|
|
): void {
|
|
const cors = this.options.cors;
|
|
|
|
// Set CORS headers
|
|
res.setHeader('Access-Control-Allow-Origin', cors.allowOrigin);
|
|
res.setHeader('Access-Control-Allow-Methods', cors.allowMethods);
|
|
res.setHeader('Access-Control-Allow-Headers', cors.allowHeaders);
|
|
res.setHeader('Access-Control-Max-Age', String(cors.maxAge));
|
|
|
|
// Handle preflight request
|
|
res.statusCode = 204;
|
|
res.end();
|
|
|
|
// Count this as a request served
|
|
this.requestsServed++;
|
|
}
|
|
|
|
/**
|
|
* Authenticates a request against the destination config
|
|
*/
|
|
private authenticateRequest(
|
|
req: plugins.http.IncomingMessage,
|
|
config: plugins.tsclass.network.IReverseProxyConfig
|
|
): boolean {
|
|
const authInfo = config.authentication;
|
|
if (!authInfo) {
|
|
return true; // No authentication required
|
|
}
|
|
|
|
switch (authInfo.type) {
|
|
case 'Basic': {
|
|
const authHeader = req.headers.authorization;
|
|
if (!authHeader || !authHeader.includes('Basic ')) {
|
|
return false;
|
|
}
|
|
|
|
const authStringBase64 = authHeader.replace('Basic ', '');
|
|
const authString: string = plugins.smartstring.base64.decode(authStringBase64);
|
|
const [user, pass] = authString.split(':');
|
|
|
|
// Use constant-time comparison to prevent timing attacks
|
|
const userMatch = user === authInfo.user;
|
|
const passMatch = pass === authInfo.pass;
|
|
|
|
return userMatch && passMatch;
|
|
}
|
|
default:
|
|
throw new Error(`Unsupported authentication method: ${authInfo.type}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Forwards a request to the destination using connection pool
|
|
* for optimized connection reuse from PortProxy
|
|
*/
|
|
private async forwardRequestUsingConnectionPool(
|
|
reqId: string,
|
|
originRequest: plugins.http.IncomingMessage,
|
|
originResponse: plugins.http.ServerResponse,
|
|
host: string,
|
|
port: number,
|
|
path: string
|
|
): Promise<void> {
|
|
try {
|
|
// Try to get a connection from the pool
|
|
const socket = await this.getConnectionFromPool(host, port);
|
|
|
|
// Create an HTTP client request using the pooled socket
|
|
const reqOptions = {
|
|
createConnection: () => socket,
|
|
host,
|
|
port,
|
|
path,
|
|
method: originRequest.method,
|
|
headers: this.prepareForwardHeaders(originRequest),
|
|
timeout: 30000 // 30 second timeout
|
|
};
|
|
|
|
const proxyReq = plugins.http.request(reqOptions);
|
|
|
|
// Handle timeouts
|
|
proxyReq.on('timeout', () => {
|
|
this.log('warn', `[${reqId}] Request to ${host}:${port}${path} timed out`);
|
|
proxyReq.destroy();
|
|
});
|
|
|
|
// Handle errors
|
|
proxyReq.on('error', (err) => {
|
|
this.log('error', `[${reqId}] Error in proxy request to ${host}:${port}${path}`, err);
|
|
|
|
// Check if the client response is still writable
|
|
if (!originResponse.writableEnded) {
|
|
this.sendErrorResponse(originResponse, 502, 'Bad Gateway: Error communicating with upstream server');
|
|
}
|
|
|
|
// Don't return the socket to the pool on error
|
|
try {
|
|
if (!socket.destroyed) {
|
|
socket.destroy();
|
|
}
|
|
} catch (socketErr) {
|
|
this.log('error', `[${reqId}] Error destroying socket after request error`, socketErr);
|
|
}
|
|
});
|
|
|
|
// Forward request body
|
|
originRequest.pipe(proxyReq);
|
|
|
|
// Handle response
|
|
proxyReq.on('response', (proxyRes) => {
|
|
// Copy status and headers
|
|
originResponse.statusCode = proxyRes.statusCode;
|
|
|
|
for (const [name, value] of Object.entries(proxyRes.headers)) {
|
|
if (value !== undefined) {
|
|
originResponse.setHeader(name, value);
|
|
}
|
|
}
|
|
|
|
// Forward the response body
|
|
proxyRes.pipe(originResponse);
|
|
|
|
// Return connection to pool when the response completes
|
|
proxyRes.on('end', () => {
|
|
if (!socket.destroyed) {
|
|
this.returnConnectionToPool(socket, host, port);
|
|
}
|
|
});
|
|
|
|
proxyRes.on('error', (err) => {
|
|
this.log('error', `[${reqId}] Error in proxy response from ${host}:${port}${path}`, err);
|
|
|
|
// Don't return the socket to the pool on error
|
|
try {
|
|
if (!socket.destroyed) {
|
|
socket.destroy();
|
|
}
|
|
} catch (socketErr) {
|
|
this.log('error', `[${reqId}] Error destroying socket after response error`, socketErr);
|
|
}
|
|
});
|
|
});
|
|
} catch (error) {
|
|
this.log('error', `[${reqId}] Error setting up pooled connection to ${host}:${port}`, error);
|
|
this.sendErrorResponse(originResponse, 502, 'Bad Gateway: Unable to reach upstream server');
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Forwards a request to the destination (standard method)
|
|
*/
|
|
private async forwardRequest(
|
|
reqId: string,
|
|
originRequest: plugins.http.IncomingMessage,
|
|
originResponse: plugins.http.ServerResponse,
|
|
destinationUrl: string
|
|
): Promise<void> {
|
|
try {
|
|
const proxyRequest = await plugins.smartrequest.request(
|
|
destinationUrl,
|
|
{
|
|
method: originRequest.method,
|
|
headers: this.prepareForwardHeaders(originRequest),
|
|
keepAlive: true,
|
|
timeout: 30000 // 30 second timeout
|
|
},
|
|
true, // streaming
|
|
(proxyRequestStream) => this.setupRequestStreaming(originRequest, proxyRequestStream)
|
|
);
|
|
|
|
// Handle the response
|
|
this.processProxyResponse(reqId, originResponse, proxyRequest);
|
|
} catch (error) {
|
|
this.log('error', `[${reqId}] Error forwarding request`, error);
|
|
this.sendErrorResponse(originResponse, 502, 'Bad Gateway: Unable to reach upstream server');
|
|
throw error; // Let the main handler catch this
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Prepares headers to forward to the backend
|
|
*/
|
|
private prepareForwardHeaders(req: plugins.http.IncomingMessage): plugins.http.OutgoingHttpHeaders {
|
|
const safeHeaders = { ...req.headers };
|
|
|
|
// Add forwarding headers
|
|
safeHeaders['X-Forwarded-Host'] = req.headers.host;
|
|
safeHeaders['X-Forwarded-Proto'] = 'https';
|
|
safeHeaders['X-Forwarded-For'] = (req.socket.remoteAddress || '').replace(/^::ffff:/, '');
|
|
|
|
// Add proxy-specific headers
|
|
safeHeaders['X-Proxy-Id'] = `NetworkProxy-${this.options.port}`;
|
|
|
|
// If this is coming from PortProxy, add a header to indicate that
|
|
if (this.options.portProxyIntegration && req.socket.remoteAddress?.includes('127.0.0.1')) {
|
|
safeHeaders['X-PortProxy-Forwarded'] = 'true';
|
|
}
|
|
|
|
// Remove sensitive headers we don't want to forward
|
|
const sensitiveHeaders = ['connection', 'upgrade', 'http2-settings'];
|
|
for (const header of sensitiveHeaders) {
|
|
delete safeHeaders[header];
|
|
}
|
|
|
|
return safeHeaders;
|
|
}
|
|
|
|
/**
|
|
* Sets up request streaming for the proxy
|
|
*/
|
|
private setupRequestStreaming(
|
|
originRequest: plugins.http.IncomingMessage,
|
|
proxyRequest: plugins.http.ClientRequest
|
|
): void {
|
|
// Forward request body data
|
|
originRequest.on('data', (chunk) => {
|
|
proxyRequest.write(chunk);
|
|
});
|
|
|
|
// End the request when done
|
|
originRequest.on('end', () => {
|
|
proxyRequest.end();
|
|
});
|
|
|
|
// Handle request errors
|
|
originRequest.on('error', (error) => {
|
|
this.log('error', 'Error in client request stream', error);
|
|
proxyRequest.destroy(error);
|
|
});
|
|
|
|
// Handle client abort/timeout
|
|
originRequest.on('close', () => {
|
|
if (!originRequest.complete) {
|
|
this.log('debug', 'Client closed connection before request completed');
|
|
proxyRequest.destroy();
|
|
}
|
|
});
|
|
|
|
originRequest.on('timeout', () => {
|
|
this.log('debug', 'Client request timeout');
|
|
proxyRequest.destroy(new Error('Client request timeout'));
|
|
});
|
|
|
|
// Handle proxy request errors
|
|
proxyRequest.on('error', (error) => {
|
|
this.log('error', 'Error in outgoing proxy request', error);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Processes a proxy response
|
|
*/
|
|
private processProxyResponse(
|
|
reqId: string,
|
|
originResponse: plugins.http.ServerResponse,
|
|
proxyResponse: plugins.http.IncomingMessage
|
|
): void {
|
|
this.log('debug', `[${reqId}] Received upstream response: ${proxyResponse.statusCode}`);
|
|
|
|
// Set status code
|
|
originResponse.statusCode = proxyResponse.statusCode;
|
|
|
|
// Add default headers
|
|
for (const [headerName, headerValue] of Object.entries(this.defaultHeaders)) {
|
|
originResponse.setHeader(headerName, headerValue);
|
|
}
|
|
|
|
// Add CORS headers if enabled
|
|
if (this.options.cors) {
|
|
originResponse.setHeader('Access-Control-Allow-Origin', this.options.cors.allowOrigin);
|
|
}
|
|
|
|
// Copy response headers
|
|
for (const [headerName, headerValue] of Object.entries(proxyResponse.headers)) {
|
|
// Skip hop-by-hop headers
|
|
const hopByHopHeaders = ['connection', 'keep-alive', 'transfer-encoding', 'te',
|
|
'trailer', 'upgrade', 'proxy-authorization', 'proxy-authenticate'];
|
|
if (!hopByHopHeaders.includes(headerName.toLowerCase())) {
|
|
originResponse.setHeader(headerName, headerValue);
|
|
}
|
|
}
|
|
|
|
// Stream response body
|
|
proxyResponse.on('data', (chunk) => {
|
|
const canContinue = originResponse.write(chunk);
|
|
|
|
// Apply backpressure if needed
|
|
if (!canContinue) {
|
|
proxyResponse.pause();
|
|
originResponse.once('drain', () => {
|
|
proxyResponse.resume();
|
|
});
|
|
}
|
|
});
|
|
|
|
// End the response when done
|
|
proxyResponse.on('end', () => {
|
|
originResponse.end();
|
|
});
|
|
|
|
// Handle response errors
|
|
proxyResponse.on('error', (error) => {
|
|
this.log('error', `[${reqId}] Error in proxy response stream`, error);
|
|
originResponse.destroy(error);
|
|
});
|
|
|
|
originResponse.on('error', (error) => {
|
|
this.log('error', `[${reqId}] Error in client response stream`, error);
|
|
proxyResponse.destroy();
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Sends an error response to the client
|
|
*/
|
|
private sendErrorResponse(
|
|
res: plugins.http.ServerResponse,
|
|
statusCode: number = 500,
|
|
message: string = 'Internal Server Error',
|
|
headers: plugins.http.OutgoingHttpHeaders = {}
|
|
): void {
|
|
try {
|
|
// If headers already sent, just end the response
|
|
if (res.headersSent) {
|
|
res.end();
|
|
return;
|
|
}
|
|
|
|
// Add default headers
|
|
for (const [key, value] of Object.entries(this.defaultHeaders)) {
|
|
res.setHeader(key, value);
|
|
}
|
|
|
|
// Add provided headers
|
|
for (const [key, value] of Object.entries(headers)) {
|
|
res.setHeader(key, value);
|
|
}
|
|
|
|
// Send error response
|
|
res.writeHead(statusCode, message);
|
|
|
|
// Send error body as JSON for API clients
|
|
if (res.getHeader('Content-Type') === 'application/json') {
|
|
res.end(JSON.stringify({ error: { status: statusCode, message } }));
|
|
} else {
|
|
// Send as plain text
|
|
res.end(message);
|
|
}
|
|
} catch (error) {
|
|
this.log('error', 'Error sending error response', error);
|
|
try {
|
|
res.destroy();
|
|
} catch (destroyError) {
|
|
// Last resort - nothing more we can do
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Updates proxy configurations
|
|
*/
|
|
public async updateProxyConfigs(
|
|
proxyConfigsArg: plugins.tsclass.network.IReverseProxyConfig[]
|
|
): Promise<void> {
|
|
this.log('info', `Updating proxy configurations (${proxyConfigsArg.length} configs)`);
|
|
|
|
// Update internal configs
|
|
this.proxyConfigs = proxyConfigsArg;
|
|
this.router.setNewProxyConfigs(proxyConfigsArg);
|
|
|
|
// Collect all hostnames for cleanup later
|
|
const currentHostNames = new Set<string>();
|
|
|
|
// Add/update SSL contexts for each host
|
|
for (const config of proxyConfigsArg) {
|
|
currentHostNames.add(config.hostName);
|
|
|
|
try {
|
|
// Check if we need to update the cert
|
|
const currentCert = this.certificateCache.get(config.hostName);
|
|
const shouldUpdate = !currentCert ||
|
|
currentCert.key !== config.privateKey ||
|
|
currentCert.cert !== config.publicKey;
|
|
|
|
if (shouldUpdate) {
|
|
this.log('debug', `Updating SSL context for ${config.hostName}`);
|
|
|
|
// Update the HTTPS server context
|
|
this.httpsServer.addContext(config.hostName, {
|
|
key: config.privateKey,
|
|
cert: config.publicKey
|
|
});
|
|
|
|
// Update the cache
|
|
this.certificateCache.set(config.hostName, {
|
|
key: config.privateKey,
|
|
cert: config.publicKey
|
|
});
|
|
|
|
this.activeContexts.add(config.hostName);
|
|
}
|
|
} catch (error) {
|
|
this.log('error', `Failed to add SSL context for ${config.hostName}`, error);
|
|
}
|
|
}
|
|
|
|
// Clean up removed contexts
|
|
// Note: Node.js doesn't officially support removing contexts
|
|
// This would require server restart in production
|
|
for (const hostname of this.activeContexts) {
|
|
if (!currentHostNames.has(hostname)) {
|
|
this.log('info', `Hostname ${hostname} removed from configuration`);
|
|
this.activeContexts.delete(hostname);
|
|
this.certificateCache.delete(hostname);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Adds default headers to be included in all responses
|
|
*/
|
|
public async addDefaultHeaders(headersArg: { [key: string]: string }): Promise<void> {
|
|
this.log('info', 'Adding default headers', headersArg);
|
|
this.defaultHeaders = {
|
|
...this.defaultHeaders,
|
|
...headersArg
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Stops the proxy server
|
|
*/
|
|
public async stop(): Promise<void> {
|
|
this.log('info', 'Stopping NetworkProxy server');
|
|
|
|
// Clear intervals
|
|
if (this.heartbeatInterval) {
|
|
clearInterval(this.heartbeatInterval);
|
|
}
|
|
|
|
if (this.metricsInterval) {
|
|
clearInterval(this.metricsInterval);
|
|
}
|
|
|
|
if (this.connectionPoolCleanupInterval) {
|
|
clearInterval(this.connectionPoolCleanupInterval);
|
|
}
|
|
|
|
// Close WebSocket server if exists
|
|
if (this.wsServer) {
|
|
for (const client of this.wsServer.clients) {
|
|
try {
|
|
client.terminate();
|
|
} catch (error) {
|
|
this.log('error', 'Error terminating WebSocket client', error);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close all tracked sockets
|
|
for (const socket of this.socketMap.getArray()) {
|
|
try {
|
|
socket.destroy();
|
|
} catch (error) {
|
|
this.log('error', 'Error destroying socket', error);
|
|
}
|
|
}
|
|
|
|
// Close all connection pool connections
|
|
for (const [host, connections] of this.connectionPool.entries()) {
|
|
for (const connection of connections) {
|
|
try {
|
|
if (!connection.socket.destroyed) {
|
|
connection.socket.destroy();
|
|
}
|
|
} catch (error) {
|
|
this.log('error', `Error destroying pooled connection to ${host}`, error);
|
|
}
|
|
}
|
|
}
|
|
this.connectionPool.clear();
|
|
|
|
// Close the HTTPS server
|
|
return new Promise((resolve) => {
|
|
this.httpsServer.close(() => {
|
|
this.log('info', 'NetworkProxy server stopped successfully');
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Logs a message according to the configured log level
|
|
*/
|
|
private log(level: 'error' | 'warn' | 'info' | 'debug', message: string, data?: any): void {
|
|
const logLevels = {
|
|
error: 0,
|
|
warn: 1,
|
|
info: 2,
|
|
debug: 3
|
|
};
|
|
|
|
// Skip if log level is higher than configured
|
|
if (logLevels[level] > logLevels[this.options.logLevel]) {
|
|
return;
|
|
}
|
|
|
|
const timestamp = new Date().toISOString();
|
|
const prefix = `[${timestamp}] [${level.toUpperCase()}]`;
|
|
|
|
switch (level) {
|
|
case 'error':
|
|
console.error(`${prefix} ${message}`, data || '');
|
|
break;
|
|
case 'warn':
|
|
console.warn(`${prefix} ${message}`, data || '');
|
|
break;
|
|
case 'info':
|
|
console.log(`${prefix} ${message}`, data || '');
|
|
break;
|
|
case 'debug':
|
|
console.log(`${prefix} ${message}`, data || '');
|
|
break;
|
|
}
|
|
}
|
|
} |