update
This commit is contained in:
463
ts/proxies/network-proxy/request-handler.ts
Normal file
463
ts/proxies/network-proxy/request-handler.ts
Normal file
@ -0,0 +1,463 @@
|
||||
import * as plugins from '../../plugins.js';
|
||||
import { type NetworkProxyOptions, type Logger, createLogger, type ReverseProxyConfig } from './models/types.js';
|
||||
import { ConnectionPool } from './connection-pool.js';
|
||||
import { ProxyRouter } from '../../http/router/index.js';
|
||||
|
||||
/**
|
||||
* Interface for tracking metrics
|
||||
*/
|
||||
export interface IMetricsTracker {
|
||||
incrementRequestsServed(): void;
|
||||
incrementFailedRequests(): void;
|
||||
}
|
||||
|
||||
// Backward compatibility
|
||||
export type MetricsTracker = IMetricsTracker;
|
||||
|
||||
/**
|
||||
* Handles HTTP request processing and proxying
|
||||
*/
|
||||
export class RequestHandler {
|
||||
private defaultHeaders: { [key: string]: string } = {};
|
||||
private logger: Logger;
|
||||
private metricsTracker: IMetricsTracker | null = null;
|
||||
// HTTP/2 client sessions for backend proxying
|
||||
private h2Sessions: Map<string, plugins.http2.ClientHttp2Session> = new Map();
|
||||
|
||||
constructor(
|
||||
private options: NetworkProxyOptions,
|
||||
private connectionPool: ConnectionPool,
|
||||
private router: ProxyRouter
|
||||
) {
|
||||
this.logger = createLogger(options.logLevel || 'info');
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the metrics tracker instance
|
||||
*/
|
||||
public setMetricsTracker(tracker: IMetricsTracker): void {
|
||||
this.metricsTracker = tracker;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set default headers to be included in all responses
|
||||
*/
|
||||
public setDefaultHeaders(headers: { [key: string]: string }): void {
|
||||
this.defaultHeaders = {
|
||||
...this.defaultHeaders,
|
||||
...headers
|
||||
};
|
||||
this.logger.info('Updated default response headers');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all default headers
|
||||
*/
|
||||
public getDefaultHeaders(): { [key: string]: string } {
|
||||
return { ...this.defaultHeaders };
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply CORS headers to response if configured
|
||||
*/
|
||||
private applyCorsHeaders(
|
||||
res: plugins.http.ServerResponse,
|
||||
req: plugins.http.IncomingMessage
|
||||
): void {
|
||||
if (!this.options.cors) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply CORS headers
|
||||
if (this.options.cors.allowOrigin) {
|
||||
res.setHeader('Access-Control-Allow-Origin', this.options.cors.allowOrigin);
|
||||
}
|
||||
|
||||
if (this.options.cors.allowMethods) {
|
||||
res.setHeader('Access-Control-Allow-Methods', this.options.cors.allowMethods);
|
||||
}
|
||||
|
||||
if (this.options.cors.allowHeaders) {
|
||||
res.setHeader('Access-Control-Allow-Headers', this.options.cors.allowHeaders);
|
||||
}
|
||||
|
||||
if (this.options.cors.maxAge) {
|
||||
res.setHeader('Access-Control-Max-Age', this.options.cors.maxAge.toString());
|
||||
}
|
||||
|
||||
// Handle CORS preflight requests
|
||||
if (req.method === 'OPTIONS') {
|
||||
res.statusCode = 204; // No content
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply default headers to response
|
||||
*/
|
||||
private applyDefaultHeaders(res: plugins.http.ServerResponse): void {
|
||||
// Apply default headers
|
||||
for (const [key, value] of Object.entries(this.defaultHeaders)) {
|
||||
if (!res.hasHeader(key)) {
|
||||
res.setHeader(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// Add server identifier if not already set
|
||||
if (!res.hasHeader('Server')) {
|
||||
res.setHeader('Server', 'NetworkProxy');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an HTTP request
|
||||
*/
|
||||
public async handleRequest(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse
|
||||
): Promise<void> {
|
||||
// Record start time for logging
|
||||
const startTime = Date.now();
|
||||
|
||||
// Apply CORS headers if configured
|
||||
this.applyCorsHeaders(res, req);
|
||||
|
||||
// If this is an OPTIONS request, the response has already been ended in applyCorsHeaders
|
||||
// so we should return early to avoid trying to set more headers
|
||||
if (req.method === 'OPTIONS') {
|
||||
// Increment metrics for OPTIONS requests too
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementRequestsServed();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Apply default headers
|
||||
this.applyDefaultHeaders(res);
|
||||
|
||||
// Determine routing configuration
|
||||
let proxyConfig: ReverseProxyConfig | undefined;
|
||||
try {
|
||||
proxyConfig = this.router.routeReq(req);
|
||||
} catch (err) {
|
||||
this.logger.error('Error routing request', err);
|
||||
res.statusCode = 500;
|
||||
res.end('Internal Server Error');
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
return;
|
||||
}
|
||||
if (!proxyConfig) {
|
||||
this.logger.warn(`No proxy configuration for host: ${req.headers.host}`);
|
||||
res.statusCode = 404;
|
||||
res.end('Not Found: No proxy configuration for this host');
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
return;
|
||||
}
|
||||
// Determine protocol to backend (per-domain override or global)
|
||||
const backendProto = proxyConfig.backendProtocol || this.options.backendProtocol;
|
||||
if (backendProto === 'http2') {
|
||||
const destination = this.connectionPool.getNextTarget(
|
||||
proxyConfig.destinationIps,
|
||||
proxyConfig.destinationPorts[0]
|
||||
);
|
||||
const key = `${destination.host}:${destination.port}`;
|
||||
let session = this.h2Sessions.get(key);
|
||||
if (!session || session.closed || (session as any).destroyed) {
|
||||
session = plugins.http2.connect(`http://${destination.host}:${destination.port}`);
|
||||
this.h2Sessions.set(key, session);
|
||||
session.on('error', () => this.h2Sessions.delete(key));
|
||||
session.on('close', () => this.h2Sessions.delete(key));
|
||||
}
|
||||
// Build headers for HTTP/2 request
|
||||
const hdrs: Record<string, any> = {
|
||||
':method': req.method,
|
||||
':path': req.url,
|
||||
':authority': `${destination.host}:${destination.port}`
|
||||
};
|
||||
for (const [hk, hv] of Object.entries(req.headers)) {
|
||||
if (typeof hv === 'string') hdrs[hk] = hv;
|
||||
}
|
||||
const h2Stream = session.request(hdrs);
|
||||
req.pipe(h2Stream);
|
||||
h2Stream.on('response', (hdrs2: any) => {
|
||||
const status = (hdrs2[':status'] as number) || 502;
|
||||
res.statusCode = status;
|
||||
// Copy headers from HTTP/2 response to HTTP/1 response
|
||||
for (const [hk, hv] of Object.entries(hdrs2)) {
|
||||
if (!hk.startsWith(':') && hv != null) {
|
||||
res.setHeader(hk, hv as string | string[]);
|
||||
}
|
||||
}
|
||||
h2Stream.pipe(res);
|
||||
});
|
||||
h2Stream.on('error', (err) => {
|
||||
res.statusCode = 502;
|
||||
res.end(`Bad Gateway: ${err.message}`);
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Find target based on hostname
|
||||
const proxyConfig = this.router.routeReq(req);
|
||||
|
||||
if (!proxyConfig) {
|
||||
// No matching proxy configuration
|
||||
this.logger.warn(`No proxy configuration for host: ${req.headers.host}`);
|
||||
res.statusCode = 404;
|
||||
res.end('Not Found: No proxy configuration for this host');
|
||||
|
||||
// Increment failed requests counter
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Get destination IP using round-robin if multiple IPs configured
|
||||
const destination = this.connectionPool.getNextTarget(
|
||||
proxyConfig.destinationIps,
|
||||
proxyConfig.destinationPorts[0]
|
||||
);
|
||||
|
||||
// Create options for the proxy request
|
||||
const options: plugins.http.RequestOptions = {
|
||||
hostname: destination.host,
|
||||
port: destination.port,
|
||||
path: req.url,
|
||||
method: req.method,
|
||||
headers: { ...req.headers }
|
||||
};
|
||||
|
||||
// Remove host header to avoid issues with virtual hosts on target server
|
||||
// The host header should match the target server's expected hostname
|
||||
if (options.headers && options.headers.host) {
|
||||
if ((proxyConfig as ReverseProxyConfig).rewriteHostHeader) {
|
||||
options.headers.host = `${destination.host}:${destination.port}`;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Proxying request to ${destination.host}:${destination.port}${req.url}`,
|
||||
{ method: req.method }
|
||||
);
|
||||
|
||||
// Create proxy request
|
||||
const proxyReq = plugins.http.request(options, (proxyRes) => {
|
||||
// Copy status code
|
||||
res.statusCode = proxyRes.statusCode || 500;
|
||||
|
||||
// Copy headers from proxy response to client response
|
||||
for (const [key, value] of Object.entries(proxyRes.headers)) {
|
||||
if (value !== undefined) {
|
||||
res.setHeader(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// Pipe proxy response to client response
|
||||
proxyRes.pipe(res);
|
||||
|
||||
// Increment served requests counter when the response finishes
|
||||
res.on('finish', () => {
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementRequestsServed();
|
||||
}
|
||||
|
||||
// Log the completed request
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.debug(
|
||||
`Request completed in ${duration}ms: ${req.method} ${req.url} ${res.statusCode}`,
|
||||
{ duration, statusCode: res.statusCode }
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// Handle proxy request errors
|
||||
proxyReq.on('error', (error) => {
|
||||
const duration = Date.now() - startTime;
|
||||
this.logger.error(
|
||||
`Proxy error for ${req.method} ${req.url}: ${error.message}`,
|
||||
{ duration, error: error.message }
|
||||
);
|
||||
|
||||
// Increment failed requests counter
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
|
||||
// Check if headers have already been sent
|
||||
if (!res.headersSent) {
|
||||
res.statusCode = 502;
|
||||
res.end(`Bad Gateway: ${error.message}`);
|
||||
} else {
|
||||
// If headers already sent, just close the connection
|
||||
res.end();
|
||||
}
|
||||
});
|
||||
|
||||
// Pipe request body to proxy request and handle client-side errors
|
||||
req.pipe(proxyReq);
|
||||
|
||||
// Handle client disconnection
|
||||
req.on('error', (error) => {
|
||||
this.logger.debug(`Client connection error: ${error.message}`);
|
||||
proxyReq.destroy();
|
||||
|
||||
// Increment failed requests counter on client errors
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
});
|
||||
|
||||
// Handle response errors
|
||||
res.on('error', (error) => {
|
||||
this.logger.debug(`Response error: ${error.message}`);
|
||||
proxyReq.destroy();
|
||||
|
||||
// Increment failed requests counter on response errors
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
// Handle any unexpected errors
|
||||
this.logger.error(
|
||||
`Unexpected error handling request: ${error.message}`,
|
||||
{ error: error.stack }
|
||||
);
|
||||
|
||||
// Increment failed requests counter
|
||||
if (this.metricsTracker) {
|
||||
this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
|
||||
if (!res.headersSent) {
|
||||
res.statusCode = 500;
|
||||
res.end('Internal Server Error');
|
||||
} else {
|
||||
res.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle HTTP/2 stream requests by proxying to HTTP/1 backends
|
||||
*/
|
||||
public async handleHttp2(stream: any, headers: any): Promise<void> {
|
||||
const startTime = Date.now();
|
||||
const method = headers[':method'] || 'GET';
|
||||
const path = headers[':path'] || '/';
|
||||
// If configured to proxy to backends over HTTP/2, use HTTP/2 client sessions
|
||||
if (this.options.backendProtocol === 'http2') {
|
||||
const authority = headers[':authority'] as string || '';
|
||||
const host = authority.split(':')[0];
|
||||
const fakeReq: any = { headers: { host }, method: headers[':method'], url: headers[':path'], socket: (stream.session as any).socket };
|
||||
const proxyConfig = this.router.routeReq(fakeReq);
|
||||
if (!proxyConfig) {
|
||||
stream.respond({ ':status': 404 });
|
||||
stream.end('Not Found');
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
return;
|
||||
}
|
||||
const destination = this.connectionPool.getNextTarget(proxyConfig.destinationIps, proxyConfig.destinationPorts[0]);
|
||||
const key = `${destination.host}:${destination.port}`;
|
||||
let session = this.h2Sessions.get(key);
|
||||
if (!session || session.closed || (session as any).destroyed) {
|
||||
session = plugins.http2.connect(`http://${destination.host}:${destination.port}`);
|
||||
this.h2Sessions.set(key, session);
|
||||
session.on('error', () => this.h2Sessions.delete(key));
|
||||
session.on('close', () => this.h2Sessions.delete(key));
|
||||
}
|
||||
// Build headers for backend HTTP/2 request
|
||||
const h2Headers: Record<string, any> = {
|
||||
':method': headers[':method'],
|
||||
':path': headers[':path'],
|
||||
':authority': `${destination.host}:${destination.port}`
|
||||
};
|
||||
for (const [k, v] of Object.entries(headers)) {
|
||||
if (!k.startsWith(':') && typeof v === 'string') {
|
||||
h2Headers[k] = v;
|
||||
}
|
||||
}
|
||||
const h2Stream2 = session.request(h2Headers);
|
||||
stream.pipe(h2Stream2);
|
||||
h2Stream2.on('response', (hdrs: any) => {
|
||||
// Map status and headers to client
|
||||
const resp: Record<string, any> = { ':status': hdrs[':status'] as number };
|
||||
for (const [hk, hv] of Object.entries(hdrs)) {
|
||||
if (!hk.startsWith(':') && hv) resp[hk] = hv;
|
||||
}
|
||||
stream.respond(resp);
|
||||
h2Stream2.pipe(stream);
|
||||
});
|
||||
h2Stream2.on('error', (err) => {
|
||||
stream.respond({ ':status': 502 });
|
||||
stream.end(`Bad Gateway: ${err.message}`);
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
});
|
||||
return;
|
||||
}
|
||||
try {
|
||||
// Determine host for routing
|
||||
const authority = headers[':authority'] as string || '';
|
||||
const host = authority.split(':')[0];
|
||||
// Fake request object for routing
|
||||
const fakeReq: any = { headers: { host }, method, url: path, socket: (stream.session as any).socket };
|
||||
const proxyConfig = this.router.routeReq(fakeReq as any);
|
||||
if (!proxyConfig) {
|
||||
stream.respond({ ':status': 404 });
|
||||
stream.end('Not Found');
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
return;
|
||||
}
|
||||
// Select backend target
|
||||
const destination = this.connectionPool.getNextTarget(
|
||||
proxyConfig.destinationIps,
|
||||
proxyConfig.destinationPorts[0]
|
||||
);
|
||||
// Build headers for HTTP/1 proxy
|
||||
const outboundHeaders: Record<string,string> = {};
|
||||
for (const [key, value] of Object.entries(headers)) {
|
||||
if (typeof key === 'string' && typeof value === 'string' && !key.startsWith(':')) {
|
||||
outboundHeaders[key] = value;
|
||||
}
|
||||
}
|
||||
if (outboundHeaders.host && (proxyConfig as any).rewriteHostHeader) {
|
||||
outboundHeaders.host = `${destination.host}:${destination.port}`;
|
||||
}
|
||||
// Create HTTP/1 proxy request
|
||||
const proxyReq = plugins.http.request(
|
||||
{ hostname: destination.host, port: destination.port, path, method, headers: outboundHeaders },
|
||||
(proxyRes) => {
|
||||
// Map status and headers back to HTTP/2
|
||||
const responseHeaders: Record<string, number|string|string[]> = {};
|
||||
for (const [k, v] of Object.entries(proxyRes.headers)) {
|
||||
if (v !== undefined) {
|
||||
responseHeaders[k] = v as string | string[];
|
||||
}
|
||||
}
|
||||
stream.respond({ ':status': proxyRes.statusCode || 500, ...responseHeaders });
|
||||
proxyRes.pipe(stream);
|
||||
stream.on('close', () => proxyReq.destroy());
|
||||
stream.on('error', () => proxyReq.destroy());
|
||||
if (this.metricsTracker) stream.on('end', () => this.metricsTracker.incrementRequestsServed());
|
||||
}
|
||||
);
|
||||
proxyReq.on('error', (err) => {
|
||||
stream.respond({ ':status': 502 });
|
||||
stream.end(`Bad Gateway: ${err.message}`);
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
});
|
||||
// Pipe client stream to backend
|
||||
stream.pipe(proxyReq);
|
||||
} catch (err: any) {
|
||||
stream.respond({ ':status': 500 });
|
||||
stream.end('Internal Server Error');
|
||||
if (this.metricsTracker) this.metricsTracker.incrementFailedRequests();
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user