fix(networkproxy): Refactor and improve WebSocket handling and request processing
This commit is contained in:
@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartproxy',
|
||||
version: '3.1.2',
|
||||
version: '3.1.3',
|
||||
description: 'a proxy for handling high workloads of proxying'
|
||||
}
|
||||
|
@ -8,12 +8,11 @@ export interface INetworkProxyOptions {
|
||||
port: number;
|
||||
}
|
||||
|
||||
interface WebSocketWithHeartbeat extends plugins.wsDefault {
|
||||
interface IWebSocketWithHeartbeat extends plugins.wsDefault {
|
||||
lastPong: number;
|
||||
}
|
||||
|
||||
export class NetworkProxy {
|
||||
// INSTANCE
|
||||
public options: INetworkProxyOptions;
|
||||
public proxyConfigs: plugins.tsclass.network.IReverseProxyConfig[] = [];
|
||||
public httpsServer: plugins.https.Server;
|
||||
@ -43,148 +42,23 @@ export class NetworkProxy {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* starts the proxyInstance
|
||||
*/
|
||||
public async start() {
|
||||
// Instead of marking the callback async (which Node won't await),
|
||||
// we call our async handler and catch errors.
|
||||
this.httpsServer = plugins.https.createServer(
|
||||
{
|
||||
key: this.defaultCertificates.key,
|
||||
cert: this.defaultCertificates.cert
|
||||
},
|
||||
async (originRequest, originResponse) => {
|
||||
/**
|
||||
* endRequest function
|
||||
* can be used to prematurely end a request
|
||||
*/
|
||||
const endOriginReqRes = (
|
||||
statusArg: number = 404,
|
||||
messageArg: string = 'This route is not available on this server.',
|
||||
headers: plugins.http.OutgoingHttpHeaders = {},
|
||||
) => {
|
||||
originResponse.writeHead(statusArg, messageArg);
|
||||
originResponse.end(messageArg);
|
||||
if (originRequest.socket !== originResponse.socket) {
|
||||
console.log('hey, something is strange.');
|
||||
}
|
||||
originResponse.destroy();
|
||||
};
|
||||
|
||||
console.log(
|
||||
`got request: ${originRequest.headers.host}${plugins.url.parse(originRequest.url).path}`,
|
||||
);
|
||||
const destinationConfig = this.router.routeReq(originRequest);
|
||||
|
||||
if (!destinationConfig) {
|
||||
console.log(
|
||||
`${originRequest.headers.host} can't be routed properly. Terminating request.`,
|
||||
);
|
||||
endOriginReqRes();
|
||||
return;
|
||||
}
|
||||
|
||||
// authentication
|
||||
if (destinationConfig.authentication) {
|
||||
const authInfo = destinationConfig.authentication;
|
||||
switch (authInfo.type) {
|
||||
case 'Basic':
|
||||
const authHeader = originRequest.headers.authorization;
|
||||
if (authHeader) {
|
||||
if (!authHeader.includes('Basic ')) {
|
||||
return endOriginReqRes(401, 'Authentication required', {
|
||||
'WWW-Authenticate': 'Basic realm="Access to the staging site", charset="UTF-8"',
|
||||
});
|
||||
}
|
||||
const authStringBase64 = originRequest.headers.authorization.replace('Basic ', '');
|
||||
const authString: string = plugins.smartstring.base64.decode(authStringBase64);
|
||||
const userPassArray = authString.split(':');
|
||||
const user = userPassArray[0];
|
||||
const pass = userPassArray[1];
|
||||
if (user === authInfo.user && pass === authInfo.pass) {
|
||||
console.log('request successfully authenticated');
|
||||
} else {
|
||||
return endOriginReqRes(403, 'Forbidden: Wrong credentials');
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
return endOriginReqRes(
|
||||
403,
|
||||
'Forbidden: unsupported authentication method configured. Please report to the admin.',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let destinationUrl: string;
|
||||
if (destinationConfig) {
|
||||
destinationUrl = `http://${destinationConfig.destinationIp}:${destinationConfig.destinationPort}${originRequest.url}`;
|
||||
} else {
|
||||
return endOriginReqRes();
|
||||
}
|
||||
console.log(destinationUrl);
|
||||
try {
|
||||
const proxyResponse = await plugins.smartrequest.request(
|
||||
destinationUrl,
|
||||
{
|
||||
method: originRequest.method,
|
||||
headers: {
|
||||
...originRequest.headers,
|
||||
'X-Forwarded-Host': originRequest.headers.host,
|
||||
'X-Forwarded-Proto': 'https',
|
||||
},
|
||||
keepAlive: true,
|
||||
},
|
||||
true, // lets make this streaming (keepAlive)
|
||||
(proxyRequest) => {
|
||||
originRequest.on('data', (data) => {
|
||||
proxyRequest.write(data);
|
||||
});
|
||||
originRequest.on('end', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('error', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('close', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('timeout', () => {
|
||||
proxyRequest.end();
|
||||
originRequest.destroy();
|
||||
});
|
||||
proxyRequest.on('error', () => {
|
||||
endOriginReqRes();
|
||||
});
|
||||
},
|
||||
);
|
||||
originResponse.statusCode = proxyResponse.statusCode;
|
||||
console.log(proxyResponse.statusCode);
|
||||
for (const defaultHeader of Object.keys(this.defaultHeaders)) {
|
||||
originResponse.setHeader(defaultHeader, this.defaultHeaders[defaultHeader]);
|
||||
}
|
||||
for (const header of Object.keys(proxyResponse.headers)) {
|
||||
originResponse.setHeader(header, proxyResponse.headers[header]);
|
||||
}
|
||||
proxyResponse.on('data', (data) => {
|
||||
originResponse.write(data);
|
||||
});
|
||||
proxyResponse.on('end', () => {
|
||||
(originRequest, originResponse) => {
|
||||
this.handleRequest(originRequest, originResponse).catch((error) => {
|
||||
console.error('Unhandled error in request handler:', error);
|
||||
try {
|
||||
originResponse.end();
|
||||
});
|
||||
proxyResponse.on('error', () => {
|
||||
originResponse.destroy();
|
||||
});
|
||||
proxyResponse.on('close', () => {
|
||||
originResponse.end();
|
||||
});
|
||||
proxyResponse.on('timeout', () => {
|
||||
originResponse.end();
|
||||
originResponse.destroy();
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error while processing request:', error);
|
||||
endOriginReqRes(502, 'Bad Gateway: Error processing the request');
|
||||
}
|
||||
} catch (err) {
|
||||
// ignore errors during cleanup
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
@ -194,7 +68,7 @@ export class NetworkProxy {
|
||||
// Set up the heartbeat interval
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
wsServer.clients.forEach((ws: plugins.wsDefault) => {
|
||||
const wsIncoming = ws as WebSocketWithHeartbeat;
|
||||
const wsIncoming = ws as IWebSocketWithHeartbeat;
|
||||
if (!wsIncoming.lastPong) {
|
||||
wsIncoming.lastPong = Date.now();
|
||||
}
|
||||
@ -209,7 +83,7 @@ export class NetworkProxy {
|
||||
|
||||
wsServer.on(
|
||||
'connection',
|
||||
async (wsIncoming: WebSocketWithHeartbeat, reqArg: plugins.http.IncomingMessage) => {
|
||||
(wsIncoming: IWebSocketWithHeartbeat, reqArg: plugins.http.IncomingMessage) => {
|
||||
console.log(
|
||||
`wss proxy: got connection for wsc for https://${reqArg.headers.host}${reqArg.url}`,
|
||||
);
|
||||
@ -220,21 +94,24 @@ export class NetworkProxy {
|
||||
});
|
||||
|
||||
let wsOutgoing: plugins.wsDefault;
|
||||
|
||||
const outGoingDeferred = plugins.smartpromise.defer();
|
||||
|
||||
// --- Improvement 2: Only call routeReq once ---
|
||||
const wsDestinationConfig = this.router.routeReq(reqArg);
|
||||
if (!wsDestinationConfig) {
|
||||
wsIncoming.terminate();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
wsOutgoing = new plugins.wsDefault(
|
||||
`ws://${this.router.routeReq(reqArg).destinationIp}:${
|
||||
this.router.routeReq(reqArg).destinationPort
|
||||
}${reqArg.url}`,
|
||||
`ws://${wsDestinationConfig.destinationIp}:${wsDestinationConfig.destinationPort}${reqArg.url}`,
|
||||
);
|
||||
console.log('wss proxy: initiated outgoing proxy');
|
||||
wsOutgoing.on('open', async () => {
|
||||
outGoingDeferred.resolve();
|
||||
});
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
console.error('Error initiating outgoing WebSocket:', err);
|
||||
wsIncoming.terminate();
|
||||
return;
|
||||
}
|
||||
@ -259,20 +136,20 @@ export class NetworkProxy {
|
||||
const terminateWsOutgoing = () => {
|
||||
if (wsOutgoing) {
|
||||
wsOutgoing.terminate();
|
||||
console.log('terminated outgoing ws.');
|
||||
console.log('Terminated outgoing ws.');
|
||||
}
|
||||
};
|
||||
wsIncoming.on('error', () => terminateWsOutgoing());
|
||||
wsIncoming.on('close', () => terminateWsOutgoing());
|
||||
wsIncoming.on('error', terminateWsOutgoing);
|
||||
wsIncoming.on('close', terminateWsOutgoing);
|
||||
|
||||
const terminateWsIncoming = () => {
|
||||
if (wsIncoming) {
|
||||
wsIncoming.terminate();
|
||||
console.log('terminated incoming ws.');
|
||||
console.log('Terminated incoming ws.');
|
||||
}
|
||||
};
|
||||
wsOutgoing.on('error', () => terminateWsIncoming());
|
||||
wsOutgoing.on('close', () => terminateWsIncoming());
|
||||
wsOutgoing.on('error', terminateWsIncoming);
|
||||
wsOutgoing.on('close', terminateWsIncoming);
|
||||
},
|
||||
);
|
||||
|
||||
@ -281,26 +158,18 @@ export class NetworkProxy {
|
||||
|
||||
this.httpsServer.on('connection', (connection: plugins.net.Socket) => {
|
||||
this.socketMap.add(connection);
|
||||
console.log(`added connection. now ${this.socketMap.getArray().length} sockets connected.`);
|
||||
console.log(`Added connection. Now ${this.socketMap.getArray().length} sockets connected.`);
|
||||
const cleanupConnection = () => {
|
||||
if (this.socketMap.checkForObject(connection)) {
|
||||
this.socketMap.remove(connection);
|
||||
console.log(`removed connection. ${this.socketMap.getArray().length} sockets remaining.`);
|
||||
console.log(`Removed connection. ${this.socketMap.getArray().length} sockets remaining.`);
|
||||
connection.destroy();
|
||||
}
|
||||
};
|
||||
connection.on('close', () => {
|
||||
cleanupConnection();
|
||||
});
|
||||
connection.on('error', () => {
|
||||
cleanupConnection();
|
||||
});
|
||||
connection.on('end', () => {
|
||||
cleanupConnection();
|
||||
});
|
||||
connection.on('timeout', () => {
|
||||
cleanupConnection();
|
||||
});
|
||||
connection.on('close', cleanupConnection);
|
||||
connection.on('error', cleanupConnection);
|
||||
connection.on('end', cleanupConnection);
|
||||
connection.on('timeout', cleanupConnection);
|
||||
});
|
||||
|
||||
this.httpsServer.listen(this.options.port);
|
||||
@ -309,7 +178,150 @@ export class NetworkProxy {
|
||||
);
|
||||
}
|
||||
|
||||
public async updateProxyConfigs(proxyConfigsArg: plugins.tsclass.network.IReverseProxyConfig[]) {
|
||||
/**
|
||||
* Internal async handler for processing HTTP/HTTPS requests.
|
||||
*/
|
||||
private async handleRequest(
|
||||
originRequest: plugins.http.IncomingMessage,
|
||||
originResponse: plugins.http.ServerResponse,
|
||||
): Promise<void> {
|
||||
const endOriginReqRes = (
|
||||
statusArg: number = 404,
|
||||
messageArg: string = 'This route is not available on this server.',
|
||||
headers: plugins.http.OutgoingHttpHeaders = {},
|
||||
) => {
|
||||
originResponse.writeHead(statusArg, messageArg);
|
||||
originResponse.end(messageArg);
|
||||
if (originRequest.socket !== originResponse.socket) {
|
||||
console.log('hey, something is strange.');
|
||||
}
|
||||
originResponse.destroy();
|
||||
};
|
||||
|
||||
console.log(
|
||||
`got request: ${originRequest.headers.host}${plugins.url.parse(originRequest.url).path}`,
|
||||
);
|
||||
const destinationConfig = this.router.routeReq(originRequest);
|
||||
|
||||
if (!destinationConfig) {
|
||||
console.log(
|
||||
`${originRequest.headers.host} can't be routed properly. Terminating request.`,
|
||||
);
|
||||
endOriginReqRes();
|
||||
return;
|
||||
}
|
||||
|
||||
// authentication
|
||||
if (destinationConfig.authentication) {
|
||||
const authInfo = destinationConfig.authentication;
|
||||
switch (authInfo.type) {
|
||||
case 'Basic': {
|
||||
const authHeader = originRequest.headers.authorization;
|
||||
if (!authHeader) {
|
||||
return endOriginReqRes(401, 'Authentication required', {
|
||||
'WWW-Authenticate': 'Basic realm="Access to the staging site", charset="UTF-8"',
|
||||
});
|
||||
}
|
||||
if (!authHeader.includes('Basic ')) {
|
||||
return endOriginReqRes(401, 'Authentication required', {
|
||||
'WWW-Authenticate': 'Basic realm="Access to the staging site", charset="UTF-8"',
|
||||
});
|
||||
}
|
||||
const authStringBase64 = authHeader.replace('Basic ', '');
|
||||
const authString: string = plugins.smartstring.base64.decode(authStringBase64);
|
||||
const userPassArray = authString.split(':');
|
||||
const user = userPassArray[0];
|
||||
const pass = userPassArray[1];
|
||||
if (user === authInfo.user && pass === authInfo.pass) {
|
||||
console.log('Request successfully authenticated');
|
||||
} else {
|
||||
return endOriginReqRes(403, 'Forbidden: Wrong credentials');
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
return endOriginReqRes(
|
||||
403,
|
||||
'Forbidden: unsupported authentication method configured. Please report to the admin.',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let destinationUrl: string;
|
||||
if (destinationConfig) {
|
||||
destinationUrl = `http://${destinationConfig.destinationIp}:${destinationConfig.destinationPort}${originRequest.url}`;
|
||||
} else {
|
||||
return endOriginReqRes();
|
||||
}
|
||||
console.log(destinationUrl);
|
||||
try {
|
||||
const proxyResponse = await plugins.smartrequest.request(
|
||||
destinationUrl,
|
||||
{
|
||||
method: originRequest.method,
|
||||
headers: {
|
||||
...originRequest.headers,
|
||||
'X-Forwarded-Host': originRequest.headers.host,
|
||||
'X-Forwarded-Proto': 'https',
|
||||
},
|
||||
keepAlive: true,
|
||||
},
|
||||
true, // streaming (keepAlive)
|
||||
(proxyRequest) => {
|
||||
originRequest.on('data', (data) => {
|
||||
proxyRequest.write(data);
|
||||
});
|
||||
originRequest.on('end', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('error', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('close', () => {
|
||||
proxyRequest.end();
|
||||
});
|
||||
originRequest.on('timeout', () => {
|
||||
proxyRequest.end();
|
||||
originRequest.destroy();
|
||||
});
|
||||
proxyRequest.on('error', () => {
|
||||
endOriginReqRes();
|
||||
});
|
||||
},
|
||||
);
|
||||
originResponse.statusCode = proxyResponse.statusCode;
|
||||
console.log(proxyResponse.statusCode);
|
||||
for (const defaultHeader of Object.keys(this.defaultHeaders)) {
|
||||
originResponse.setHeader(defaultHeader, this.defaultHeaders[defaultHeader]);
|
||||
}
|
||||
for (const header of Object.keys(proxyResponse.headers)) {
|
||||
originResponse.setHeader(header, proxyResponse.headers[header]);
|
||||
}
|
||||
proxyResponse.on('data', (data) => {
|
||||
originResponse.write(data);
|
||||
});
|
||||
proxyResponse.on('end', () => {
|
||||
originResponse.end();
|
||||
});
|
||||
proxyResponse.on('error', () => {
|
||||
originResponse.destroy();
|
||||
});
|
||||
proxyResponse.on('close', () => {
|
||||
originResponse.end();
|
||||
});
|
||||
proxyResponse.on('timeout', () => {
|
||||
originResponse.end();
|
||||
originResponse.destroy();
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error while processing request:', error);
|
||||
endOriginReqRes(502, 'Bad Gateway: Error processing the request');
|
||||
}
|
||||
}
|
||||
|
||||
public async updateProxyConfigs(
|
||||
proxyConfigsArg: plugins.tsclass.network.IReverseProxyConfig[],
|
||||
) {
|
||||
console.log(`got new proxy configs`);
|
||||
this.proxyConfigs = proxyConfigsArg;
|
||||
this.router.setNewProxyConfigs(proxyConfigsArg);
|
||||
@ -347,11 +359,11 @@ export class NetworkProxy {
|
||||
this.httpsServer.close(() => {
|
||||
done.resolve();
|
||||
});
|
||||
await this.socketMap.forEach(async (socket) => {
|
||||
for (const socket of this.socketMap.getArray()) {
|
||||
socket.destroy();
|
||||
});
|
||||
}
|
||||
await done.promise;
|
||||
clearInterval(this.heartbeatInterval);
|
||||
console.log('NetworkProxy -> OK: Server has been stopped and all connections closed.');
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user