Files
smartproxy/ts/proxies/smart-proxy/smart-proxy.ts

618 lines
24 KiB
TypeScript

import * as plugins from '../../plugins.js';
import { logger } from '../../core/utils/logger.js';
// Rust bridge and helpers
import { RustProxyBridge } from './rust-proxy-bridge.js';
import { RoutePreprocessor } from './route-preprocessor.js';
import { SocketHandlerServer } from './socket-handler-server.js';
import { RustMetricsAdapter } from './rust-metrics-adapter.js';
// Route management
import { SharedRouteManager as RouteManager } from '../../core/routing/route-manager.js';
import { RouteValidator } from './utils/route-validator.js';
import { generateDefaultCertificate } from './utils/default-cert-generator.js';
import { Mutex } from './utils/mutex.js';
import { ConcurrencySemaphore } from './utils/concurrency-semaphore.js';
// Types
import type { ISmartProxyOptions, TSmartProxyCertProvisionObject, IAcmeOptions, ICertProvisionEventComms, ICertificateIssuedEvent, ICertificateFailedEvent } from './models/interfaces.js';
import type { IRouteConfig } from './models/route-types.js';
import type { IMetrics } from './models/metrics-types.js';
/**
* SmartProxy - Rust-backed proxy engine with TypeScript configuration API.
*
* All networking (TCP, TLS, HTTP reverse proxy, connection management, security,
* NFTables) is handled by the Rust binary. TypeScript is only:
* - The npm module interface (types, route helpers)
* - The thin IPC wrapper (this class)
* - Socket-handler callback relay (for JS-defined handlers)
* - Certificate provisioning callbacks (certProvisionFunction)
*/
export class SmartProxy extends plugins.EventEmitter {
public settings: ISmartProxyOptions;
public routeManager: RouteManager;
private bridge: RustProxyBridge;
private preprocessor: RoutePreprocessor;
private socketHandlerServer: SocketHandlerServer | null = null;
private metricsAdapter: RustMetricsAdapter;
private routeUpdateLock: Mutex;
private stopping = false;
private certProvisionPromise: Promise<void> | null = null;
constructor(settingsArg: ISmartProxyOptions) {
super();
// Apply defaults
this.settings = {
...settingsArg,
initialDataTimeout: settingsArg.initialDataTimeout || 120000,
socketTimeout: settingsArg.socketTimeout || 3600000,
maxConnectionLifetime: settingsArg.maxConnectionLifetime || 86400000,
inactivityTimeout: settingsArg.inactivityTimeout || 14400000,
gracefulShutdownTimeout: settingsArg.gracefulShutdownTimeout || 30000,
maxConnectionsPerIP: settingsArg.maxConnectionsPerIP || 100,
connectionRateLimitPerMinute: settingsArg.connectionRateLimitPerMinute || 300,
keepAliveTreatment: settingsArg.keepAliveTreatment || 'extended',
keepAliveInactivityMultiplier: settingsArg.keepAliveInactivityMultiplier || 6,
extendedKeepAliveLifetime: settingsArg.extendedKeepAliveLifetime || 7 * 24 * 60 * 60 * 1000,
};
// Normalize ACME options
if (this.settings.acme) {
if (this.settings.acme.accountEmail && !this.settings.acme.email) {
this.settings.acme.email = this.settings.acme.accountEmail;
}
this.settings.acme = {
enabled: this.settings.acme.enabled !== false,
port: this.settings.acme.port || 80,
email: this.settings.acme.email,
useProduction: this.settings.acme.useProduction || false,
renewThresholdDays: this.settings.acme.renewThresholdDays || 30,
autoRenew: this.settings.acme.autoRenew !== false,
skipConfiguredCerts: this.settings.acme.skipConfiguredCerts || false,
renewCheckIntervalHours: this.settings.acme.renewCheckIntervalHours || 24,
routeForwards: this.settings.acme.routeForwards || [],
...this.settings.acme,
};
}
// Validate routes
if (this.settings.routes?.length) {
const validation = RouteValidator.validateRoutes(this.settings.routes);
if (!validation.valid) {
RouteValidator.logValidationErrors(validation.errors);
throw new Error(`Initial route validation failed: ${validation.errors.size} route(s) have errors`);
}
}
// Create logger adapter
const loggerAdapter = {
debug: (message: string, data?: any) => logger.log('debug', message, data),
info: (message: string, data?: any) => logger.log('info', message, data),
warn: (message: string, data?: any) => logger.log('warn', message, data),
error: (message: string, data?: any) => logger.log('error', message, data),
};
// Initialize components
this.routeManager = new RouteManager({
logger: loggerAdapter,
enableDetailedLogging: this.settings.enableDetailedLogging,
routes: this.settings.routes,
});
this.bridge = new RustProxyBridge();
this.preprocessor = new RoutePreprocessor();
this.metricsAdapter = new RustMetricsAdapter(
this.bridge,
this.settings.metrics?.sampleIntervalMs ?? 1000
);
this.routeUpdateLock = new Mutex();
}
/**
* Start the proxy.
* Spawns the Rust binary, configures socket relay if needed, sends routes, handles cert provisioning.
*/
public async start(): Promise<void> {
// Spawn Rust binary
const spawned = await this.bridge.spawn();
if (!spawned) {
throw new Error(
'RustProxy binary not found. Set SMARTPROXY_RUST_BINARY env var, install the platform package, ' +
'or build locally with: pnpm build'
);
}
// Handle unexpected exit (only emits error if not intentionally stopping)
this.bridge.on('exit', (code: number | null, signal: string | null) => {
if (this.stopping) return;
logger.log('error', `RustProxy exited unexpectedly (code=${code}, signal=${signal})`, { component: 'smart-proxy' });
this.emit('error', new Error(`RustProxy exited (code=${code}, signal=${signal})`));
});
// Check if any routes need TS-side handling (socket handlers, dynamic functions)
const hasHandlerRoutes = this.settings.routes.some(
(r) =>
(r.action.type === 'socket-handler' && r.action.socketHandler) ||
r.action.targets?.some((t) => typeof t.host === 'function' || typeof t.port === 'function')
);
// Start socket handler relay server (but don't tell Rust yet - proxy not started)
if (hasHandlerRoutes) {
this.socketHandlerServer = new SocketHandlerServer(this.preprocessor);
await this.socketHandlerServer.start();
}
// Preprocess routes (strip JS functions, convert socket-handler routes)
const rustRoutes = this.preprocessor.preprocessForRust(this.settings.routes);
// When certProvisionFunction handles cert provisioning,
// disable Rust's built-in ACME to prevent race condition.
let acmeForRust = this.settings.acme;
if (this.settings.certProvisionFunction && acmeForRust?.enabled) {
acmeForRust = { ...acmeForRust, enabled: false };
logger.log('info', 'Rust ACME disabled — certProvisionFunction will handle certificate provisioning', { component: 'smart-proxy' });
}
// Build Rust config
const config = this.buildRustConfig(rustRoutes, acmeForRust);
// Start the Rust proxy
await this.bridge.startProxy(config);
// Now that Rust proxy is running, configure socket handler relay
if (this.socketHandlerServer) {
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
}
// Load default self-signed fallback certificate (domain: '*')
if (!this.settings.disableDefaultCert) {
try {
const defaultCert = generateDefaultCertificate();
await this.bridge.loadCertificate('*', defaultCert.cert, defaultCert.key);
logger.log('info', 'Default self-signed fallback certificate loaded', { component: 'smart-proxy' });
} catch (err: any) {
logger.log('warn', `Failed to generate default certificate: ${err.message}`, { component: 'smart-proxy' });
}
}
// Load consumer-stored certificates
const preloadedDomains = new Set<string>();
if (this.settings.certStore) {
try {
const stored = await this.settings.certStore.loadAll();
for (const entry of stored) {
await this.bridge.loadCertificate(entry.domain, entry.publicKey, entry.privateKey, entry.ca);
preloadedDomains.add(entry.domain);
}
logger.log('info', `Loaded ${stored.length} certificate(s) from consumer store`, { component: 'smart-proxy' });
} catch (err: any) {
logger.log('warn', `Failed to load certificates from consumer store: ${err.message}`, { component: 'smart-proxy' });
}
}
// Start metrics polling BEFORE cert provisioning — the Rust engine is already
// running and accepting connections, so metrics should be available immediately.
// Cert provisioning can hang indefinitely (e.g. DNS-01 ACME timeouts) and must
// not block metrics collection.
this.metricsAdapter.startPolling();
logger.log('info', 'SmartProxy started (Rust engine)', { component: 'smart-proxy' });
// Fire-and-forget cert provisioning — Rust engine is already running and serving traffic.
// Events (certificate-issued / certificate-failed) fire independently per domain.
this.certProvisionPromise = this.provisionCertificatesViaCallback(preloadedDomains)
.catch((err) => logger.log('error', `Unexpected error in cert provisioning: ${err.message}`, { component: 'smart-proxy' }));
}
/**
* Stop the proxy.
*/
public async stop(): Promise<void> {
logger.log('info', 'SmartProxy shutting down...', { component: 'smart-proxy' });
this.stopping = true;
// Wait for in-flight cert provisioning to bail out (it checks this.stopping)
if (this.certProvisionPromise) {
await this.certProvisionPromise;
this.certProvisionPromise = null;
}
// Stop metrics polling
this.metricsAdapter.stopPolling();
// Remove exit listener before killing to avoid spurious error events
this.bridge.removeAllListeners('exit');
// Stop Rust proxy
try {
await this.bridge.stopProxy();
} catch {
// Ignore if already stopped
}
this.bridge.kill();
// Stop socket handler relay
if (this.socketHandlerServer) {
await this.socketHandlerServer.stop();
this.socketHandlerServer = null;
}
logger.log('info', 'SmartProxy shutdown complete.', { component: 'smart-proxy' });
}
/**
* Update routes atomically.
*/
public async updateRoutes(newRoutes: IRouteConfig[]): Promise<void> {
await this.routeUpdateLock.runExclusive(async () => {
// Validate
const validation = RouteValidator.validateRoutes(newRoutes);
if (!validation.valid) {
RouteValidator.logValidationErrors(validation.errors);
throw new Error(`Route validation failed: ${validation.errors.size} route(s) have errors`);
}
// Preprocess for Rust
const rustRoutes = this.preprocessor.preprocessForRust(newRoutes);
// Send to Rust
await this.bridge.updateRoutes(rustRoutes);
// Update local route manager
this.routeManager.updateRoutes(newRoutes);
// Update socket handler relay if handler routes changed
const hasHandlerRoutes = newRoutes.some(
(r) =>
(r.action.type === 'socket-handler' && r.action.socketHandler) ||
r.action.targets?.some((t) => typeof t.host === 'function' || typeof t.port === 'function')
);
if (hasHandlerRoutes && !this.socketHandlerServer) {
this.socketHandlerServer = new SocketHandlerServer(this.preprocessor);
await this.socketHandlerServer.start();
await this.bridge.setSocketHandlerRelay(this.socketHandlerServer.getSocketPath());
} else if (!hasHandlerRoutes && this.socketHandlerServer) {
await this.socketHandlerServer.stop();
this.socketHandlerServer = null;
}
// Update stored routes
this.settings.routes = newRoutes;
logger.log('info', `Routes updated (${newRoutes.length} routes)`, { component: 'smart-proxy' });
});
// Fire-and-forget cert provisioning outside the mutex — routes are already updated,
// cert provisioning doesn't need the route update lock and may be slow.
this.certProvisionPromise = this.provisionCertificatesViaCallback()
.catch((err) => logger.log('error', `Unexpected error in cert provisioning after route update: ${err.message}`, { component: 'smart-proxy' }));
}
/**
* Provision a certificate for a named route.
*/
public async provisionCertificate(routeName: string): Promise<void> {
await this.bridge.provisionCertificate(routeName);
}
/**
* Force renewal of a certificate.
*/
public async renewCertificate(routeName: string): Promise<void> {
await this.bridge.renewCertificate(routeName);
}
/**
* Get certificate status for a route (async - calls Rust).
*/
public async getCertificateStatus(routeName: string): Promise<any> {
return this.bridge.getCertificateStatus(routeName);
}
/**
* Get the metrics interface.
*/
public getMetrics(): IMetrics {
return this.metricsAdapter;
}
/**
* Get statistics (async - calls Rust).
*/
public async getStatistics(): Promise<any> {
return this.bridge.getStatistics();
}
/**
* Add a listening port at runtime.
*/
public async addListeningPort(port: number): Promise<void> {
await this.bridge.addListeningPort(port);
}
/**
* Remove a listening port at runtime.
*/
public async removeListeningPort(port: number): Promise<void> {
await this.bridge.removeListeningPort(port);
}
/**
* Get all currently listening ports (async - calls Rust).
*/
public async getListeningPorts(): Promise<number[]> {
if (!this.bridge.running) return [];
return this.bridge.getListeningPorts();
}
/**
* Get eligible domains for ACME certificates (sync - reads local routes).
*/
public getEligibleDomainsForCertificates(): string[] {
const domains: string[] = [];
for (const route of this.settings.routes || []) {
if (!route.match.domains) continue;
if (
route.action.type !== 'forward' ||
!route.action.tls ||
route.action.tls.mode === 'passthrough' ||
route.action.tls.certificate !== 'auto'
)
continue;
const routeDomains = Array.isArray(route.match.domains) ? route.match.domains : [route.match.domains];
const eligible = routeDomains.filter((d) => !d.includes('*') && this.isValidDomain(d));
domains.push(...eligible);
}
return domains;
}
/**
* Get NFTables status (async - calls Rust).
*/
public async getNfTablesStatus(): Promise<Record<string, any>> {
return this.bridge.getNftablesStatus();
}
// --- Private helpers ---
/**
* Build the Rust configuration object from TS settings.
*/
private buildRustConfig(routes: IRouteConfig[], acmeOverride?: IAcmeOptions): any {
const acme = acmeOverride !== undefined ? acmeOverride : this.settings.acme;
return {
routes,
defaults: this.settings.defaults,
acme: acme
? {
enabled: acme.enabled,
email: acme.email,
useProduction: acme.useProduction,
port: acme.port,
renewThresholdDays: acme.renewThresholdDays,
autoRenew: acme.autoRenew,
renewCheckIntervalHours: acme.renewCheckIntervalHours,
}
: undefined,
connectionTimeout: this.settings.connectionTimeout,
initialDataTimeout: this.settings.initialDataTimeout,
socketTimeout: this.settings.socketTimeout,
maxConnectionLifetime: this.settings.maxConnectionLifetime,
gracefulShutdownTimeout: this.settings.gracefulShutdownTimeout,
maxConnectionsPerIp: this.settings.maxConnectionsPerIP,
connectionRateLimitPerMinute: this.settings.connectionRateLimitPerMinute,
keepAliveTreatment: this.settings.keepAliveTreatment,
keepAliveInactivityMultiplier: this.settings.keepAliveInactivityMultiplier,
extendedKeepAliveLifetime: this.settings.extendedKeepAliveLifetime,
proxyIps: this.settings.proxyIPs,
acceptProxyProtocol: this.settings.acceptProxyProtocol,
sendProxyProtocol: this.settings.sendProxyProtocol,
metrics: this.settings.metrics,
};
}
/**
* For routes with certificate: 'auto', call certProvisionFunction if set.
* If the callback returns a cert object, load it into Rust.
* If it returns 'http01', let Rust handle ACME.
*/
private async provisionCertificatesViaCallback(skipDomains: Set<string> = new Set()): Promise<void> {
const provisionFn = this.settings.certProvisionFunction;
if (!provisionFn) return;
// Phase 1: Collect all unique (domain, route) pairs that need provisioning
const seen = new Set<string>(skipDomains);
const tasks: Array<{ domain: string; route: IRouteConfig }> = [];
for (const route of this.settings.routes) {
if (route.action.tls?.certificate !== 'auto') continue;
if (!route.match.domains) continue;
const rawDomains = Array.isArray(route.match.domains) ? route.match.domains : [route.match.domains];
const certDomains = this.normalizeDomainsForCertProvisioning(rawDomains);
for (const domain of certDomains) {
if (seen.has(domain)) continue;
seen.add(domain);
tasks.push({ domain, route });
}
}
if (tasks.length === 0) return;
// Phase 2: Process all domains in parallel with concurrency limit
const concurrency = this.settings.certProvisionConcurrency ?? 4;
const semaphore = new ConcurrencySemaphore(concurrency);
const promises = tasks.map(async ({ domain, route }) => {
await semaphore.acquire();
try {
await this.provisionSingleDomain(domain, route, provisionFn);
} finally {
semaphore.release();
}
});
await Promise.allSettled(promises);
}
/**
* Provision a single domain's certificate via the callback.
* Includes per-domain timeout and shutdown checks.
*/
private async provisionSingleDomain(
domain: string,
route: IRouteConfig,
provisionFn: (domain: string, eventComms: ICertProvisionEventComms) => Promise<TSmartProxyCertProvisionObject>,
): Promise<void> {
if (this.stopping) return;
let expiryDate: string | undefined;
let source = 'certProvisionFunction';
const eventComms: ICertProvisionEventComms = {
log: (msg) => logger.log('info', `[certProvision ${domain}] ${msg}`, { component: 'smart-proxy' }),
warn: (msg) => logger.log('warn', `[certProvision ${domain}] ${msg}`, { component: 'smart-proxy' }),
error: (msg) => logger.log('error', `[certProvision ${domain}] ${msg}`, { component: 'smart-proxy' }),
setExpiryDate: (date) => { expiryDate = date.toISOString(); },
setSource: (s) => { source = s; },
};
const timeoutMs = this.settings.certProvisionTimeout ?? 300_000; // 5 min default
try {
const result: TSmartProxyCertProvisionObject = await this.withTimeout(
provisionFn(domain, eventComms),
timeoutMs,
`Certificate provisioning timed out for ${domain} after ${timeoutMs}ms`,
);
if (this.stopping) return;
if (result === 'http01') {
if (route.name) {
try {
await this.bridge.provisionCertificate(route.name);
logger.log('info', `Triggered Rust ACME for ${domain} (route: ${route.name})`, { component: 'smart-proxy' });
} catch (provisionErr: any) {
logger.log('warn', `Cannot provision cert for ${domain} — callback returned 'http01' but Rust ACME failed: ${provisionErr.message}. ` +
'Note: Rust ACME is disabled when certProvisionFunction is set.', { component: 'smart-proxy' });
}
}
return;
}
if (result && typeof result === 'object') {
if (this.stopping) return;
const certObj = result as plugins.tsclass.network.ICert;
await this.bridge.loadCertificate(
domain,
certObj.publicKey,
certObj.privateKey,
);
logger.log('info', `Certificate loaded via provision function for ${domain}`, { component: 'smart-proxy' });
// Persist to consumer store
if (this.settings.certStore?.save) {
try {
await this.settings.certStore.save(domain, certObj.publicKey, certObj.privateKey);
} catch (storeErr: any) {
logger.log('warn', `certStore.save() failed for ${domain}: ${storeErr.message}`, { component: 'smart-proxy' });
}
}
this.emit('certificate-issued', {
domain,
expiryDate: expiryDate || (certObj.validUntil ? new Date(certObj.validUntil).toISOString() : undefined),
source,
} satisfies ICertificateIssuedEvent);
}
} catch (err: any) {
logger.log('warn', `certProvisionFunction failed for ${domain}: ${err.message}`, { component: 'smart-proxy' });
this.emit('certificate-failed', {
domain,
error: err.message,
source,
} satisfies ICertificateFailedEvent);
// Fallback to ACME if enabled and route has a name
if (this.settings.certProvisionFallbackToAcme !== false && route.name) {
try {
await this.bridge.provisionCertificate(route.name);
logger.log('info', `Falling back to Rust ACME for ${domain} (route: ${route.name})`, { component: 'smart-proxy' });
} catch (acmeErr: any) {
logger.log('warn', `ACME fallback also failed for ${domain}: ${acmeErr.message}` +
(this.settings.disableDefaultCert
? ' — TLS will fail for this domain (disableDefaultCert is true)'
: ' — default self-signed fallback cert will be used'), { component: 'smart-proxy' });
}
}
}
}
/**
* Race a promise against a timeout. Rejects with the given message if the timeout fires first.
*/
private withTimeout<T>(promise: Promise<T>, ms: number, message: string): Promise<T> {
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error(message)), ms);
promise.then(
(val) => { clearTimeout(timer); resolve(val); },
(err) => { clearTimeout(timer); reject(err); },
);
});
}
/**
* Normalize routing glob patterns into valid domain identifiers for cert provisioning.
* - `*nevermind.cloud` → `['nevermind.cloud', '*.nevermind.cloud']`
* - `*.lossless.digital` → `['*.lossless.digital']` (already valid wildcard)
* - `code.foss.global` → `['code.foss.global']` (plain domain)
* - `*mid*.example.com` → skipped with warning (unsupported glob)
*/
private normalizeDomainsForCertProvisioning(rawDomains: string[]): string[] {
const result: string[] = [];
for (const raw of rawDomains) {
// Plain domain — no glob characters
if (!raw.includes('*')) {
result.push(raw);
continue;
}
// Valid wildcard: *.example.com
if (raw.startsWith('*.') && !raw.slice(2).includes('*')) {
result.push(raw);
continue;
}
// Routing glob like *example.com (leading star, no dot after it)
// Convert to bare domain + wildcard pair
if (raw.startsWith('*') && !raw.startsWith('*.') && !raw.slice(1).includes('*')) {
const baseDomain = raw.slice(1); // Remove leading *
result.push(baseDomain);
result.push(`*.${baseDomain}`);
continue;
}
// Unsupported glob pattern (e.g. *mid*.example.com)
logger.log('warn', `Skipping unsupported glob pattern for cert provisioning: ${raw}`, { component: 'smart-proxy' });
}
return result;
}
private isValidDomain(domain: string): boolean {
if (!domain || domain.length === 0) return false;
if (domain.includes('*')) return false;
const validDomainRegex =
/^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$/;
return validDomainRegex.test(domain);
}
}