/** * WebRtcLeg — a WebRTC connection from the Call hub to a browser client. * * Wraps a werift RTCPeerConnection and handles: * - WebRTC offer/answer/ICE negotiation * - Opus <-> G.722/PCMU/PCMA transcoding via Rust IPC * - RTP header rebuilding with correct PT, timestamp, SSRC */ import dgram from 'node:dgram'; import { Buffer } from 'node:buffer'; import { WebSocket } from 'ws'; import type { IEndpoint } from '../sip/index.ts'; import type { TLegState, ILegStatus } from './types.ts'; import type { ILeg } from './leg.ts'; import { rtpClockIncrement, buildRtpHeader, codecDisplayName } from './leg.ts'; import { createTranscoder, OPUS_PT } from '../codec.ts'; import type { IRtpTranscoder } from '../codec.ts'; import { createSession, destroySession } from '../opusbridge.ts'; import type { SipDialog } from '../sip/index.ts'; import type { SipMessage } from '../sip/index.ts'; // --------------------------------------------------------------------------- // WebRtcLeg config // --------------------------------------------------------------------------- export interface IWebRtcLegConfig { /** The browser's WebSocket connection. */ ws: WebSocket; /** The browser's session ID. */ sessionId: string; /** RTP port and socket (pre-allocated from the pool). */ rtpPort: number; rtpSock: dgram.Socket; /** Logging function. */ log: (msg: string) => void; } // --------------------------------------------------------------------------- // WebRtcLeg // --------------------------------------------------------------------------- export class WebRtcLeg implements ILeg { readonly id: string; readonly type = 'webrtc' as const; state: TLegState = 'inviting'; readonly sessionId: string; /** The werift RTCPeerConnection instance. */ private pc: any = null; /** RTP socket for bridging to SIP. */ readonly rtpSock: dgram.Socket; readonly rtpPort: number; /** Remote media endpoint (the other side of the bridge, set by Call). */ remoteMedia: IEndpoint | null = null; /** Negotiated WebRTC codec payload type. */ codec: number | null = null; /** Transcoders for WebRTC <-> SIP conversion. */ transcoder: IRtpTranscoder | null = null; // used by Call for fan-out private toSipTranscoder: IRtpTranscoder | null = null; private fromSipTranscoder: IRtpTranscoder | null = null; /** RTP counters for outgoing (to SIP) direction. */ private toSipSeq = 0; private toSipTs = 0; private toSipSsrc = (Math.random() * 0xffffffff) >>> 0; /** RTP counters for incoming (from SIP) direction. * Initialized to random values so announcements and provider audio share * a continuous sequence — prevents the browser jitter buffer from discarding * packets after the announcement→provider transition. */ readonly fromSipCounters = { seq: Math.floor(Math.random() * 0xffff), ts: Math.floor(Math.random() * 0xffffffff), }; fromSipSsrc = (Math.random() * 0xffffffff) >>> 0; /** Packet counters. */ pktSent = 0; pktReceived = 0; /** Callback set by Call. */ onRtpReceived: ((data: Buffer) => void) | null = null; /** Callback to send transcoded RTP to the provider via the SipLeg's socket. * Set by CallManager when the bridge is established. If null, falls back to own rtpSock. */ onSendToProvider: ((data: Buffer, dest: IEndpoint) => void) | null = null; /** Lifecycle callbacks. */ onConnected: ((leg: WebRtcLeg) => void) | null = null; onTerminated: ((leg: WebRtcLeg) => void) | null = null; /** Cancel handle for an in-progress announcement. */ announcementCancel: (() => void) | null = null; private ws: WebSocket; private config: IWebRtcLegConfig; private pendingIceCandidates: any[] = []; // SipDialog is not applicable for WebRTC legs. readonly dialog: SipDialog | null = null; readonly sipCallId: string; constructor(id: string, config: IWebRtcLegConfig) { this.id = id; this.sessionId = config.sessionId; this.ws = config.ws; this.rtpSock = config.rtpSock; this.rtpPort = config.rtpPort; this.config = config; this.sipCallId = `webrtc-${id}`; // Log RTP arriving on this socket (symmetric RTP from provider). // Audio forwarding is handled by the Call hub: SipLeg → forwardRtp → WebRtcLeg.sendRtp. // We do NOT transcode here to avoid double-processing (the SipLeg also receives these packets). let sipRxCount = 0; this.rtpSock.on('message', (data: Buffer) => { sipRxCount++; if (sipRxCount === 1 || sipRxCount === 50 || sipRxCount % 500 === 0) { this.config.log(`[webrtc-leg:${this.id}] SIP->browser rtp #${sipRxCount} (${data.length}b) [symmetric, ignored]`); } }); } // ------------------------------------------------------------------------- // WebRTC offer/answer // ------------------------------------------------------------------------- /** * Handle a WebRTC offer from the browser. Creates the PeerConnection, * sets remote offer, creates answer, and sends it back. */ async handleOffer(offerSdp: string): Promise { this.config.log(`[webrtc-leg:${this.id}] received offer`); try { const werift = await import('werift'); this.pc = new werift.RTCPeerConnection({ iceServers: [] }); // Add sendrecv transceiver before setRemoteDescription. this.pc.addTransceiver('audio', { direction: 'sendrecv' }); // Handle incoming audio from browser. this.pc.ontrack = (event: any) => { const track = event.track; this.config.log(`[webrtc-leg:${this.id}] got track: ${track.kind}`); let rxCount = 0; track.onReceiveRtp.subscribe((rtp: any) => { if (!this.remoteMedia) return; rxCount++; if (rxCount === 1 || rxCount === 50 || rxCount % 500 === 0) { this.config.log(`[webrtc-leg:${this.id}] browser->SIP rtp #${rxCount}`); } this.forwardToSip(rtp, rxCount); }); }; // ICE candidate handling. this.pc.onicecandidate = (candidate: any) => { if (candidate) { const json = candidate.toJSON?.() || candidate; this.wsSend({ type: 'webrtc-ice', sessionId: this.sessionId, candidate: json }); } }; this.pc.onconnectionstatechange = () => { this.config.log(`[webrtc-leg:${this.id}] connection state: ${this.pc.connectionState}`); if (this.pc.connectionState === 'connected') { this.state = 'connected'; this.onConnected?.(this); } else if (this.pc.connectionState === 'failed' || this.pc.connectionState === 'closed') { this.state = 'terminated'; this.onTerminated?.(this); } }; if (this.pc.oniceconnectionstatechange !== undefined) { this.pc.oniceconnectionstatechange = () => { this.config.log(`[webrtc-leg:${this.id}] ICE state: ${this.pc.iceConnectionState}`); }; } // Set remote offer and create answer. await this.pc.setRemoteDescription({ type: 'offer', sdp: offerSdp }); const answer = await this.pc.createAnswer(); await this.pc.setLocalDescription(answer); const sdp: string = this.pc.localDescription!.sdp; // Detect negotiated codec. const mAudio = sdp.match(/m=audio\s+\d+\s+\S+\s+(\d+)/); if (mAudio) { this.codec = parseInt(mAudio[1], 10); this.config.log(`[webrtc-leg:${this.id}] negotiated audio PT=${this.codec}`); } // Extract sender SSRC from SDP. const ssrcMatch = sdp.match(/a=ssrc:(\d+)\s/); if (ssrcMatch) { this.fromSipSsrc = parseInt(ssrcMatch[1], 10); } // Also try from sender object. const senders = this.pc.getSenders(); if (senders[0]) { const senderSsrc = (senders[0] as any).ssrc ?? (senders[0] as any)._ssrc; if (senderSsrc) this.fromSipSsrc = senderSsrc; } // Send answer to browser. this.wsSend({ type: 'webrtc-answer', sessionId: this.sessionId, sdp }); this.config.log(`[webrtc-leg:${this.id}] sent answer, rtp port=${this.rtpPort}`); // Process buffered ICE candidates. for (const c of this.pendingIceCandidates) { try { await this.pc.addIceCandidate(c); } catch { /* ignore */ } } this.pendingIceCandidates = []; } catch (err: any) { this.config.log(`[webrtc-leg:${this.id}] offer error: ${err.message}`); this.wsSend({ type: 'webrtc-error', sessionId: this.sessionId, error: err.message }); this.state = 'terminated'; this.onTerminated?.(this); } } /** Add an ICE candidate from the browser. */ async addIceCandidate(candidate: any): Promise { if (!this.pc) { this.pendingIceCandidates.push(candidate); return; } try { if (candidate) await this.pc.addIceCandidate(candidate); } catch (err: any) { this.config.log(`[webrtc-leg:${this.id}] ICE error: ${err.message}`); } } // ------------------------------------------------------------------------- // Transcoding setup // ------------------------------------------------------------------------- /** Codec session ID for isolated Rust codec state (unique per leg). */ private codecSessionId = `webrtc-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; /** * Set up transcoders for bridging between WebRTC and SIP codecs. * Called by the Call when the remote media endpoint is known. * Creates an isolated Rust codec session so concurrent calls don't * corrupt each other's stateful codec state (Opus/G.722 ADPCM). */ async setupTranscoders(sipPT: number): Promise { const webrtcPT = this.codec ?? OPUS_PT; // Create isolated codec session for this leg. await createSession(this.codecSessionId); this.toSipTranscoder = createTranscoder(webrtcPT, sipPT, this.codecSessionId, 'to_sip'); this.fromSipTranscoder = createTranscoder(sipPT, webrtcPT, this.codecSessionId, 'to_browser'); const mode = this.toSipTranscoder ? `transcoding PT ${webrtcPT}<->${sipPT}` : `pass-through PT ${webrtcPT}`; this.config.log(`[webrtc-leg:${this.id}] ${mode} (session: ${this.codecSessionId})`); } // ------------------------------------------------------------------------- // RTP forwarding // ------------------------------------------------------------------------- /** Forward RTP from SIP side to browser via WebRTC. */ private forwardToBrowser(data: Buffer, count: number): void { const sender = this.pc?.getSenders()[0]; if (!sender) return; if (this.fromSipTranscoder && data.length > 12) { const payload = Buffer.from(data.subarray(12)); // Stop announcement if still playing — provider audio takes over. if (this.announcementCancel) { this.announcementCancel(); this.announcementCancel = null; } // Capture seq/ts BEFORE async transcode to preserve ordering. const toPT = this.fromSipTranscoder.toPT; const seq = this.fromSipCounters.seq++; const ts = this.fromSipCounters.ts; this.fromSipCounters.ts += rtpClockIncrement(toPT); const result = this.fromSipTranscoder.payload(payload); const sendTranscoded = (transcoded: Buffer) => { if (transcoded.length === 0) return; // transcoding failed try { const hdr = buildRtpHeader(toPT, seq, ts, this.fromSipSsrc, false); const out = Buffer.concat([hdr, transcoded]); const r = sender.sendRtp(out); if (r instanceof Promise) r.catch(() => {}); } catch { /* ignore */ } }; if (result instanceof Promise) result.then(sendTranscoded).catch(() => {}); else sendTranscoded(result); } else if (!this.fromSipTranscoder) { // No transcoder — either same codec or not set up yet. // Only forward if we don't expect transcoding. if (this.codec === null) { try { sender.sendRtp(data); } catch { /* ignore */ } } } } /** Forward RTP from browser to SIP side. */ private forwardToSip(rtp: any, count: number): void { if (!this.remoteMedia) return; if (this.toSipTranscoder) { const payload: Buffer = rtp.payload; if (!payload || payload.length === 0) return; // Capture seq/ts BEFORE async transcode to preserve ordering. const toPT = this.toSipTranscoder.toPT; const seq = this.toSipSeq++; const ts = this.toSipTs; this.toSipTs += rtpClockIncrement(toPT); const result = this.toSipTranscoder.payload(payload); const sendTranscoded = (transcoded: Buffer) => { if (transcoded.length === 0) return; // transcoding failed const hdr = buildRtpHeader(toPT, seq, ts, this.toSipSsrc, false); const out = Buffer.concat([hdr, transcoded]); if (this.onSendToProvider) { this.onSendToProvider(out, this.remoteMedia!); } else { this.rtpSock.send(out, this.remoteMedia!.port, this.remoteMedia!.address); } this.pktSent++; }; if (result instanceof Promise) result.then(sendTranscoded).catch(() => {}); else sendTranscoded(result); } else if (this.codec === null) { // Same codec (no transcoding needed) — pass through. const raw = rtp.serialize(); if (this.onSendToProvider) { this.onSendToProvider(raw, this.remoteMedia); } else { this.rtpSock.send(raw, this.remoteMedia.port, this.remoteMedia.address); } this.pktSent++; } // If codec is set but transcoder is null, drop the packet — transcoder not ready yet. // This prevents raw Opus from being sent to a G.722 endpoint. } /** * Send RTP to the browser via WebRTC (used by Call hub for fan-out). * This transcodes and sends through the PeerConnection, NOT to a UDP address. */ sendRtp(data: Buffer): void { this.forwardToBrowser(data, this.pktSent); this.pktSent++; } /** * Send a pre-encoded RTP packet directly to the browser via PeerConnection. * Used for announcements — the packet must already be in the correct codec (Opus). */ sendDirectToBrowser(pkt: Buffer): void { const sender = this.pc?.getSenders()[0]; if (!sender) return; try { const r = sender.sendRtp(pkt); if (r instanceof Promise) r.catch(() => {}); } catch { /* ignore */ } } /** No-op: WebRTC legs don't process SIP messages. */ handleSipMessage(_msg: SipMessage, _rinfo: IEndpoint): void { // WebRTC legs don't handle SIP messages. } // ------------------------------------------------------------------------- // Lifecycle // ------------------------------------------------------------------------- teardown(): void { this.state = 'terminated'; try { this.pc?.close(); } catch { /* ignore */ } this.pc = null; // Destroy the isolated Rust codec session for this leg. destroySession(this.codecSessionId).catch(() => {}); // Note: RTP socket is NOT closed here — the RtpPortPool manages that. } getStatus(): ILegStatus { return { id: this.id, type: this.type, state: this.state, remoteMedia: this.remoteMedia, rtpPort: this.rtpPort, pktSent: this.pktSent, pktReceived: this.pktReceived, codec: codecDisplayName(this.codec), transcoding: this.toSipTranscoder !== null || this.fromSipTranscoder !== null, }; } // ------------------------------------------------------------------------- // Helpers // ------------------------------------------------------------------------- private wsSend(data: unknown): void { try { if (this.ws.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(data)); } } catch { /* ignore */ } } }