Files
siprouter/ts/proxybridge.ts

547 lines
16 KiB
TypeScript

/**
* Proxy engine bridge — manages the Rust proxy-engine subprocess.
*
* The proxy-engine handles ALL SIP protocol mechanics. TypeScript only:
* - Sends configuration
* - Receives high-level events (incoming_call, call_ended, etc.)
* - Sends high-level commands (hangup, make_call, add_leg, webrtc_offer)
*
* No raw SIP ever touches TypeScript.
*/
import path from 'node:path';
import { RustBridge } from '@push.rocks/smartrust';
import type { TProxyEventMap } from './shared/proxy-events.ts';
export type {
ICallAnsweredEvent,
ICallEndedEvent,
ICallRingingEvent,
IDeviceRegisteredEvent,
IIncomingCallEvent,
ILegAddedEvent,
ILegRemovedEvent,
ILegStateChangedEvent,
IOutboundCallEvent,
IOutboundCallStartedEvent,
IProviderRegisteredEvent,
IRecordingDoneEvent,
ISipUnhandledEvent,
IVoicemailErrorEvent,
IVoicemailStartedEvent,
IWebRtcAudioRxEvent,
IWebRtcIceCandidateEvent,
IWebRtcStateEvent,
IWebRtcTrackEvent,
TProxyEventMap,
} from './shared/proxy-events.ts';
// ---------------------------------------------------------------------------
// Command type map for smartrust
// ---------------------------------------------------------------------------
type TProxyCommands = {
configure: {
params: Record<string, unknown>;
result: { bound: string };
};
hangup: {
params: { call_id: string };
result: Record<string, never>;
};
make_call: {
params: { number: string; device_id?: string; provider_id?: string };
result: { call_id: string };
};
add_leg: {
params: { call_id: string; number: string; provider_id?: string };
result: { leg_id: string };
};
remove_leg: {
params: { call_id: string; leg_id: string };
result: Record<string, never>;
};
add_device_leg: {
params: { call_id: string; device_id: string };
result: { leg_id: string };
};
transfer_leg: {
params: { source_call_id: string; leg_id: string; target_call_id: string };
result: Record<string, never>;
};
replace_leg: {
params: { call_id: string; old_leg_id: string; number: string; provider_id?: string };
result: { new_leg_id: string };
};
start_interaction: {
params: {
call_id: string;
leg_id: string;
prompt_wav: string;
expected_digits: string;
timeout_ms: number;
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
add_tool_leg: {
params: {
call_id: string;
tool_type: 'recording' | 'transcription';
config?: Record<string, unknown>;
};
result: { tool_leg_id: string };
};
remove_tool_leg: {
params: { call_id: string; tool_leg_id: string };
result: Record<string, never>;
};
set_leg_metadata: {
params: { call_id: string; leg_id: string; key: string; value: unknown };
result: Record<string, never>;
};
generate_tts: {
params: { model: string; voices: string; voice: string; text: string; output: string; cacheable?: boolean };
result: { output: string };
};
// WebRTC signaling — bridged from the browser via the TS control plane.
webrtc_offer: {
params: { session_id: string; sdp: string };
result: { sdp: string };
};
webrtc_ice: {
params: {
session_id: string;
candidate: string;
sdp_mid?: string;
sdp_mline_index?: number;
};
result: Record<string, never>;
};
webrtc_link: {
params: {
session_id: string;
call_id: string;
provider_media_addr: string;
provider_media_port: number;
sip_pt?: number;
};
result: Record<string, never>;
};
webrtc_close: {
params: { session_id: string };
result: Record<string, never>;
};
};
// ---------------------------------------------------------------------------
// Bridge singleton
// ---------------------------------------------------------------------------
let bridge: RustBridge<TProxyCommands> | null = null;
let initialized = false;
let logFn: ((msg: string) => void) | undefined;
type TWebRtcIceCandidate = {
candidate?: string;
sdpMid?: string;
sdpMLineIndex?: number;
} | string;
function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function buildLocalPaths(): string[] {
const root = process.cwd();
// Map Node's process.arch to tsrust's friendly target name.
// tsrust writes multi-target binaries as <bin>_<os>_<arch>,
// e.g. proxy-engine_linux_amd64 / proxy-engine_linux_arm64.
const archSuffix =
process.arch === 'arm64' ? 'linux_arm64' :
process.arch === 'x64' ? 'linux_amd64' :
null;
const multiTarget = archSuffix
? [path.join(root, 'dist_rust', `proxy-engine_${archSuffix}`)]
: [];
return [
// 1. Multi-target output matching the running host arch (Docker image, CI, multi-target dev).
...multiTarget,
// 2. Single-target (unsuffixed) output — legacy/fallback when tsrust runs without targets.
path.join(root, 'dist_rust', 'proxy-engine'),
// 3. Direct cargo builds for dev iteration.
path.join(root, 'rust', 'target', 'release', 'proxy-engine'),
path.join(root, 'rust', 'target', 'debug', 'proxy-engine'),
];
}
/**
* Initialize the proxy engine — spawn the Rust binary.
* Call configure() separately to push config and start SIP.
*/
export async function initProxyEngine(log?: (msg: string) => void): Promise<boolean> {
if (initialized && bridge) return true;
logFn = log;
try {
bridge = new RustBridge<TProxyCommands>({
binaryName: 'proxy-engine',
localPaths: buildLocalPaths(),
});
const spawned = await bridge.spawn();
if (!spawned) {
log?.('[proxy-engine] failed to spawn binary');
bridge = null;
return false;
}
bridge.on('exit', () => {
logFn?.('[proxy-engine] process exited — will need re-init');
bridge = null;
initialized = false;
});
// Forward stderr for debugging.
bridge.on('stderr', (line: string) => {
logFn?.(`[proxy-engine:stderr] ${line}`);
});
initialized = true;
log?.('[proxy-engine] spawned and ready');
return true;
} catch (error: unknown) {
log?.(`[proxy-engine] init error: ${errorMessage(error)}`);
bridge = null;
return false;
}
}
/**
* Send the full app config to the proxy engine.
* This binds the SIP socket, starts provider registrations, etc.
*/
export async function configureProxyEngine(config: TProxyCommands['configure']['params']): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
const result = await sendProxyCommand('configure', config);
logFn?.(`[proxy-engine] configured, SIP bound on ${result.bound || '?'}`);
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] configure error: ${errorMessage(error)}`);
return false;
}
}
/**
* Initiate an outbound call via Rust. Returns the call ID or null on failure.
*/
export async function makeCall(number: string, deviceId?: string, providerId?: string): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await sendProxyCommand('make_call', {
number,
device_id: deviceId,
provider_id: providerId,
});
return result.call_id || null;
} catch (error: unknown) {
logFn?.(`[proxy-engine] make_call error: ${errorMessage(error)}`);
return null;
}
}
/**
* Send a hangup command.
*/
export async function hangupCall(callId: string): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('hangup', { call_id: callId });
return true;
} catch {
return false;
}
}
/**
* Send a WebRTC offer to the proxy engine. Returns the SDP answer.
*/
export async function webrtcOffer(sessionId: string, sdp: string): Promise<{ sdp: string } | null> {
if (!bridge || !initialized) return null;
try {
return await sendProxyCommand('webrtc_offer', { session_id: sessionId, sdp });
} catch (error: unknown) {
logFn?.(`[proxy-engine] webrtc_offer error: ${errorMessage(error)}`);
return null;
}
}
/**
* Forward an ICE candidate to the proxy engine.
*/
export async function webrtcIce(sessionId: string, candidate: TWebRtcIceCandidate): Promise<void> {
if (!bridge || !initialized) return;
try {
await sendProxyCommand('webrtc_ice', {
session_id: sessionId,
candidate: typeof candidate === 'string' ? candidate : candidate.candidate || '',
sdp_mid: typeof candidate === 'string' ? undefined : candidate.sdpMid,
sdp_mline_index: typeof candidate === 'string' ? undefined : candidate.sdpMLineIndex,
});
} catch { /* ignore */ }
}
/**
* Link a WebRTC session to a SIP call — enables audio bridging.
* The browser's Opus audio will be transcoded and sent to the provider.
*/
export async function webrtcLink(sessionId: string, callId: string, providerMediaAddr: string, providerMediaPort: number, sipPt: number = 9): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('webrtc_link', {
session_id: sessionId,
call_id: callId,
provider_media_addr: providerMediaAddr,
provider_media_port: providerMediaPort,
sip_pt: sipPt,
});
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] webrtc_link error: ${errorMessage(error)}`);
return false;
}
}
/**
* Add an external SIP leg to an existing call (multiparty).
*/
export async function addLeg(callId: string, number: string, providerId?: string): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await sendProxyCommand('add_leg', {
call_id: callId,
number,
provider_id: providerId,
});
return result.leg_id || null;
} catch (error: unknown) {
logFn?.(`[proxy-engine] add_leg error: ${errorMessage(error)}`);
return null;
}
}
/**
* Remove a leg from a call.
*/
export async function removeLeg(callId: string, legId: string): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('remove_leg', { call_id: callId, leg_id: legId });
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] remove_leg error: ${errorMessage(error)}`);
return false;
}
}
/**
* Close a WebRTC session.
*/
export async function webrtcClose(sessionId: string): Promise<void> {
if (!bridge || !initialized) return;
try {
await sendProxyCommand('webrtc_close', { session_id: sessionId });
} catch { /* ignore */ }
}
// ---------------------------------------------------------------------------
// Device leg & interaction commands
// ---------------------------------------------------------------------------
/**
* Add a local SIP device to an existing call (mid-call INVITE to desk phone).
*/
export async function addDeviceLeg(callId: string, deviceId: string): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await sendProxyCommand('add_device_leg', {
call_id: callId,
device_id: deviceId,
});
return result.leg_id || null;
} catch (error: unknown) {
logFn?.(`[proxy-engine] add_device_leg error: ${errorMessage(error)}`);
return null;
}
}
/**
* Transfer a leg from one call to another (leg stays connected, switches mixer).
*/
export async function transferLeg(
sourceCallId: string,
legId: string,
targetCallId: string,
): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('transfer_leg', {
source_call_id: sourceCallId,
leg_id: legId,
target_call_id: targetCallId,
});
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] transfer_leg error: ${errorMessage(error)}`);
return false;
}
}
/**
* Replace a leg: terminate the old leg and dial a new number into the same call.
*/
export async function replaceLeg(
callId: string,
oldLegId: string,
number: string,
providerId?: string,
): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await sendProxyCommand('replace_leg', {
call_id: callId,
old_leg_id: oldLegId,
number,
provider_id: providerId,
});
return result.new_leg_id || null;
} catch (error: unknown) {
logFn?.(`[proxy-engine] replace_leg error: ${errorMessage(error)}`);
return null;
}
}
/**
* Start an interaction on a specific leg — isolate it, play a prompt, collect DTMF.
* Blocks until the interaction completes (digit pressed, timeout, or cancelled).
*/
export async function startInteraction(
callId: string,
legId: string,
promptWav: string,
expectedDigits: string,
timeoutMs: number,
): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> {
if (!bridge || !initialized) return null;
try {
return await sendProxyCommand('start_interaction', {
call_id: callId,
leg_id: legId,
prompt_wav: promptWav,
expected_digits: expectedDigits,
timeout_ms: timeoutMs,
});
} catch (error: unknown) {
logFn?.(`[proxy-engine] start_interaction error: ${errorMessage(error)}`);
return null;
}
}
/**
* Add a tool leg (recording or transcription) to a call.
* Tool legs receive per-source unmerged audio from all participants.
*/
export async function addToolLeg(
callId: string,
toolType: 'recording' | 'transcription',
config?: Record<string, unknown>,
): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await sendProxyCommand('add_tool_leg', {
call_id: callId,
tool_type: toolType,
config,
});
return result.tool_leg_id || null;
} catch (error: unknown) {
logFn?.(`[proxy-engine] add_tool_leg error: ${errorMessage(error)}`);
return null;
}
}
/**
* Remove a tool leg from a call. Triggers finalization (WAV files, metadata).
*/
export async function removeToolLeg(callId: string, toolLegId: string): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('remove_tool_leg', {
call_id: callId,
tool_leg_id: toolLegId,
});
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] remove_tool_leg error: ${errorMessage(error)}`);
return false;
}
}
/**
* Set a metadata key-value pair on a leg.
*/
export async function setLegMetadata(
callId: string,
legId: string,
key: string,
value: unknown,
): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await sendProxyCommand('set_leg_metadata', {
call_id: callId,
leg_id: legId,
key,
value,
});
return true;
} catch (error: unknown) {
logFn?.(`[proxy-engine] set_leg_metadata error: ${errorMessage(error)}`);
return false;
}
}
/**
* Subscribe to an event from the proxy engine.
* Event names: incoming_call, outbound_device_call, call_ringing,
* call_answered, call_ended, provider_registered, device_registered,
* dtmf_digit, recording_done, tool_recording_done, tool_transcription_done,
* leg_added, leg_removed, sip_unhandled
*/
export function onProxyEvent<K extends keyof TProxyEventMap>(event: K, handler: (data: TProxyEventMap[K]) => void): void {
if (!bridge) throw new Error('proxy engine not initialized');
bridge.on(`management:${event}`, handler);
}
/** Check if the proxy engine is ready. */
export function isProxyReady(): boolean {
return initialized && bridge !== null;
}
/** Send an arbitrary command to the proxy engine bridge. */
export async function sendProxyCommand<K extends keyof TProxyCommands>(
method: K,
params: TProxyCommands[K]['params'],
): Promise<TProxyCommands[K]['result']> {
if (!bridge || !initialized) throw new Error('proxy engine not initialized');
return bridge.sendCommand(method, params) as Promise<TProxyCommands[K]['result']>;
}
/** Shut down the proxy engine. */
export function shutdownProxyEngine(): void {
if (bridge) {
try { bridge.kill(); } catch { /* ignore */ }
bridge = null;
initialized = false;
}
}