feat(proxy): Implement WrappedSocket class for PROXY protocol support and update connection handling
This commit is contained in:
@ -5,3 +5,4 @@
|
||||
export * from './common-types.js';
|
||||
export * from './socket-augmentation.js';
|
||||
export * from './route-context.js';
|
||||
export * from './wrapped-socket.js';
|
||||
|
259
ts/core/models/wrapped-socket.ts
Normal file
259
ts/core/models/wrapped-socket.ts
Normal file
@ -0,0 +1,259 @@
|
||||
import { EventEmitter } from 'events';
|
||||
import * as plugins from '../../plugins.js';
|
||||
|
||||
/**
|
||||
* WrappedSocket wraps a regular net.Socket to provide transparent access
|
||||
* to the real client IP and port when behind a proxy using PROXY protocol.
|
||||
*
|
||||
* This is the FOUNDATION for all PROXY protocol support and must be implemented
|
||||
* before any protocol parsing can occur.
|
||||
*/
|
||||
export class WrappedSocket extends EventEmitter {
|
||||
private realClientIP?: string;
|
||||
private realClientPort?: number;
|
||||
|
||||
constructor(
|
||||
public readonly socket: plugins.net.Socket,
|
||||
realClientIP?: string,
|
||||
realClientPort?: number
|
||||
) {
|
||||
super();
|
||||
this.realClientIP = realClientIP;
|
||||
this.realClientPort = realClientPort;
|
||||
|
||||
// Forward all socket events
|
||||
this.forwardSocketEvents();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the real client IP if available, otherwise the socket's remote address
|
||||
*/
|
||||
get remoteAddress(): string | undefined {
|
||||
return this.realClientIP || this.socket.remoteAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the real client port if available, otherwise the socket's remote port
|
||||
*/
|
||||
get remotePort(): number | undefined {
|
||||
return this.realClientPort || this.socket.remotePort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the remote family (IPv4 or IPv6)
|
||||
*/
|
||||
get remoteFamily(): string | undefined {
|
||||
// If we have a real client IP, determine the family
|
||||
if (this.realClientIP) {
|
||||
if (this.realClientIP.includes(':')) {
|
||||
return 'IPv6';
|
||||
} else {
|
||||
return 'IPv4';
|
||||
}
|
||||
}
|
||||
return this.socket.remoteFamily;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the local address of the socket
|
||||
*/
|
||||
get localAddress(): string | undefined {
|
||||
return this.socket.localAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the local port of the socket
|
||||
*/
|
||||
get localPort(): number | undefined {
|
||||
return this.socket.localPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates if this connection came through a trusted proxy
|
||||
*/
|
||||
get isFromTrustedProxy(): boolean {
|
||||
return !!this.realClientIP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the real client information (called after parsing PROXY protocol)
|
||||
*/
|
||||
setProxyInfo(ip: string, port: number): void {
|
||||
this.realClientIP = ip;
|
||||
this.realClientPort = port;
|
||||
}
|
||||
|
||||
// Pass-through all socket methods
|
||||
write(data: any, encoding?: any, callback?: any): boolean {
|
||||
return this.socket.write(data, encoding, callback);
|
||||
}
|
||||
|
||||
end(data?: any, encoding?: any, callback?: any): this {
|
||||
this.socket.end(data, encoding, callback);
|
||||
return this;
|
||||
}
|
||||
|
||||
destroy(error?: Error): this {
|
||||
this.socket.destroy(error);
|
||||
return this;
|
||||
}
|
||||
|
||||
pause(): this {
|
||||
this.socket.pause();
|
||||
return this;
|
||||
}
|
||||
|
||||
resume(): this {
|
||||
this.socket.resume();
|
||||
return this;
|
||||
}
|
||||
|
||||
setTimeout(timeout: number, callback?: () => void): this {
|
||||
this.socket.setTimeout(timeout, callback);
|
||||
return this;
|
||||
}
|
||||
|
||||
setNoDelay(noDelay?: boolean): this {
|
||||
this.socket.setNoDelay(noDelay);
|
||||
return this;
|
||||
}
|
||||
|
||||
setKeepAlive(enable?: boolean, initialDelay?: number): this {
|
||||
this.socket.setKeepAlive(enable, initialDelay);
|
||||
return this;
|
||||
}
|
||||
|
||||
ref(): this {
|
||||
this.socket.ref();
|
||||
return this;
|
||||
}
|
||||
|
||||
unref(): this {
|
||||
this.socket.unref();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pipe to another stream
|
||||
*/
|
||||
pipe<T extends NodeJS.WritableStream>(destination: T, options?: {
|
||||
end?: boolean;
|
||||
}): T {
|
||||
return this.socket.pipe(destination, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cork the stream
|
||||
*/
|
||||
cork(): void {
|
||||
if ('cork' in this.socket && typeof this.socket.cork === 'function') {
|
||||
this.socket.cork();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Uncork the stream
|
||||
*/
|
||||
uncork(): void {
|
||||
if ('uncork' in this.socket && typeof this.socket.uncork === 'function') {
|
||||
this.socket.uncork();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of bytes read
|
||||
*/
|
||||
get bytesRead(): number {
|
||||
return this.socket.bytesRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of bytes written
|
||||
*/
|
||||
get bytesWritten(): number {
|
||||
return this.socket.bytesWritten;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the socket is connecting
|
||||
*/
|
||||
get connecting(): boolean {
|
||||
return this.socket.connecting;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the socket is destroyed
|
||||
*/
|
||||
get destroyed(): boolean {
|
||||
return this.socket.destroyed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the socket is readable
|
||||
*/
|
||||
get readable(): boolean {
|
||||
return this.socket.readable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the socket is writable
|
||||
*/
|
||||
get writable(): boolean {
|
||||
return this.socket.writable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pending status
|
||||
*/
|
||||
get pending(): boolean {
|
||||
return this.socket.pending;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get ready state
|
||||
*/
|
||||
get readyState(): string {
|
||||
return this.socket.readyState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Address info
|
||||
*/
|
||||
address(): plugins.net.AddressInfo | {} | null {
|
||||
const addr = this.socket.address();
|
||||
if (addr === null) return null;
|
||||
if (typeof addr === 'string') return addr as any;
|
||||
return addr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set socket encoding
|
||||
*/
|
||||
setEncoding(encoding?: BufferEncoding): this {
|
||||
this.socket.setEncoding(encoding);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect method (for client sockets)
|
||||
*/
|
||||
connect(options: plugins.net.SocketConnectOpts, connectionListener?: () => void): this;
|
||||
connect(port: number, host?: string, connectionListener?: () => void): this;
|
||||
connect(path: string, connectionListener?: () => void): this;
|
||||
connect(...args: any[]): this {
|
||||
(this.socket as any).connect(...args);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Forward all events from the underlying socket
|
||||
*/
|
||||
private forwardSocketEvents(): void {
|
||||
const events = ['data', 'end', 'close', 'error', 'drain', 'timeout', 'connect', 'ready', 'lookup'];
|
||||
events.forEach(event => {
|
||||
this.socket.on(event, (...args) => {
|
||||
this.emit(event, ...args);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ import { TimeoutManager } from './timeout-manager.js';
|
||||
import { logger } from '../../core/utils/logger.js';
|
||||
import { LifecycleComponent } from '../../core/utils/lifecycle-component.js';
|
||||
import { cleanupSocket } from '../../core/utils/socket-utils.js';
|
||||
import { WrappedSocket } from '../../core/models/wrapped-socket.js';
|
||||
|
||||
/**
|
||||
* Manages connection lifecycle, tracking, and cleanup with performance optimizations
|
||||
@ -53,8 +54,9 @@ export class ConnectionManager extends LifecycleComponent {
|
||||
|
||||
/**
|
||||
* Create and track a new connection
|
||||
* Accepts either a regular net.Socket or a WrappedSocket for transparent PROXY protocol support
|
||||
*/
|
||||
public createConnection(socket: plugins.net.Socket): IConnectionRecord | null {
|
||||
public createConnection(socket: plugins.net.Socket | WrappedSocket): IConnectionRecord | null {
|
||||
// Enforce connection limit
|
||||
if (this.connectionRecords.size >= this.maxConnections) {
|
||||
logger.log('warn', `Connection limit reached (${this.maxConnections}). Rejecting new connection.`, {
|
||||
@ -282,22 +284,26 @@ export class ConnectionManager extends LifecycleComponent {
|
||||
const cleanupPromises: Promise<void>[] = [];
|
||||
|
||||
if (record.incoming) {
|
||||
// Extract underlying socket if it's a WrappedSocket
|
||||
const incomingSocket = record.incoming instanceof WrappedSocket ? record.incoming.socket : record.incoming;
|
||||
if (!record.incoming.writable || record.incoming.destroyed) {
|
||||
// Socket is not active, clean up immediately
|
||||
cleanupPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming`, { immediate: true }));
|
||||
cleanupPromises.push(cleanupSocket(incomingSocket, `${record.id}-incoming`, { immediate: true }));
|
||||
} else {
|
||||
// Socket is still active, allow graceful cleanup
|
||||
cleanupPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming`, { allowDrain: true, gracePeriod: 5000 }));
|
||||
cleanupPromises.push(cleanupSocket(incomingSocket, `${record.id}-incoming`, { allowDrain: true, gracePeriod: 5000 }));
|
||||
}
|
||||
}
|
||||
|
||||
if (record.outgoing) {
|
||||
// Extract underlying socket if it's a WrappedSocket
|
||||
const outgoingSocket = record.outgoing instanceof WrappedSocket ? record.outgoing.socket : record.outgoing;
|
||||
if (!record.outgoing.writable || record.outgoing.destroyed) {
|
||||
// Socket is not active, clean up immediately
|
||||
cleanupPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing`, { immediate: true }));
|
||||
cleanupPromises.push(cleanupSocket(outgoingSocket, `${record.id}-outgoing`, { immediate: true }));
|
||||
} else {
|
||||
// Socket is still active, allow graceful cleanup
|
||||
cleanupPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing`, { allowDrain: true, gracePeriod: 5000 }));
|
||||
cleanupPromises.push(cleanupSocket(outgoingSocket, `${record.id}-outgoing`, { allowDrain: true, gracePeriod: 5000 }));
|
||||
}
|
||||
}
|
||||
|
||||
@ -570,11 +576,13 @@ export class ConnectionManager extends LifecycleComponent {
|
||||
const shutdownPromises: Promise<void>[] = [];
|
||||
|
||||
if (record.incoming) {
|
||||
shutdownPromises.push(cleanupSocket(record.incoming, `${record.id}-incoming-shutdown`, { immediate: true }));
|
||||
const incomingSocket = record.incoming instanceof WrappedSocket ? record.incoming.socket : record.incoming;
|
||||
shutdownPromises.push(cleanupSocket(incomingSocket, `${record.id}-incoming-shutdown`, { immediate: true }));
|
||||
}
|
||||
|
||||
if (record.outgoing) {
|
||||
shutdownPromises.push(cleanupSocket(record.outgoing, `${record.id}-outgoing-shutdown`, { immediate: true }));
|
||||
const outgoingSocket = record.outgoing instanceof WrappedSocket ? record.outgoing.socket : record.outgoing;
|
||||
shutdownPromises.push(cleanupSocket(outgoingSocket, `${record.id}-outgoing-shutdown`, { immediate: true }));
|
||||
}
|
||||
|
||||
// Don't wait for shutdown cleanup in this batch processing
|
||||
|
@ -1,4 +1,5 @@
|
||||
import * as plugins from '../../../plugins.js';
|
||||
import type { WrappedSocket } from '../../../core/models/wrapped-socket.js';
|
||||
// Certificate types removed - define IAcmeOptions locally
|
||||
export interface IAcmeOptions {
|
||||
enabled?: boolean;
|
||||
@ -34,6 +35,11 @@ export interface ISmartProxyOptions {
|
||||
// Port configuration
|
||||
preserveSourceIP?: boolean; // Preserve client IP when forwarding
|
||||
|
||||
// PROXY protocol configuration
|
||||
proxyIPs?: string[]; // List of trusted proxy IPs that can send PROXY protocol
|
||||
acceptProxyProtocol?: boolean; // Global option to accept PROXY protocol (defaults based on proxyIPs)
|
||||
sendProxyProtocol?: boolean; // Global option to send PROXY protocol to all targets
|
||||
|
||||
// Global/default settings
|
||||
defaults?: {
|
||||
target?: {
|
||||
@ -128,8 +134,8 @@ export interface ISmartProxyOptions {
|
||||
*/
|
||||
export interface IConnectionRecord {
|
||||
id: string; // Unique connection identifier
|
||||
incoming: plugins.net.Socket;
|
||||
outgoing: plugins.net.Socket | null;
|
||||
incoming: plugins.net.Socket | WrappedSocket;
|
||||
outgoing: plugins.net.Socket | WrappedSocket | null;
|
||||
incomingStartTime: number;
|
||||
outgoingStartTime?: number;
|
||||
outgoingClosedTime?: number;
|
||||
|
@ -11,6 +11,7 @@ import { HttpProxyBridge } from './http-proxy-bridge.js';
|
||||
import { TimeoutManager } from './timeout-manager.js';
|
||||
import { SharedRouteManager as RouteManager } from '../../core/routing/route-manager.js';
|
||||
import { cleanupSocket, createIndependentSocketHandlers, setupSocketHandlers, createSocketWithErrorHandler, setupBidirectionalForwarding } from '../../core/utils/socket-utils.js';
|
||||
import { WrappedSocket } from '../../core/models/wrapped-socket.js';
|
||||
|
||||
/**
|
||||
* Handles new connection processing and setup logic with support for route-based configuration
|
||||
@ -81,39 +82,52 @@ export class RouteConnectionHandler {
|
||||
const remoteIP = socket.remoteAddress || '';
|
||||
const localPort = socket.localPort || 0;
|
||||
|
||||
// Always wrap the socket to prepare for potential PROXY protocol
|
||||
const wrappedSocket = new WrappedSocket(socket);
|
||||
|
||||
// If this is from a trusted proxy, log it
|
||||
if (this.settings.proxyIPs?.includes(remoteIP)) {
|
||||
logger.log('debug', `Connection from trusted proxy ${remoteIP}, PROXY protocol parsing will be enabled`, {
|
||||
remoteIP,
|
||||
component: 'route-handler'
|
||||
});
|
||||
}
|
||||
|
||||
// Validate IP against rate limits and connection limits
|
||||
const ipValidation = this.securityManager.validateIP(remoteIP);
|
||||
// Note: For wrapped sockets, this will use the underlying socket IP until PROXY protocol is parsed
|
||||
const ipValidation = this.securityManager.validateIP(wrappedSocket.remoteAddress || '');
|
||||
if (!ipValidation.allowed) {
|
||||
logger.log('warn', `Connection rejected`, { remoteIP, reason: ipValidation.reason, component: 'route-handler' });
|
||||
cleanupSocket(socket, `rejected-${ipValidation.reason}`, { immediate: true });
|
||||
logger.log('warn', `Connection rejected`, { remoteIP: wrappedSocket.remoteAddress, reason: ipValidation.reason, component: 'route-handler' });
|
||||
cleanupSocket(wrappedSocket.socket, `rejected-${ipValidation.reason}`, { immediate: true });
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a new connection record
|
||||
const record = this.connectionManager.createConnection(socket);
|
||||
// Create a new connection record with the wrapped socket
|
||||
const record = this.connectionManager.createConnection(wrappedSocket);
|
||||
if (!record) {
|
||||
// Connection was rejected due to limit - socket already destroyed by connection manager
|
||||
return;
|
||||
}
|
||||
const connectionId = record.id;
|
||||
|
||||
// Apply socket optimizations
|
||||
socket.setNoDelay(this.settings.noDelay);
|
||||
// Apply socket optimizations (apply to underlying socket)
|
||||
const underlyingSocket = wrappedSocket.socket;
|
||||
underlyingSocket.setNoDelay(this.settings.noDelay);
|
||||
|
||||
// Apply keep-alive settings if enabled
|
||||
if (this.settings.keepAlive) {
|
||||
socket.setKeepAlive(true, this.settings.keepAliveInitialDelay);
|
||||
underlyingSocket.setKeepAlive(true, this.settings.keepAliveInitialDelay);
|
||||
record.hasKeepAlive = true;
|
||||
|
||||
// Apply enhanced TCP keep-alive options if enabled
|
||||
if (this.settings.enableKeepAliveProbes) {
|
||||
try {
|
||||
// These are platform-specific and may not be available
|
||||
if ('setKeepAliveProbes' in socket) {
|
||||
(socket as any).setKeepAliveProbes(10);
|
||||
if ('setKeepAliveProbes' in underlyingSocket) {
|
||||
(underlyingSocket as any).setKeepAliveProbes(10);
|
||||
}
|
||||
if ('setKeepAliveInterval' in socket) {
|
||||
(socket as any).setKeepAliveInterval(1000);
|
||||
if ('setKeepAliveInterval' in underlyingSocket) {
|
||||
(underlyingSocket as any).setKeepAliveInterval(1000);
|
||||
}
|
||||
} catch (err) {
|
||||
// Ignore errors - these are optional enhancements
|
||||
@ -151,13 +165,13 @@ export class RouteConnectionHandler {
|
||||
}
|
||||
|
||||
// Handle the connection - wait for initial data to determine if it's TLS
|
||||
this.handleInitialData(socket, record);
|
||||
this.handleInitialData(wrappedSocket, record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle initial data from a connection to determine routing
|
||||
*/
|
||||
private handleInitialData(socket: plugins.net.Socket, record: IConnectionRecord): void {
|
||||
private handleInitialData(socket: plugins.net.Socket | WrappedSocket, record: IConnectionRecord): void {
|
||||
const connectionId = record.id;
|
||||
const localPort = record.localPort;
|
||||
let initialDataReceived = false;
|
||||
@ -177,9 +191,11 @@ export class RouteConnectionHandler {
|
||||
|
||||
// If no routes require TLS handling and it's not port 443, route immediately
|
||||
if (!needsTlsHandling && localPort !== 443) {
|
||||
// Extract underlying socket for socket-utils functions
|
||||
const underlyingSocket = socket instanceof WrappedSocket ? socket.socket : socket;
|
||||
// Set up proper socket handlers for immediate routing
|
||||
setupSocketHandlers(
|
||||
socket,
|
||||
underlyingSocket,
|
||||
(reason) => {
|
||||
// Only cleanup if connection hasn't been fully established
|
||||
// Check if outgoing connection exists and is connected
|
||||
@ -206,7 +222,7 @@ export class RouteConnectionHandler {
|
||||
);
|
||||
|
||||
// Route immediately for non-TLS connections
|
||||
this.routeConnection(socket, record, '', undefined);
|
||||
this.routeConnection(underlyingSocket, record, '', undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -363,7 +379,8 @@ export class RouteConnectionHandler {
|
||||
}
|
||||
|
||||
// Find the appropriate route for this connection
|
||||
this.routeConnection(socket, record, serverName, chunk);
|
||||
const underlyingSocket = socket instanceof WrappedSocket ? socket.socket : socket;
|
||||
this.routeConnection(underlyingSocket, record, serverName, chunk);
|
||||
});
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user