// File: smartproxy/ts/proxies/smart-proxy/socket-handler-server.ts (TypeScript, 280 lines, 9.2 KiB)

import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
import type { IRouteConfig, IRouteContext } from './models/route-types.js';
import type { RoutePreprocessor } from './route-preprocessor.js';
/**
* Unix domain socket server that receives relayed connections from the Rust proxy.
*
* When Rust encounters a route of type `socket-handler`, it connects to this
* Unix socket, sends a JSON metadata line, then proxies the raw TCP bytes.
* This server reads the metadata, finds the original JS handler, builds an
* IRouteContext, and hands the socket to the handler.
*/
export class SocketHandlerServer {
  /**
   * Upper bound, in bytes, on metadata buffered while no newline terminator
   * has been seen yet. Connections exceeding it are dropped.
   */
  private static readonly MAX_PENDING_METADATA_BYTES = 8192;

  private server: plugins.net.Server | null = null;
  private socketPath: string;
  private preprocessor: RoutePreprocessor;
  /** Relay sockets currently open, tracked so `stop()` can destroy them. */
  private activeSockets = new Set<plugins.net.Socket>();

  /**
   * @param preprocessor Used to look up the original (JS-side) route config
   *   for the `routeKey` sent in the relay metadata.
   */
  constructor(preprocessor: RoutePreprocessor) {
    this.preprocessor = preprocessor;
    // The pid keeps the path distinct between concurrently running processes.
    this.socketPath = `/tmp/smartproxy-relay-${process.pid}.sock`;
  }

  /**
   * The Unix socket path this server listens on.
   */
  public getSocketPath(): string {
    return this.socketPath;
  }

  /**
   * Start listening for relayed connections.
   *
   * Resolves once the server is listening. Rejects if the server emits an
   * `error` before that point; errors emitted afterwards are only logged.
   */
  public async start(): Promise<void> {
    // Best-effort removal of a stale socket file left by a previous run.
    try {
      await plugins.fs.promises.unlink(this.socketPath);
    } catch {
      // Ignore: most commonly the file simply does not exist.
    }

    await new Promise<void>((resolve, reject) => {
      const server = plugins.net.createServer((socket) => {
        this.activeSockets.add(socket);
        socket.on('close', () => this.activeSockets.delete(socket));
        this.handleConnection(socket);
      });
      this.server = server;

      server.on('error', (err) => {
        this.log('error', `SocketHandlerServer error: ${err.message}`);
        // No-op once the promise has already been resolved by `listen`.
        reject(err);
      });

      server.listen(this.socketPath, () => {
        this.log('info', `SocketHandlerServer listening on ${this.socketPath}`);
        resolve();
      });
    });
  }

  /**
   * Stop the server and clean up.
   *
   * Destroys every tracked relay socket, closes the server, then removes the
   * socket file (unlink errors are ignored). Resolves immediately when the
   * server was never created.
   */
  public async stop(): Promise<void> {
    // Destroy active connections first so `server.close` is not held open by them.
    for (const socket of this.activeSockets) {
      socket.destroy();
    }
    this.activeSockets.clear();

    const server = this.server;
    if (!server) {
      return;
    }

    await new Promise<void>((resolve) => {
      server.close(() => {
        this.server = null;
        // Clean up socket file; failure to unlink is not treated as an error.
        plugins.fs.unlink(this.socketPath, () => resolve());
      });
    });
  }

  /**
   * Handle an incoming relayed connection.
   *
   * Protocol: the peer sends a single JSON line with metadata, then raw bytes follow.
   * JSON format: { "routeKey": "my-route", "remoteIP": "1.2.3.4", "remotePort": 12345,
   * "localPort": 443, "isTLS": true, "domain": "example.com" }
   *
   * Framing is done on raw bytes: only the bytes before the first `\n` are
   * decoded as UTF-8. Everything after it is passed on untouched, because the
   * relayed payload is arbitrary binary data and a decode/re-encode round trip
   * would replace invalid UTF-8 sequences.
   */
  private handleConnection(socket: plugins.net.Socket): void {
    const NEWLINE = 0x0a;
    const pendingChunks: Buffer[] = [];
    let pendingBytes = 0;
    let settled = false;

    const onData = (chunk: Buffer): void => {
      if (settled) return;

      // Earlier chunks were already scanned, so only the new one can hold the terminator.
      const newlineIndex = chunk.indexOf(NEWLINE);
      if (newlineIndex === -1) {
        // Haven't received the full metadata line yet.
        pendingChunks.push(chunk);
        pendingBytes += chunk.length;
        if (pendingBytes > SocketHandlerServer.MAX_PENDING_METADATA_BYTES) {
          settled = true;
          socket.removeListener('data', onData);
          this.log('error', 'Socket handler metadata too large, closing');
          socket.destroy();
        }
        return;
      }

      settled = true;
      socket.removeListener('data', onData);
      socket.pause(); // Prevent data loss between handler removal and pipe setup

      pendingChunks.push(chunk.subarray(0, newlineIndex));
      // Decoding after concatenation keeps multi-byte characters that were
      // split across chunk boundaries intact.
      const metadataJson = Buffer.concat(pendingChunks).toString('utf8');
      const remainingData = chunk.subarray(newlineIndex + 1);

      let metadata: unknown;
      try {
        metadata = JSON.parse(metadataJson);
      } catch {
        this.log('error', `Invalid socket handler metadata JSON: ${metadataJson.slice(0, 200)}`);
        socket.destroy();
        return;
      }

      this.dispatchToHandler(socket, metadata, remainingData);
    };

    socket.on('data', onData);
    socket.on('error', (err) => {
      this.log('error', `Socket handler relay error: ${err.message}`);
    });
  }

  /**
   * Dispatch a relayed connection to the appropriate JS handler.
   *
   * @param socket The relay socket; expected to be paused by the caller.
   * @param metadata Parsed metadata line. Validated here: anything that is not
   *   a JSON object with a non-empty string `routeKey` closes the socket.
   * @param remainingData Payload bytes received after the metadata line. They
   *   are pushed back onto the socket so the handler sees them first. A string
   *   is accepted for backward compatibility and encoded as UTF-8.
   */
  private dispatchToHandler(
    socket: plugins.net.Socket,
    metadata: any,
    remainingData: Buffer | string,
  ): void {
    // `JSON.parse` may legitimately yield null or a primitive; reading a
    // property from those would throw inside the socket's 'data' listener.
    if (metadata === null || typeof metadata !== 'object') {
      this.log('error', 'Socket handler relay metadata is not a JSON object');
      socket.destroy();
      return;
    }

    const routeKey: unknown = metadata.routeKey;
    if (typeof routeKey !== 'string' || routeKey.length === 0) {
      this.log('error', 'Socket handler relay missing routeKey');
      socket.destroy();
      return;
    }

    const originalRoute = this.preprocessor.getOriginalRoute(routeKey);
    if (!originalRoute) {
      this.log('error', `No handler found for route: ${routeKey}`);
      socket.destroy();
      return;
    }

    // Build route context
    const context: IRouteContext = {
      port: metadata.localPort || 0,
      domain: metadata.domain,
      clientIp: metadata.remoteIP || 'unknown',
      serverIp: '0.0.0.0',
      path: metadata.path,
      isTls: metadata.isTLS || false,
      tlsVersion: metadata.tlsVersion,
      routeName: originalRoute.name,
      routeId: originalRoute.id,
      timestamp: Date.now(),
      connectionId: metadata.connectionId || `relay-${Date.now()}`,
    };

    // If there was remaining data after the metadata line, push it back
    if (remainingData.length > 0) {
      const payload =
        typeof remainingData === 'string' ? Buffer.from(remainingData, 'utf8') : remainingData;
      socket.unshift(payload);
    }

    const handler = originalRoute.action.socketHandler;
    if (handler) {
      // Route has an explicit socket handler callback
      try {
        const result: unknown = handler(socket, context);
        const isThenable =
          result !== null &&
          result !== undefined &&
          typeof (result as { then?: unknown }).then === 'function';

        if (isThenable) {
          // If the handler is async, wait for it to finish setup before resuming.
          // This prevents data loss when async handlers need to do work before
          // attaching their `data` listeners.
          Promise.resolve(result as PromiseLike<unknown>)
            .then(() => {
              socket.resume();
            })
            .catch((err: unknown) => {
              this.log(
                'error',
                `Async socket handler rejected for route ${routeKey}: ${SocketHandlerServer.describeError(err)}`,
              );
              socket.destroy();
            });
        } else {
          // Synchronous handler — listeners are already attached, safe to resume.
          socket.resume();
        }
      } catch (err: unknown) {
        this.log(
          'error',
          `Socket handler threw for route ${routeKey}: ${SocketHandlerServer.describeError(err)}`,
        );
        socket.destroy();
      }
      return;
    }

    // Route has dynamic host/port functions - resolve and forward
    if (originalRoute.action.targets && originalRoute.action.targets.length > 0) {
      this.forwardDynamicRoute(socket, originalRoute, context);
      return;
    }

    this.log('error', `Route ${routeKey} has no socketHandler and no targets`);
    socket.destroy();
  }

  /**
   * Forward a connection to a dynamically resolved target.
   *
   * Picks one of the route's targets at random, resolves its host and port
   * (calling them with `context` when they are functions), connects, and
   * pipes both directions. Any resolution or connection failure closes the
   * relayed socket.
   */
  private forwardDynamicRoute(
    socket: plugins.net.Socket,
    route: IRouteConfig,
    context: IRouteContext,
  ): void {
    // Pick a target (round-robin would be ideal, but simple random for now)
    const target = SocketHandlerServer.pickRandom(route.action.targets ?? []);
    if (!target) {
      this.log('error', 'Dynamic forward requested for a route without targets');
      socket.destroy();
      return;
    }

    // Resolve host
    let host: string;
    if (typeof target.host === 'function') {
      try {
        const resolved = target.host(context);
        // An empty array yields no candidate; fall back to localhost explicitly.
        host =
          (Array.isArray(resolved) ? SocketHandlerServer.pickRandom(resolved) : resolved) ??
          'localhost';
      } catch (err: unknown) {
        this.log('error', `Dynamic host function failed: ${SocketHandlerServer.describeError(err)}`);
        socket.destroy();
        return;
      }
    } else if (typeof target.host === 'string') {
      host = target.host;
    } else if (Array.isArray(target.host)) {
      host = SocketHandlerServer.pickRandom(target.host) ?? 'localhost';
    } else {
      host = 'localhost';
    }

    // Resolve port
    let port: number;
    if (typeof target.port === 'function') {
      try {
        port = target.port(context);
      } catch (err: unknown) {
        this.log('error', `Dynamic port function failed: ${SocketHandlerServer.describeError(err)}`);
        socket.destroy();
        return;
      }
    } else if (typeof target.port === 'number') {
      port = target.port;
    } else {
      port = context.port;
    }

    this.log('debug', `Dynamic forward: ${context.clientIp} -> ${host}:${port}`);

    // Connect to the resolved target. `net.connect` validates the port
    // synchronously, so a bad value from a dynamic port function must be
    // caught here rather than escaping from the socket's 'data' listener.
    let backend: plugins.net.Socket;
    try {
      backend = plugins.net.connect(port, host, () => {
        // Pipe bidirectionally
        socket.pipe(backend);
        backend.pipe(socket);
      });
    } catch (err: unknown) {
      this.log('error', `Dynamic forward connect failed: ${SocketHandlerServer.describeError(err)}`);
      socket.destroy();
      return;
    }

    backend.on('error', (err) => {
      this.log('error', `Dynamic forward backend error: ${err.message}`);
      socket.destroy();
    });

    // Tear down the other side whenever either side goes away.
    socket.on('error', () => {
      backend.destroy();
    });
    socket.on('close', () => {
      backend.destroy();
    });
    backend.on('close', () => {
      socket.destroy();
    });
  }

  /** Log through the shared logger, tagged with this component's name. */
  private log(level: 'error' | 'info' | 'debug', message: string): void {
    logger.log(level, message, { component: 'socket-handler-server' });
  }

  /** Produce a printable message from an arbitrary thrown or rejected value. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** Return a uniformly random element, or `undefined` for an empty list. */
  private static pickRandom<T>(items: readonly T[]): T | undefined {
    return items[Math.floor(Math.random() * items.length)];
  }
}