feat(smart-proxy): add socket-handler relay, fast-path port-only forwarding, metrics and bridge improvements, and various TS/Rust integration fixes

This commit is contained in:
2026-02-09 16:25:33 +00:00
parent 41efdb47f8
commit f7605e042e
17 changed files with 724 additions and 300 deletions

View File

@@ -6,7 +6,11 @@ import type { RustProxyBridge } from './rust-proxy-bridge.js';
*
* Polls the Rust binary periodically via the bridge and caches the result.
* All IMetrics getters read from the cache synchronously.
* Fields not yet in Rust (percentiles, per-IP, history) return zero/empty.
*
* Rust Metrics JSON fields (camelCase via serde):
* activeConnections, totalConnections, bytesIn, bytesOut,
* throughputInBytesPerSec, throughputOutBytesPerSec,
* routes: { [routeName]: { activeConnections, totalConnections, bytesIn, bytesOut, ... } }
*/
export class RustMetricsAdapter implements IMetrics {
private bridge: RustProxyBridge;
@@ -14,30 +18,28 @@ export class RustMetricsAdapter implements IMetrics {
private pollTimer: ReturnType<typeof setInterval> | null = null;
private pollIntervalMs: number;
// Cumulative totals tracked across polls
private cumulativeBytesIn = 0;
private cumulativeBytesOut = 0;
private cumulativeConnections = 0;
constructor(bridge: RustProxyBridge, pollIntervalMs = 1000) {
this.bridge = bridge;
this.pollIntervalMs = pollIntervalMs;
}
/**
* Poll Rust for metrics once. Can be awaited to ensure cache is fresh.
*/
public async poll(): Promise<void> {
try {
this.cache = await this.bridge.getMetrics();
} catch {
// Ignore poll errors (bridge may be shutting down)
}
}
public startPolling(): void {
if (this.pollTimer) return;
this.pollTimer = setInterval(async () => {
try {
this.cache = await this.bridge.getMetrics();
// Update cumulative totals
if (this.cache) {
this.cumulativeBytesIn = this.cache.totalBytesIn ?? this.cache.total_bytes_in ?? 0;
this.cumulativeBytesOut = this.cache.totalBytesOut ?? this.cache.total_bytes_out ?? 0;
this.cumulativeConnections = this.cache.totalConnections ?? this.cache.total_connections ?? 0;
}
} catch {
// Ignore poll errors (bridge may be shutting down)
}
// Immediate first poll so cache is populated ASAP
this.poll();
this.pollTimer = setInterval(() => {
this.poll();
}, this.pollIntervalMs);
if (this.pollTimer.unref) {
this.pollTimer.unref();
@@ -55,25 +57,36 @@ export class RustMetricsAdapter implements IMetrics {
public connections = {
active: (): number => {
return this.cache?.activeConnections ?? this.cache?.active_connections ?? 0;
return this.cache?.activeConnections ?? 0;
},
total: (): number => {
return this.cumulativeConnections;
return this.cache?.totalConnections ?? 0;
},
byRoute: (): Map<string, number> => {
return new Map();
const result = new Map<string, number>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
result.set(name, (rm as any).activeConnections ?? 0);
}
}
return result;
},
byIP: (): Map<string, number> => {
// Per-IP tracking not yet available from Rust
return new Map();
},
topIPs: (_limit?: number): Array<{ ip: string; count: number }> => {
// Per-IP tracking not yet available from Rust
return [];
},
};
public throughput = {
instant: (): IThroughputData => {
return { in: this.cache?.bytesInPerSecond ?? 0, out: this.cache?.bytesOutPerSecond ?? 0 };
return {
in: this.cache?.throughputInBytesPerSec ?? 0,
out: this.cache?.throughputOutBytesPerSec ?? 0,
};
},
recent: (): IThroughputData => {
return this.throughput.instant();
@@ -85,10 +98,20 @@ export class RustMetricsAdapter implements IMetrics {
return this.throughput.instant();
},
history: (_seconds: number): Array<IThroughputHistoryPoint> => {
// Throughput history not yet available from Rust
return [];
},
byRoute: (_windowSeconds?: number): Map<string, IThroughputData> => {
return new Map();
const result = new Map<string, IThroughputData>();
if (this.cache?.routes) {
for (const [name, rm] of Object.entries(this.cache.routes)) {
result.set(name, {
in: (rm as any).throughputInBytesPerSec ?? 0,
out: (rm as any).throughputOutBytesPerSec ?? 0,
});
}
}
return result;
},
byIP: (_windowSeconds?: number): Map<string, IThroughputData> => {
return new Map();
@@ -97,25 +120,27 @@ export class RustMetricsAdapter implements IMetrics {
public requests = {
perSecond: (): number => {
return this.cache?.requestsPerSecond ?? 0;
// Rust tracks connections, not HTTP requests (TCP-level proxy)
return 0;
},
perMinute: (): number => {
return (this.cache?.requestsPerSecond ?? 0) * 60;
return 0;
},
total: (): number => {
return this.cache?.totalRequests ?? this.cache?.total_requests ?? 0;
// Use total connections as a proxy for total requests
return this.cache?.totalConnections ?? 0;
},
};
public totals = {
bytesIn: (): number => {
return this.cumulativeBytesIn;
return this.cache?.bytesIn ?? 0;
},
bytesOut: (): number => {
return this.cumulativeBytesOut;
return this.cache?.bytesOut ?? 0;
},
connections: (): number => {
return this.cumulativeConnections;
return this.cache?.totalConnections ?? 0;
},
};

View File

@@ -68,12 +68,13 @@ export class RustProxyBridge extends plugins.EventEmitter {
});
// Handle stderr (logging from Rust goes here)
this.process.stderr?.on('data', (data: Buffer) => {
const stderrHandler = (data: Buffer) => {
const lines = data.toString().split('\n').filter(l => l.trim());
for (const line of lines) {
logger.log('debug', `[rustproxy] ${line}`, { component: 'rust-bridge' });
}
});
};
this.process.stderr?.on('data', stderrHandler);
// Handle stdout (JSON IPC)
this.readline = createInterface({ input: this.process.stdout! });
@@ -204,16 +205,47 @@ export class RustProxyBridge extends plugins.EventEmitter {
}
/**
* Kill the Rust process.
* Kill the Rust process and clean up all stdio streams.
*/
public kill(): void {
if (this.process) {
this.process.kill('SIGTERM');
const proc = this.process;
this.process = null;
this.isRunning = false;
// Close readline (reads from stdout)
if (this.readline) {
this.readline.close();
this.readline = null;
}
// Reject pending requests
for (const [, pending] of this.pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error('RustProxy process killed'));
}
this.pendingRequests.clear();
// Remove all listeners so nothing keeps references
proc.removeAllListeners();
proc.stdout?.removeAllListeners();
proc.stderr?.removeAllListeners();
proc.stdin?.removeAllListeners();
// Kill the process
try { proc.kill('SIGTERM'); } catch { /* already dead */ }
// Destroy all stdio pipes to free handles
try { proc.stdin?.destroy(); } catch { /* ignore */ }
try { proc.stdout?.destroy(); } catch { /* ignore */ }
try { proc.stderr?.destroy(); } catch { /* ignore */ }
// Unref process so Node doesn't wait for it
try { proc.unref(); } catch { /* ignore */ }
// Force kill after 5 seconds
setTimeout(() => {
if (this.process) {
this.process.kill('SIGKILL');
}
try { proc.kill('SIGKILL'); } catch { /* already dead */ }
}, 5000).unref();
}
}

View File

@@ -37,6 +37,7 @@ export class SmartProxy extends plugins.EventEmitter {
private socketHandlerServer: SocketHandlerServer | null = null;
private metricsAdapter: RustMetricsAdapter;
private routeUpdateLock: Mutex;
private stopping = false;
constructor(settingsArg: ISmartProxyOptions) {
super();
@@ -102,7 +103,10 @@ export class SmartProxy extends plugins.EventEmitter {
this.bridge = new RustProxyBridge();
this.preprocessor = new RoutePreprocessor();
this.metricsAdapter = new RustMetricsAdapter(this.bridge);
this.metricsAdapter = new RustMetricsAdapter(
this.bridge,
this.settings.metrics?.sampleIntervalMs ?? 1000
);
this.routeUpdateLock = new Mutex();
}
@@ -120,23 +124,24 @@ export class SmartProxy extends plugins.EventEmitter {
);
}
// Handle unexpected exit
// Handle unexpected exit (only emits error if not intentionally stopping)
this.bridge.on('exit', (code: number | null, signal: string | null) => {
if (this.stopping) return;
logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });
this.emit('error', new Error(`RustProxy exited (code=${code}, signal=${signal})`));
});
// Start socket handler relay if any routes need TS-side handling
// Check if any routes need TS-side handling (socket handlers, dynamic functions)
const hasHandlerRoutes = this.settings.routes.some(
(r) =>
(r.action.type === 'socket-handler' && r.action.socketHandler) ||
r.action.targets?.some((t) => typeof t.host === 'function' || typeof t.port === 'function')
);
// Start socket handler relay server (but don't tell Rust yet - proxy not started)
if (hasHandlerRoutes) {
this.socketHandlerServer = new SocketHandlerServer(this.preprocessor);
await this.socketHandlerServer.start();
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
}
// Preprocess routes (strip JS functions, convert socket-handler routes)
@@ -148,6 +153,11 @@ export class SmartProxy extends plugins.EventEmitter {
// Start the Rust proxy
await this.bridge.startProxy(config);
// Now that Rust proxy is running, configure socket handler relay
if (this.socketHandlerServer) {
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
}
// Handle certProvisionFunction
await this.provisionCertificatesViaCallback();
@@ -162,10 +172,14 @@ export class SmartProxy extends plugins.EventEmitter {
*/
public async stop(): Promise<void> {
logger.log('info', 'SmartProxy shutting down...', { component: 'smart-proxy' });
this.stopping = true;
// Stop metrics polling
this.metricsAdapter.stopPolling();
// Remove exit listener before killing to avoid spurious error events
this.bridge.removeAllListeners('exit');
// Stop Rust proxy
try {
await this.bridge.stopProxy();
@@ -283,6 +297,7 @@ export class SmartProxy extends plugins.EventEmitter {
* Get all currently listening ports (async - calls Rust).
*/
public async getListeningPorts(): Promise<number[]> {
if (!this.bridge.running) return [];
return this.bridge.getListeningPorts();
}

View File

@@ -15,6 +15,7 @@ export class SocketHandlerServer {
private server: plugins.net.Server | null = null;
private socketPath: string;
private preprocessor: RoutePreprocessor;
private activeSockets = new Set<plugins.net.Socket>();
constructor(preprocessor: RoutePreprocessor) {
this.preprocessor = preprocessor;
@@ -41,6 +42,8 @@ export class SocketHandlerServer {
return new Promise<void>((resolve, reject) => {
this.server = plugins.net.createServer((socket) => {
this.activeSockets.add(socket);
socket.on('close', () => this.activeSockets.delete(socket));
this.handleConnection(socket);
});
@@ -61,6 +64,12 @@ export class SocketHandlerServer {
* Stop the server and clean up.
*/
public async stop(): Promise<void> {
// Destroy all active connections first
for (const socket of this.activeSockets) {
socket.destroy();
}
this.activeSockets.clear();
if (this.server) {
return new Promise<void>((resolve) => {
this.server!.close(() => {
@@ -100,6 +109,7 @@ export class SocketHandlerServer {
metadataParsed = true;
socket.removeListener('data', onData);
socket.pause(); // Prevent data loss between handler removal and pipe setup
const metadataJson = metadataBuffer.slice(0, newlineIndex);
const remainingData = metadataBuffer.slice(newlineIndex + 1);
@@ -140,13 +150,6 @@ export class SocketHandlerServer {
return;
}
const handler = originalRoute.action.socketHandler;
if (!handler) {
logger.log('error', `Route ${routeKey} has no socketHandler`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
// Build route context
const context: IRouteContext = {
port: metadata.localPort || 0,
@@ -167,12 +170,110 @@ export class SocketHandlerServer {
socket.unshift(Buffer.from(remainingData, 'utf8'));
}
// Call the handler
try {
handler(socket, context);
} catch (err: any) {
logger.log('error', `Socket handler threw for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
const handler = originalRoute.action.socketHandler;
if (handler) {
// Route has an explicit socket handler callback
try {
const result = handler(socket, context);
// If the handler is async, wait for it to finish setup before resuming.
// This prevents data loss when async handlers need to do work before
// attaching their `data` listeners.
if (result && typeof (result as any).then === 'function') {
(result as any).then(() => {
socket.resume();
}).catch((err: any) => {
logger.log('error', `Async socket handler rejected for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
});
} else {
// Synchronous handler — listeners are already attached, safe to resume.
socket.resume();
}
} catch (err: any) {
logger.log('error', `Socket handler threw for route ${routeKey}: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
}
return;
}
// Route has dynamic host/port functions - resolve and forward
if (originalRoute.action.targets && originalRoute.action.targets.length > 0) {
this.forwardDynamicRoute(socket, originalRoute, context);
return;
}
logger.log('error', `Route ${routeKey} has no socketHandler and no targets`, { component: 'socket-handler-server' });
socket.destroy();
}
/**
* Forward a connection to a dynamically resolved target.
* Used for routes with function-based host/port that Rust cannot handle.
*/
private forwardDynamicRoute(socket: plugins.net.Socket, route: IRouteConfig, context: IRouteContext): void {
const targets = route.action.targets!;
// Pick a target (round-robin would be ideal, but simple random for now)
const target = targets[Math.floor(Math.random() * targets.length)];
// Resolve host
let host: string;
if (typeof target.host === 'function') {
try {
const result = target.host(context);
host = Array.isArray(result) ? result[Math.floor(Math.random() * result.length)] : result;
} catch (err: any) {
logger.log('error', `Dynamic host function failed: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
} else if (typeof target.host === 'string') {
host = target.host;
} else if (Array.isArray(target.host)) {
host = target.host[Math.floor(Math.random() * target.host.length)];
} else {
host = 'localhost';
}
// Resolve port
let port: number;
if (typeof target.port === 'function') {
try {
port = target.port(context);
} catch (err: any) {
logger.log('error', `Dynamic port function failed: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
return;
}
} else if (typeof target.port === 'number') {
port = target.port;
} else {
port = context.port;
}
logger.log('debug', `Dynamic forward: ${context.clientIp} -> ${host}:${port}`, { component: 'socket-handler-server' });
// Connect to the resolved target
const backend = plugins.net.connect(port, host, () => {
// Pipe bidirectionally
socket.pipe(backend);
backend.pipe(socket);
});
backend.on('error', (err) => {
logger.log('error', `Dynamic forward backend error: ${err.message}`, { component: 'socket-handler-server' });
socket.destroy();
});
socket.on('error', () => {
backend.destroy();
});
socket.on('close', () => {
backend.destroy();
});
backend.on('close', () => {
socket.destroy();
});
}
}