feat(NetworkProxy): Introduce WebSocket heartbeat to maintain active connections in NetworkProxy

This commit is contained in:
Philipp Kunz 2024-10-07 12:52:01 +02:00
parent ceede84774
commit 459ee7130f
3 changed files with 39 additions and 8 deletions

View File

@ -1,5 +1,12 @@
# Changelog # Changelog
## 2024-10-07 - 3.1.0 - feat(NetworkProxy)
Introduce WebSocket heartbeat to maintain active connections in NetworkProxy
- Added heartbeat mechanism to WebSocket connections to ensure they remain active.
- Terminating WebSocket if no pong is received for 5 minutes.
- Set up heartbeat interval to run every 1 minute for connection checks.
## 2024-10-07 - 3.0.61 - fix(networkproxy) ## 2024-10-07 - 3.0.61 - fix(networkproxy)
Improve error handling for proxy requests Improve error handling for proxy requests

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartproxy', name: '@push.rocks/smartproxy',
version: '3.0.61', version: '3.1.0',
description: 'a proxy for handling high workloads of proxying' description: 'a proxy for handling high workloads of proxying'
} }

View File

@ -5,6 +5,10 @@ export interface INetworkProxyOptions {
port: number; port: number;
} }
interface WebSocketWithHeartbeat extends plugins.wsDefault {
lastPong: number;
}
export class NetworkProxy { export class NetworkProxy {
// INSTANCE // INSTANCE
public options: INetworkProxyOptions; public options: INetworkProxyOptions;
@ -13,6 +17,7 @@ export class NetworkProxy {
public router = new ProxyRouter(); public router = new ProxyRouter();
public socketMap = new plugins.lik.ObjectMap<plugins.net.Socket>(); public socketMap = new plugins.lik.ObjectMap<plugins.net.Socket>();
public defaultHeaders: { [key: string]: string } = {}; public defaultHeaders: { [key: string]: string } = {};
public heartbeatInterval: NodeJS.Timeout;
public alreadyAddedReverseConfigs: { public alreadyAddedReverseConfigs: {
[hostName: string]: plugins.tsclass.network.IReverseProxyConfig; [hostName: string]: plugins.tsclass.network.IReverseProxyConfig;
@ -198,7 +203,7 @@ JNj2Dr5H0XoLFFnvuvzcRbhlJ9J67JzR+7g=
}, },
keepAlive: true, keepAlive: true,
}, },
true, // lets make this streaming true, // lets make this streaming (keepAlive)
(proxyRequest) => { (proxyRequest) => {
originRequest.on('data', (data) => { originRequest.on('data', (data) => {
proxyRequest.write(data); proxyRequest.write(data);
@ -254,13 +259,35 @@ JNj2Dr5H0XoLFFnvuvzcRbhlJ9J67JzR+7g=
// Enable websockets // Enable websockets
const wsServer = new plugins.ws.WebSocketServer({ server: this.httpsServer }); const wsServer = new plugins.ws.WebSocketServer({ server: this.httpsServer });
// Set up the heartbeat interval
this.heartbeatInterval = setInterval(() => {
wsServer.clients.forEach((ws: plugins.wsDefault) => {
const wsIncoming = ws as WebSocketWithHeartbeat;
if (!wsIncoming.lastPong) {
wsIncoming.lastPong = Date.now();
}
if (Date.now() - wsIncoming.lastPong > 5 * 60 * 1000) {
console.log('Terminating websocket due to missing pong for 5 minutes.');
wsIncoming.terminate();
} else {
wsIncoming.ping();
}
});
}, 60000); // runs every 1 minute
wsServer.on( wsServer.on(
'connection', 'connection',
async (wsIncoming: plugins.wsDefault, reqArg: plugins.http.IncomingMessage) => { async (wsIncoming: WebSocketWithHeartbeat, reqArg: plugins.http.IncomingMessage) => {
console.log( console.log(
`wss proxy: got connection for wsc for https://${reqArg.headers.host}${reqArg.url}` `wss proxy: got connection for wsc for https://${reqArg.headers.host}${reqArg.url}`
); );
wsIncoming.lastPong = Date.now();
wsIncoming.on('pong', () => {
wsIncoming.lastPong = Date.now();
});
let wsOutgoing: plugins.wsDefault; let wsOutgoing: plugins.wsDefault;
const outGoingDeferred = plugins.smartpromise.defer(); const outGoingDeferred = plugins.smartpromise.defer();
@ -315,8 +342,6 @@ JNj2Dr5H0XoLFFnvuvzcRbhlJ9J67JzR+7g=
}; };
wsOutgoing.on('error', () => terminateWsIncoming()); wsOutgoing.on('error', () => terminateWsIncoming());
wsOutgoing.on('close', () => terminateWsIncoming()); wsOutgoing.on('close', () => terminateWsIncoming());
} }
); );
@ -358,8 +383,6 @@ JNj2Dr5H0XoLFFnvuvzcRbhlJ9J67JzR+7g=
this.proxyConfigs = proxyConfigsArg; this.proxyConfigs = proxyConfigsArg;
this.router.setNewProxyConfigs(proxyConfigsArg); this.router.setNewProxyConfigs(proxyConfigsArg);
for (const hostCandidate of this.proxyConfigs) { for (const hostCandidate of this.proxyConfigs) {
// console.log(hostCandidate);
const existingHostNameConfig = this.alreadyAddedReverseConfigs[hostCandidate.hostName]; const existingHostNameConfig = this.alreadyAddedReverseConfigs[hostCandidate.hostName];
if (!existingHostNameConfig) { if (!existingHostNameConfig) {
@ -397,6 +420,7 @@ JNj2Dr5H0XoLFFnvuvzcRbhlJ9J67JzR+7g=
socket.destroy(); socket.destroy();
}); });
await done.promise; await done.promise;
clearInterval(this.heartbeatInterval);
console.log('NetworkProxy -> OK: Server has been stopped and all connections closed.'); console.log('NetworkProxy -> OK: Server has been stopped and all connections closed.');
} }
} }