226 lines
7.4 KiB
TypeScript
226 lines
7.4 KiB
TypeScript
|
import * as plugins from '../plugins.js';
|
||
|
import { type INetworkProxyOptions, type IWebSocketWithHeartbeat, type ILogger, createLogger, type IReverseProxyConfig } from './classes.np.types.js';
|
||
|
import { ConnectionPool } from './classes.np.connectionpool.js';
|
||
|
import { ProxyRouter } from '../classes.router.js';
|
||
|
|
||
|
/**
 * Handles WebSocket connections and proxying.
 *
 * Attaches a WebSocket server to an existing HTTPS server, opens one upstream
 * WebSocket per accepted client, relays messages and close events in both
 * directions, and terminates clients that stop answering heartbeat pings.
 */
export class WebSocketHandler {
  /** Time between two heartbeat sweeps, in milliseconds. */
  private static readonly HEARTBEAT_INTERVAL_MS = 30000;

  /** Incoming request headers that are never copied to the upstream handshake. */
  private static readonly SKIPPED_HEADERS: ReadonlySet<string> = new Set([
    'connection',
    'upgrade',
    'sec-websocket-key',
    'sec-websocket-version',
  ]);

  private heartbeatInterval: NodeJS.Timeout | null = null;
  private wsServer: plugins.ws.WebSocketServer | null = null;
  private logger: ILogger;

  /**
   * @param options proxy options; only `logLevel` is read here (defaults to 'info')
   * @param connectionPool used to pick the upstream target for each connection
   * @param router used to resolve the proxy configuration for an upgrade request
   */
  constructor(
    private options: INetworkProxyOptions,
    private connectionPool: ConnectionPool,
    private router: ProxyRouter
  ) {
    this.logger = createLogger(options.logLevel || 'info');
  }

  /**
   * Initialize WebSocket server on an existing HTTPS server
   *
   * @param server the HTTPS server whose upgrade requests should be handled
   */
  public initialize(server: plugins.https.Server): void {
    // Create WebSocket server
    const wsServer = new plugins.ws.WebSocketServer({
      server: server,
      clientTracking: true,
    });
    this.wsServer = wsServer;

    // The WebSocket server re-emits errors of the underlying server; an
    // 'error' event without a listener would be thrown as an uncaught exception.
    wsServer.on('error', (err) => {
      this.logger.error(`WebSocket server error: ${err.message}`);
    });

    // Handle WebSocket connections
    wsServer.on('connection', (wsIncoming: IWebSocketWithHeartbeat, req: plugins.http.IncomingMessage) => {
      this.handleWebSocketConnection(wsIncoming, req);
    });

    // Start the heartbeat interval
    this.startHeartbeat();

    this.logger.info('WebSocket handler initialized');
  }

  /**
   * Start the heartbeat interval to check for inactive WebSocket connections
   *
   * On every sweep a client that has not answered the previous ping is
   * terminated; every other client is marked as not alive and pinged again.
   */
  private startHeartbeat(): void {
    // Clean up existing interval if any
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
    }

    const timer = setInterval(() => {
      const server = this.wsServer;
      if (!server || server.clients.size === 0) {
        return; // Skip if no active connections
      }

      this.logger.debug(`WebSocket heartbeat check for ${server.clients.size} clients`);

      for (const ws of server.clients) {
        const wsWithHeartbeat = ws as IWebSocketWithHeartbeat;

        if (wsWithHeartbeat.isAlive === false) {
          this.logger.debug('Terminating inactive WebSocket connection');
          wsWithHeartbeat.terminate();
          continue;
        }

        // Reset the flag; the 'pong' listener sets it back to true.
        wsWithHeartbeat.isAlive = false;
        wsWithHeartbeat.ping();
      }
    }, WebSocketHandler.HEARTBEAT_INTERVAL_MS);
    this.heartbeatInterval = timer;

    // Make sure the interval doesn't keep the process alive
    timer.unref?.();
  }

  /**
   * Handle a new WebSocket connection
   *
   * Resolves the proxy configuration for the request, opens the upstream
   * WebSocket and wires both sockets together. Messages are relayed only once
   * the upstream connection is open; messages the client sends before that
   * point are not forwarded.
   *
   * @param wsIncoming the accepted client socket
   * @param req the HTTP upgrade request that created the socket
   */
  private handleWebSocketConnection(wsIncoming: IWebSocketWithHeartbeat, req: plugins.http.IncomingMessage): void {
    try {
      // An 'error' event without a listener is thrown as an uncaught exception,
      // so always observe client socket errors. Cleanup happens in 'close'.
      wsIncoming.on('error', (err) => {
        this.logger.error(`WebSocket client connection error: ${err.message}`);
      });

      // Initialize heartbeat tracking
      wsIncoming.isAlive = true;
      wsIncoming.lastPong = Date.now();

      // Handle pong messages to track liveness
      wsIncoming.on('pong', () => {
        wsIncoming.isAlive = true;
        wsIncoming.lastPong = Date.now();
      });

      // Find target configuration based on request
      const proxyConfig = this.router.routeReq(req);

      if (!proxyConfig) {
        this.logger.warn(`No proxy configuration for WebSocket host: ${req.headers.host}`);
        wsIncoming.close(1008, 'No proxy configuration for this host');
        return;
      }

      // Get destination target using round-robin if multiple targets
      const destination = this.connectionPool.getNextTarget(
        proxyConfig.destinationIps,
        proxyConfig.destinationPorts[0]
      );

      // Build target URL
      // NOTE(review): the upstream scheme mirrors whether the client-facing
      // socket is encrypted — confirm the targets really speak TLS in that case.
      const protocol = (req.socket as any).encrypted ? 'wss' : 'ws';
      const targetUrl = `${protocol}://${destination.host}:${destination.port}${req.url}`;

      this.logger.debug(`WebSocket connection from ${req.socket.remoteAddress} to ${targetUrl}`);

      // Copy single-valued headers from the incoming request, leaving out the
      // ones listed in SKIPPED_HEADERS.
      const headers: { [key: string]: string } = {};
      for (const [key, value] of Object.entries(req.headers)) {
        if (typeof value === 'string' && value && !WebSocketHandler.SKIPPED_HEADERS.has(key.toLowerCase())) {
          headers[key] = value;
        }
      }

      // Override host header if needed
      if ((proxyConfig as IReverseProxyConfig).rewriteHostHeader) {
        headers['host'] = `${destination.host}:${destination.port}`;
      }

      // Create outgoing WebSocket connection
      const wsOutgoing = new plugins.wsDefault(targetUrl, {
        headers: headers,
        followRedirects: true,
      });

      // Handle connection errors
      wsOutgoing.on('error', (err) => {
        this.logger.error(`WebSocket target connection error: ${err.message}`);
        if (wsIncoming.readyState === wsIncoming.OPEN) {
          wsIncoming.close(1011, 'Internal server error');
        }
      });

      // Close handling is wired up immediately (not only after 'open') so that
      // a client disconnecting while the upstream is still connecting does not
      // leave the upstream socket behind.
      wsIncoming.on('close', (code, reason) => {
        this.logger.debug(`WebSocket client connection closed: ${code} ${reason}`);
        if (wsOutgoing.readyState === wsOutgoing.CONNECTING) {
          wsOutgoing.terminate();
          return;
        }
        this.relayClose(wsOutgoing, code, reason);
      });

      wsOutgoing.on('close', (code, reason) => {
        this.logger.debug(`WebSocket target connection closed: ${code} ${reason}`);
        this.relayClose(wsIncoming, code, reason);
      });

      // Handle outgoing connection open
      wsOutgoing.on('open', () => {
        // Forward incoming messages to outgoing connection
        wsIncoming.on('message', (data, isBinary) => {
          if (wsOutgoing.readyState === wsOutgoing.OPEN) {
            wsOutgoing.send(data, { binary: isBinary });
          }
        });

        // Forward outgoing messages to incoming connection
        wsOutgoing.on('message', (data, isBinary) => {
          if (wsIncoming.readyState === wsIncoming.OPEN) {
            wsIncoming.send(data, { binary: isBinary });
          }
        });

        this.logger.debug(`WebSocket connection established: ${req.headers.host} -> ${destination.host}:${destination.port}`);
      });
    } catch (error) {
      // Thrown values are not guaranteed to be Error instances.
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Error handling WebSocket connection: ${message}`);
      if (wsIncoming.readyState === wsIncoming.OPEN) {
        wsIncoming.close(1011, 'Internal server error');
      }
    }
  }

  /**
   * Relay a close event to the other side of a proxied connection.
   *
   * Does nothing unless `peer` is open. Codes that must not appear in a close
   * frame (for example 1005 and 1006) are not forwarded; the peer is then
   * closed without a status code. If closing throws, the peer is terminated.
   *
   * @param peer the socket that should be closed
   * @param code close code reported by the socket that closed first
   * @param reason close reason reported by the socket that closed first
   */
  private relayClose(
    peer: plugins.wsDefault,
    code: number,
    reason: Parameters<plugins.wsDefault['close']>[1]
  ): void {
    if (peer.readyState !== peer.OPEN) {
      return;
    }

    try {
      if (WebSocketHandler.isSendableCloseCode(code)) {
        peer.close(code, reason);
      } else {
        peer.close();
      }
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.warn(`Failed to relay WebSocket close (${code}), terminating peer: ${message}`);
      peer.terminate();
    }
  }

  /**
   * Whether a close code may be sent in a close frame: 1000-1014 except the
   * reserved 1004, 1005 and 1006, or the 3000-4999 range.
   */
  private static isSendableCloseCode(code: number): boolean {
    const isStandard = code >= 1000 && code <= 1014 && code !== 1004 && code !== 1005 && code !== 1006;
    const isRegisteredOrPrivate = code >= 3000 && code <= 4999;
    return isStandard || isRegisteredOrPrivate;
  }

  /**
   * Get information about active WebSocket connections
   *
   * @returns the number of clients tracked by the WebSocket server, or 0 when
   *          no server is initialized
   */
  public getConnectionInfo(): { activeConnections: number } {
    return {
      activeConnections: this.wsServer ? this.wsServer.clients.size : 0,
    };
  }

  /**
   * Shutdown the WebSocket handler
   *
   * Stops the heartbeat, terminates every tracked client and closes the
   * WebSocket server.
   */
  public shutdown(): void {
    // Stop heartbeat interval
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }

    // Close all WebSocket connections
    if (this.wsServer) {
      this.logger.info(`Closing ${this.wsServer.clients.size} WebSocket connections`);

      for (const client of this.wsServer.clients) {
        try {
          client.terminate();
        } catch (error) {
          this.logger.error('Error terminating WebSocket client', error);
        }
      }

      // Close the server
      this.wsServer.close();
      this.wsServer = null;
    }
  }
}
|