// Source file: dcrouter/ts/remoteingress/classes.remoteingress-manager.ts
// (TypeScript; reported as 544 lines, 18 KiB by the repository viewer this text was copied from)

import * as plugins from '../plugins.js';
import type { IDcRouterRouteConfig, IRemoteIngress, IRemoteIngressHubSettings, IRemoteIngressPerformanceConfig, TRemoteIngressHubSettingsUpdate, TRemoteIngressPerformanceProfile } from '../../ts_interfaces/data/remoteingress.js';
import { RemoteIngressEdgeDoc, RemoteIngressHubSettingsDoc } from '../db/index.js';
/**
 * Firewall settings held by RemoteIngressManager.
 * The manager stores a single instance (see setFirewallConfig) and attaches it
 * to every entry returned by getAllowedEdges().
 */
interface IRemoteIngressFirewallConfig {
// IP addresses as strings — presumably the addresses edges should reject; TODO confirm against the hub/edge implementation.
blockedIps?: string[];
}
/**
 * Names of the performance-config fields that are validated as positive
 * integers with an upper bound (see `performanceIntegerMaxByField`).
 */
type TPerformanceIntegerField =
  | 'maxStreamsPerEdge'
  | 'totalWindowBudgetBytes'
  | 'minStreamWindowBytes'
  | 'maxStreamWindowBytes'
  | 'sustainedStreamWindowBytes'
  | 'quicDatagramReceiveBufferBytes'
  | 'streamFramePayloadBytes'
  | 'firstDataConnectTimeoutMs'
  | 'clientWriteTimeoutMs';

/**
 * Inclusive upper bound accepted for each integer performance field.
 * Byte limits are written as multiples of 1024 and timeouts as
 * minutes * seconds * milliseconds so the magnitude is readable at a glance.
 */
const performanceIntegerMaxByField: Record<TPerformanceIntegerField, number> = {
  maxStreamsPerEdge: 100 * 1000,
  totalWindowBudgetBytes: 1024 ** 3, // 1 GiB
  minStreamWindowBytes: 16 * 1024 * 1024, // 16 MiB
  maxStreamWindowBytes: 128 * 1024 * 1024, // 128 MiB
  sustainedStreamWindowBytes: 128 * 1024 * 1024, // 128 MiB
  quicDatagramReceiveBufferBytes: 64 * 1024 * 1024, // 64 MiB
  streamFramePayloadBytes: 16 * 1024 * 1024, // 16 MiB
  firstDataConnectTimeoutMs: 60 * 60 * 1000, // 1 hour
  clientWriteTimeoutMs: 60 * 60 * 1000, // 1 hour
};
// Maximum number of entries accepted in a performance config's `serverFirstPorts` list.
const maxServerFirstPorts = 128;
// Tunnel port used for the hub settings when none has been stored or configured.
const defaultTunnelPort = 8443;
/**
 * Reports whether `keyArg` is an own (non-inherited) property of `objectArg`.
 * The check is borrowed from `Object.prototype` so it also works for objects
 * that shadow `hasOwnProperty` or have no prototype at all.
 */
function hasOwn(objectArg: object, keyArg: string): boolean {
  const ownPropertyCheck = Object.prototype.hasOwnProperty;
  return ownPropertyCheck.call(objectArg, keyArg);
}
/**
 * Expands a route's port specification via smartproxy's `expandPortRange`
 * and returns the distinct port numbers in ascending order.
 */
function extractPorts(portRange: plugins.smartproxy.IRouteConfig['match']['ports']): number[] {
  const expanded = plugins.smartproxy.expandPortRange(portRange) as number[];
  const uniquePorts = Array.from(new Set<number>(expanded));
  uniquePorts.sort((left, right) => left - right);
  return uniquePorts;
}
/**
 * Manages CRUD for remote ingress edge registrations.
 * Persists edge configs via smartdata document classes and provides
 * the allowed edges list for the Rust hub.
 *
 * Edge registrations and hub settings are cached in memory; read accessors
 * serve from that cache, write operations also go to the database.
 */
export class RemoteIngressManager {
  private edges: Map<string, IRemoteIngress> = new Map();
  private routes: IDcRouterRouteConfig[] = [];
  private firewallConfig?: IRemoteIngressFirewallConfig;
  private hubSettings: IRemoteIngressHubSettings = {
    enabled: false,
    tunnelPort: defaultTunnelPort,
    updatedAt: 0,
    updatedBy: 'default',
  };

  constructor() {}

  /**
   * Load all edge registrations from the database into memory, then load
   * (or create) the hub settings document.
   */
  public async initialize(): Promise<void> {
    const docs = await RemoteIngressEdgeDoc.findAll();
    for (const doc of docs) {
      // Migration: old edges without autoDerivePorts default to true
      if ((doc as any).autoDerivePorts === undefined) {
        doc.autoDerivePorts = true;
        await doc.save();
      }
      const edge: IRemoteIngress = {
        id: doc.id,
        name: doc.name,
        secret: doc.secret,
        listenPorts: doc.listenPorts,
        listenPortsUdp: doc.listenPortsUdp,
        enabled: doc.enabled,
        autoDerivePorts: doc.autoDerivePorts,
        performance: doc.performance,
        tags: doc.tags,
        createdAt: doc.createdAt,
        updatedAt: doc.updatedAt,
      };
      this.edges.set(edge.id, edge);
    }
    await this.initializeHubSettings();
  }

  /**
   * Load the hub settings document, creating and saving a default one when
   * none exists yet, and cache the result in memory.
   */
  private async initializeHubSettings(): Promise<void> {
    let doc = await RemoteIngressHubSettingsDoc.load();
    if (!doc) {
      doc = this.createDefaultHubSettingsDoc();
      doc.updatedAt = Date.now();
      doc.updatedBy = 'default';
      await doc.save();
    }
    this.hubSettings = this.toHubSettings(doc);
  }

  /**
   * Build an unsaved hub settings document carrying the default values.
   * Shared by initialization and updates so both start from the same defaults.
   */
  private createDefaultHubSettingsDoc(): RemoteIngressHubSettingsDoc {
    const doc = new RemoteIngressHubSettingsDoc();
    doc.settingsId = 'remote-ingress-hub-settings';
    doc.enabled = false;
    doc.tunnelPort = defaultTunnelPort;
    doc.hubDomain = '';
    return doc;
  }

  /**
   * Store the current route configs for port derivation.
   */
  public setRoutes(routes: IDcRouterRouteConfig[]): void {
    this.routes = routes;
  }

  /**
   * Set the full desired firewall snapshot pushed to all edges.
   */
  public setFirewallConfig(firewallConfig?: IRemoteIngressFirewallConfig): void {
    this.firewallConfig = firewallConfig;
  }

  /**
   * Get a copy of the cached hub settings. The performance config is cloned
   * so callers cannot mutate the manager's state through the returned value.
   */
  public getHubSettings(): IRemoteIngressHubSettings {
    const { performance } = this.hubSettings;
    return {
      ...this.hubSettings,
      performance: performance ? this.clonePerformance(performance) : undefined,
    };
  }

  /**
   * Get a copy of the hub-level performance config.
   * @returns the config, or undefined when none is set or it has no fields.
   */
  public getHubPerformanceConfig(): IRemoteIngressPerformanceConfig | undefined {
    const { performance } = this.hubSettings;
    return performance && Object.keys(performance).length > 0
      ? this.clonePerformance(performance)
      : undefined;
  }

  /**
   * Validate and persist a partial hub settings update.
   * Only keys present on `updates` are applied; `hubDomain` and `performance`
   * are cleared when present but empty/null.
   *
   * @param updates partial settings to apply
   * @param updatedBy identifier stored as the author of the change
   * @returns a copy of the resulting hub settings
   * @throws Error when a supplied value fails validation (nothing is saved)
   */
  public async updateHubSettings(
    updates: TRemoteIngressHubSettingsUpdate,
    updatedBy: string,
  ): Promise<IRemoteIngressHubSettings> {
    // Validate first so invalid input never costs a database round trip.
    const normalized = this.normalizeHubSettingsUpdate(updates);
    let doc = await RemoteIngressHubSettingsDoc.load();
    if (!doc) {
      doc = this.createDefaultHubSettingsDoc();
    }
    if (normalized.enabled !== undefined) {
      doc.enabled = normalized.enabled;
    }
    if (normalized.tunnelPort !== undefined) {
      doc.tunnelPort = normalized.tunnelPort;
    }
    if (hasOwn(updates, 'hubDomain')) {
      doc.hubDomain = normalized.hubDomain || '';
    }
    if (hasOwn(updates, 'performance')) {
      doc.performance = normalized.performance || undefined;
    }
    doc.updatedAt = Date.now();
    doc.updatedBy = updatedBy;
    await doc.save();
    this.hubSettings = this.toHubSettings(doc);
    return this.getHubSettings();
  }

  /**
   * Derive listen ports for an edge from routes tagged with remoteIngress.enabled.
   * When a route specifies edgeFilter, only edges whose id or tags match get that route's ports.
   * When edgeFilter is absent, the route applies to all edges.
   */
  public derivePortsForEdge(edgeId: string, edgeTags?: string[]): number[] {
    // NOTE(review): routes of every transport contribute here, including
    // 'udp'-only routes — confirm that is intended for the TCP port list.
    return this.collectDerivedPorts(edgeId, edgeTags, false);
  }

  /**
   * Derive UDP listen ports for an edge from routes with transport 'udp' or 'all'.
   * These ports need UDP listeners on the edge (e.g. for QUIC/HTTP3).
   */
  public deriveUdpPortsForEdge(edgeId: string, edgeTags?: string[]): number[] {
    return this.collectDerivedPorts(edgeId, edgeTags, true);
  }

  /**
   * Collect the sorted, de-duplicated ports of all remote-ingress-enabled
   * routes that apply to the given edge.
   *
   * @param udpOnly when true, only routes whose match transport is 'udp' or 'all' are considered
   */
  private collectDerivedPorts(
    edgeId: string,
    edgeTags: string[] | undefined,
    udpOnly: boolean,
  ): number[] {
    const ports = new Set<number>();
    for (const route of this.routes) {
      if (!route.remoteIngress?.enabled) continue;
      // Apply edge filter if present: the edge must match by id or by one of its tags.
      const filter = route.remoteIngress.edgeFilter;
      if (filter && filter.length > 0) {
        const idMatch = filter.includes(edgeId);
        const tagMatch = edgeTags?.some((tag) => filter.includes(tag)) ?? false;
        if (!idMatch && !tagMatch) continue;
      }
      if (udpOnly) {
        const transport = route.match?.transport;
        if (transport !== 'udp' && transport !== 'all') continue;
      }
      if (route.match?.ports) {
        for (const port of extractPorts(route.match.ports)) {
          ports.add(port);
        }
      }
    }
    return [...ports].sort((a, b) => a - b);
  }

  /**
   * Get the effective listen ports for an edge.
   * Manual ports are always included. Auto-derived ports are added (union) when autoDerivePorts is true.
   */
  public getEffectiveListenPorts(edge: IRemoteIngress): number[] {
    const manualPorts = edge.listenPorts || [];
    const shouldDerive = edge.autoDerivePorts !== false;
    if (!shouldDerive) return [...manualPorts].sort((a, b) => a - b);
    const derivedPorts = this.derivePortsForEdge(edge.id, edge.tags);
    return [...new Set([...manualPorts, ...derivedPorts])].sort((a, b) => a - b);
  }

  /**
   * Get the effective UDP listen ports for an edge.
   * Manual UDP ports are always included. Auto-derived UDP ports are added when autoDerivePorts is true.
   */
  public getEffectiveListenPortsUdp(edge: IRemoteIngress): number[] {
    const manualPorts = edge.listenPortsUdp || [];
    const shouldDerive = edge.autoDerivePorts !== false;
    if (!shouldDerive) return [...manualPorts].sort((a, b) => a - b);
    const derivedPorts = this.deriveUdpPortsForEdge(edge.id, edge.tags);
    return [...new Set([...manualPorts, ...derivedPorts])].sort((a, b) => a - b);
  }

  /**
   * Get manual and derived port breakdown for an edge (used in API responses).
   * Derived ports exclude any ports already present in the manual list.
   * The returned `manual` array is a copy, so callers may reorder or modify
   * it without touching the stored edge configuration.
   */
  public getPortBreakdown(edge: IRemoteIngress): { manual: number[]; derived: number[] } {
    const manual = [...(edge.listenPorts || [])];
    const shouldDerive = edge.autoDerivePorts !== false;
    if (!shouldDerive) return { manual, derived: [] };
    const manualSet = new Set(manual);
    const derived = this.derivePortsForEdge(edge.id, edge.tags).filter((p) => !manualSet.has(p));
    return { manual, derived };
  }

  /**
   * Create a new edge registration with a fresh id and random secret.
   * The edge is saved to the database before it is added to the in-memory map.
   */
  public async createEdge(
    name: string,
    listenPorts: number[] = [],
    tags?: string[],
    autoDerivePorts: boolean = true,
    performance?: IRemoteIngressPerformanceConfig,
  ): Promise<IRemoteIngress> {
    const id = plugins.uuid.v4();
    const secret = plugins.crypto.randomBytes(32).toString('hex');
    const now = Date.now();
    // NOTE(review): edge-level performance is stored as given; unlike hub
    // settings it is not passed through normalizePerformanceConfig here.
    const edge: IRemoteIngress = {
      id,
      name,
      secret,
      listenPorts,
      enabled: true,
      autoDerivePorts,
      performance,
      tags: tags || [],
      createdAt: now,
      updatedAt: now,
    };
    const doc = new RemoteIngressEdgeDoc();
    Object.assign(doc, edge);
    await doc.save();
    this.edges.set(id, edge);
    return edge;
  }

  /**
   * Get an edge by ID.
   */
  public getEdge(id: string): IRemoteIngress | undefined {
    return this.edges.get(id);
  }

  /**
   * Get all edge registrations.
   */
  public getAllEdges(): IRemoteIngress[] {
    return Array.from(this.edges.values());
  }

  /**
   * Update an edge registration.
   * Only fields that are not `undefined` in `updates` are applied.
   * If persisting fails, the in-memory edge is restored to its previous
   * values and the error is rethrown, so memory and database stay in sync.
   *
   * @returns the updated edge, or null when no edge with that id exists
   */
  public async updateEdge(
    id: string,
    updates: {
      name?: string;
      listenPorts?: number[];
      autoDerivePorts?: boolean;
      enabled?: boolean;
      performance?: IRemoteIngressPerformanceConfig;
      tags?: string[];
    },
  ): Promise<IRemoteIngress | null> {
    const edge = this.edges.get(id);
    if (!edge) {
      return null;
    }
    // Snapshot of every field this method may touch, used for rollback.
    const previous = {
      name: edge.name,
      listenPorts: edge.listenPorts,
      autoDerivePorts: edge.autoDerivePorts,
      enabled: edge.enabled,
      performance: edge.performance,
      tags: edge.tags,
      updatedAt: edge.updatedAt,
    };
    if (updates.name !== undefined) edge.name = updates.name;
    if (updates.listenPorts !== undefined) edge.listenPorts = updates.listenPorts;
    if (updates.autoDerivePorts !== undefined) edge.autoDerivePorts = updates.autoDerivePorts;
    if (updates.enabled !== undefined) edge.enabled = updates.enabled;
    if (updates.performance !== undefined) edge.performance = updates.performance;
    if (updates.tags !== undefined) edge.tags = updates.tags;
    edge.updatedAt = Date.now();
    try {
      await this.persistEdge(edge);
    } catch (error) {
      Object.assign(edge, previous);
      throw error;
    }
    // The edge object is mutated in place, so the map entry needs no re-insert
    // (re-inserting would resurrect an edge deleted while the save was pending).
    return edge;
  }

  /**
   * Delete an edge registration from the database and from memory.
   * @returns false when no edge with that id is known, true otherwise
   */
  public async deleteEdge(id: string): Promise<boolean> {
    if (!this.edges.has(id)) {
      return false;
    }
    const doc = await RemoteIngressEdgeDoc.findById(id);
    if (doc) {
      await doc.delete();
    }
    this.edges.delete(id);
    return true;
  }

  /**
   * Regenerate the secret for an edge.
   * If persisting fails, the previous secret is restored in memory and the
   * error is rethrown; otherwise the edge would only accept a secret that
   * was never handed to the caller.
   *
   * @returns the new secret, or null when no edge with that id exists
   */
  public async regenerateSecret(id: string): Promise<string | null> {
    const edge = this.edges.get(id);
    if (!edge) {
      return null;
    }
    const previous = { secret: edge.secret, updatedAt: edge.updatedAt };
    edge.secret = plugins.crypto.randomBytes(32).toString('hex');
    edge.updatedAt = Date.now();
    try {
      await this.persistEdge(edge);
    } catch (error) {
      Object.assign(edge, previous);
      throw error;
    }
    return edge.secret;
  }

  /**
   * Copy the in-memory edge onto its database document and save it.
   * A missing document is skipped silently, matching the best-effort
   * behavior of the individual write operations.
   */
  private async persistEdge(edge: IRemoteIngress): Promise<void> {
    const doc = await RemoteIngressEdgeDoc.findById(edge.id);
    if (doc) {
      Object.assign(doc, edge);
      await doc.save();
    }
  }

  /**
   * Verify an edge's secret using constant-time comparison.
   * Unknown edges, non-string input and length mismatches all yield false
   * rather than throwing.
   */
  public verifySecret(id: string, secret: string): boolean {
    const edge = this.edges.get(id);
    // The provided secret may come from untrusted input; Buffer.from would
    // throw on undefined/numbers/objects, so reject anything that is not a string.
    if (!edge || typeof secret !== 'string' || typeof edge.secret !== 'string') {
      return false;
    }
    const expected = Buffer.from(edge.secret);
    const provided = Buffer.from(secret);
    // timingSafeEqual requires equal-length inputs.
    if (expected.length !== provided.length) {
      return false;
    }
    return plugins.crypto.timingSafeEqual(expected, provided);
  }

  /**
   * Get the list of allowed edges (enabled only) for the Rust hub.
   * `listenPortsUdp` is included only when the edge has at least one effective UDP port,
   * `firewallConfig` only when one has been set, and `performance` only when non-empty.
   */
  public getAllowedEdges(): Array<{ id: string; secret: string; listenPorts: number[]; listenPortsUdp?: number[]; firewallConfig?: IRemoteIngressFirewallConfig; performance?: IRemoteIngressPerformanceConfig }> {
    const result: ReturnType<RemoteIngressManager['getAllowedEdges']> = [];
    for (const edge of this.edges.values()) {
      if (!edge.enabled) continue;
      const listenPortsUdp = this.getEffectiveListenPortsUdp(edge);
      const performance = edge.performance && Object.keys(edge.performance).length > 0
        ? edge.performance
        : undefined;
      result.push({
        id: edge.id,
        secret: edge.secret,
        listenPorts: this.getEffectiveListenPorts(edge),
        ...(listenPortsUdp.length > 0 ? { listenPortsUdp } : {}),
        ...(this.firewallConfig ? { firewallConfig: this.firewallConfig } : {}),
        ...(performance ? { performance } : {}),
      });
    }
    return result;
  }

  /**
   * Validate a hub settings update and return a cleaned copy containing only
   * the keys that were supplied.
   * @throws Error when tunnelPort or the performance config is invalid
   */
  private normalizeHubSettingsUpdate(
    updates: TRemoteIngressHubSettingsUpdate,
  ): TRemoteIngressHubSettingsUpdate {
    const next: TRemoteIngressHubSettingsUpdate = {};
    if (hasOwn(updates, 'enabled') && updates.enabled !== undefined) {
      next.enabled = Boolean(updates.enabled);
    }
    if (hasOwn(updates, 'tunnelPort') && updates.tunnelPort !== undefined) {
      const tunnelPort = Number(updates.tunnelPort);
      if (!Number.isInteger(tunnelPort) || tunnelPort < 1 || tunnelPort > 65535) {
        throw new Error('tunnelPort must be a valid TCP port');
      }
      next.tunnelPort = tunnelPort;
    }
    if (hasOwn(updates, 'hubDomain')) {
      const hubDomain = `${updates.hubDomain || ''}`.trim();
      next.hubDomain = hubDomain || undefined;
    }
    if (hasOwn(updates, 'performance')) {
      // An explicit null clears the stored performance config.
      next.performance = updates.performance === null
        ? undefined
        : this.normalizePerformanceConfig(updates.performance || undefined);
    }
    return next;
  }

  /**
   * Validate a performance config and return a copy holding only recognized,
   * valid fields.
   * @returns the cleaned config, or undefined when the input is missing or yields no fields
   * @throws Error when the profile, an integer field, a window-size relation
   *   or the serverFirstPorts list is invalid
   */
  private normalizePerformanceConfig(
    performance?: IRemoteIngressPerformanceConfig,
  ): IRemoteIngressPerformanceConfig | undefined {
    if (!performance) {
      return undefined;
    }
    const next: IRemoteIngressPerformanceConfig = {};
    const validProfiles: TRemoteIngressPerformanceProfile[] = ['balanced', 'throughput', 'highConcurrency'];
    if (performance.profile !== undefined) {
      if (!validProfiles.includes(performance.profile)) {
        throw new Error('Invalid RemoteIngress performance profile');
      }
      next.profile = performance.profile;
    }
    const assignPositiveInteger = (field: TPerformanceIntegerField) => {
      const value = performance[field];
      if (value === undefined) {
        return;
      }
      const maxValue = performanceIntegerMaxByField[field];
      if (!Number.isSafeInteger(value) || value < 1 || value > maxValue) {
        throw new Error(`${field} must be a positive safe integer no greater than ${maxValue}`);
      }
      (next as Record<string, number>)[field] = value;
    };
    assignPositiveInteger('maxStreamsPerEdge');
    assignPositiveInteger('totalWindowBudgetBytes');
    assignPositiveInteger('minStreamWindowBytes');
    assignPositiveInteger('maxStreamWindowBytes');
    assignPositiveInteger('sustainedStreamWindowBytes');
    assignPositiveInteger('quicDatagramReceiveBufferBytes');
    assignPositiveInteger('streamFramePayloadBytes');
    assignPositiveInteger('firstDataConnectTimeoutMs');
    assignPositiveInteger('clientWriteTimeoutMs');
    if (
      next.minStreamWindowBytes !== undefined
      && next.maxStreamWindowBytes !== undefined
      && next.minStreamWindowBytes > next.maxStreamWindowBytes
    ) {
      throw new Error('minStreamWindowBytes must not exceed maxStreamWindowBytes');
    }
    if (
      next.sustainedStreamWindowBytes !== undefined
      && next.maxStreamWindowBytes !== undefined
      && next.sustainedStreamWindowBytes > next.maxStreamWindowBytes
    ) {
      throw new Error('sustainedStreamWindowBytes must not exceed maxStreamWindowBytes');
    }
    const configuredServerFirstPorts = performance.serverFirstPorts;
    if (configuredServerFirstPorts !== undefined) {
      if (!Array.isArray(configuredServerFirstPorts)) {
        throw new Error('serverFirstPorts must contain valid port numbers');
      }
      if (configuredServerFirstPorts.length > maxServerFirstPorts) {
        throw new Error(`serverFirstPorts must contain at most ${maxServerFirstPorts} ports`);
      }
      const serverFirstPorts = [
        ...new Set(configuredServerFirstPorts.map((port: unknown) => this.coercePort(port))),
      ].sort((a, b) => a - b);
      for (const port of serverFirstPorts) {
        if (!Number.isInteger(port) || port < 1 || port > 65535) {
          throw new Error('serverFirstPorts must contain valid port numbers');
        }
        if (port === 443) {
          throw new Error('Port 443 is client-first TLS and must not be listed as server-first');
        }
      }
      if (serverFirstPorts.length > 0) {
        next.serverFirstPorts = serverFirstPorts;
      }
    }
    return Object.keys(next).length > 0 ? next : undefined;
  }

  /**
   * Convert a raw port entry to a number for validation.
   * Only numbers and non-blank strings are converted; every other value
   * (booleans, arrays, objects, null) becomes NaN so the range check rejects
   * it instead of a bare `Number()` coercion silently accepting e.g. `true` as port 1.
   */
  private coercePort(value: unknown): number {
    if (typeof value === 'number') {
      return value;
    }
    if (typeof value === 'string' && value.trim() !== '') {
      return Number(value);
    }
    return Number.NaN;
  }

  /**
   * Copy a performance config, including its `serverFirstPorts` array, so the
   * result shares no mutable state with the original.
   */
  private clonePerformance(
    performance: IRemoteIngressPerformanceConfig,
  ): IRemoteIngressPerformanceConfig {
    return {
      ...performance,
      ...(performance.serverFirstPorts
        ? { serverFirstPorts: [...performance.serverFirstPorts] }
        : {}),
    };
  }

  /**
   * Map a hub settings document to the plain settings object kept in memory,
   * filling in defaults for missing `enabled` / `tunnelPort` and turning an
   * empty hub domain into undefined.
   */
  private toHubSettings(doc: RemoteIngressHubSettingsDoc): IRemoteIngressHubSettings {
    return {
      enabled: doc.enabled ?? false,
      tunnelPort: doc.tunnelPort ?? defaultTunnelPort,
      hubDomain: doc.hubDomain || undefined,
      performance: doc.performance,
      updatedAt: doc.updatedAt,
      updatedBy: doc.updatedBy,
    };
  }
}