/**
 * Chat completions handler.
 */

import * as http from 'node:http';
import type { IApiError, IChatCompletionRequest } from '../../interfaces/api.ts';
import { ClusterCoordinator } from '../../cluster/coordinator.ts';
import { ContainerManager } from '../../containers/container-manager.ts';
import { UpstreamTimeoutError } from '../../containers/base-container.ts';
import { API_SERVER } from '../../constants.ts';
import { logger } from '../../logger.ts';
import { ModelRegistry } from '../../models/registry.ts';
import { ModelLoader } from '../../models/loader.ts';

/** Container type accepted by the local completion helpers. */
type LocalContainer = import('../../containers/base-container.ts').BaseContainer;

/**
 * Serves chat completion requests.
 *
 * Resolution order, as implemented by {@link ChatHandler.handleChatCompletion}:
 * 1. a container already running on this node (optionally loading the model
 *    locally first when the coordinator allows it),
 * 2. a deployment requested through the cluster control plane, served either
 *    by this node or by proxying the request to the node's endpoint.
 */
export class ChatHandler {
  private readonly containerManager: ContainerManager;
  private readonly modelRegistry: ModelRegistry;
  private readonly modelLoader: ModelLoader;
  private readonly clusterCoordinator: ClusterCoordinator;

  constructor(
    containerManager: ContainerManager,
    modelRegistry: ModelRegistry,
    modelLoader: ModelLoader,
    clusterCoordinator: ClusterCoordinator,
  ) {
    this.containerManager = containerManager;
    this.modelRegistry = modelRegistry;
    this.modelLoader = modelLoader;
    this.clusterCoordinator = clusterCoordinator;
  }

  /**
   * Handles one chat completion request and always writes a response.
   *
   * @param req - Incoming request; only its headers are used (for proxying).
   * @param res - Response the completion or an error payload is written to.
   * @param body - Parsed request body; `model` is replaced by its canonical id.
   *
   * Failures are reported to the client instead of being thrown:
   * `UpstreamTimeoutError` becomes 504, anything else becomes 500.
   */
  public async handleChatCompletion(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    body: IChatCompletionRequest,
  ): Promise<void> {
    try {
      // Resolved inside the try block so a registry failure is answered with
      // an error payload rather than rejecting the handler with no response.
      const canonicalModel = await this.resolveCanonicalModel(body.model);
      const requestBody: IChatCompletionRequest = {
        ...body,
        model: canonicalModel,
      };

      logger.dim(`Chat completion request for model: ${canonicalModel}`);

      const container = await this.findOrLoadLocalModel(canonicalModel);
      if (container) {
        await this.serveFromContainer(res, container, requestBody);
        return;
      }

      const ensured = await this.clusterCoordinator.ensureModelViaControlPlane(canonicalModel);
      if (!ensured) {
        this.sendError(
          res,
          404,
          `Model "${canonicalModel}" not found or could not be deployed`,
          'model_not_found',
        );
        return;
      }

      if (ensured.location.nodeName === this.clusterCoordinator.getLocalNodeName()) {
        // The control plane placed the model on this node; it must now be
        // reachable through the local container manager.
        const localContainer = await this.findLocalContainer(canonicalModel);
        if (!localContainer) {
          this.sendError(
            res,
            503,
            `Model "${canonicalModel}" was scheduled locally but is not ready`,
            'server_error',
          );
          return;
        }

        await this.serveFromContainer(res, localContainer, requestBody);
        return;
      }

      await this.proxyChatRequest(req, res, ensured.location.endpoint, requestBody);
    } catch (error: unknown) {
      if (error instanceof UpstreamTimeoutError) {
        this.sendError(res, 504, error.message, 'upstream_timeout');
        return;
      }

      const message = error instanceof Error ? error.message : String(error);
      logger.error(`Chat completion error: ${message}`);
      this.sendError(res, 500, `Chat completion failed: ${message}`, 'server_error');
    }
  }

  /**
   * Maps a requested model name to the registry's id for it, falling back to
   * the requested name when the registry has no (or an empty) id.
   */
  private async resolveCanonicalModel(modelName: string): Promise<string> {
    const model = await this.modelRegistry.getModel(modelName);
    return model?.id || modelName;
  }

  /**
   * Looks up a container on this node that serves `modelName`.
   *
   * @returns The container, or `null` when the manager reports none.
   */
  private async findLocalContainer(
    modelName: string,
  ): Promise<LocalContainer | null> {
    // Normalise a missing result to null; every caller only checks truthiness.
    return (await this.containerManager.findContainerForModel(modelName)) ?? null;
  }

  /**
   * Returns a local container for `modelName`, loading the model on this node
   * first when none is running and the coordinator prefers local deployment.
   *
   * @returns The container, or `null` when the model is not local and was not
   * (or could not be) loaded here.
   */
  private async findOrLoadLocalModel(
    modelName: string,
  ): Promise<LocalContainer | null> {
    const existing = await this.findLocalContainer(modelName);
    if (existing) {
      return existing;
    }

    if (!this.clusterCoordinator.shouldDeployLocallyFirst()) {
      return null;
    }

    logger.info(`Model ${modelName} not loaded, attempting local deploy...`);
    const loadResult = await this.modelLoader.loadModel(modelName);
    if (!loadResult.success) {
      return null;
    }

    // Look up by the name reported by the loader, which may differ from the
    // requested one.
    return this.findLocalContainer(loadResult.model);
  }

  /** Dispatches to the streaming or non-streaming path based on `body.stream`. */
  private async serveFromContainer(
    res: http.ServerResponse,
    container: LocalContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    if (body.stream) {
      await this.handleStreamingCompletion(res, container, body);
    } else {
      await this.handleNonStreamingCompletion(res, container, body);
    }
  }

  /** Runs the completion on `container` and writes the result as a JSON 200. */
  private async handleNonStreamingCompletion(
    res: http.ServerResponse,
    container: LocalContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    const response = await container.chatCompletion(body);
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(response));
  }

  /**
   * Streams the completion from `container` to the client as server-sent
   * events, forwarding each chunk as it is produced.
   */
  private async handleStreamingCompletion(
    res: http.ServerResponse,
    container: LocalContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    // Headers are committed lazily, right before the first byte is written.
    // `writeHead` does not put anything on the wire by itself, so the client
    // sees the same bytes, but a failure before the first chunk (for example
    // an upstream timeout) can still be answered with a proper error status.
    const ensureStreamHeaders = (): void => {
      if (!res.headersSent) {
        res.writeHead(200, {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          Connection: 'keep-alive',
          'X-Accel-Buffering': 'no',
        });
      }
    };

    await container.chatCompletionStream(body, (chunk) => {
      ensureStreamHeaders();
      res.write(chunk);
    });

    // Covers a stream that completed without emitting any chunk.
    ensureStreamHeaders();
    res.end();
  }

  /**
   * Forwards the request to `${targetEndpoint}/v1/chat/completions` and relays
   * the response (status, content type and body) back to the client.
   *
   * @throws UpstreamTimeoutError when no response headers arrive within
   * `API_SERVER.REQUEST_TIMEOUT_MS`.
   */
  private async proxyChatRequest(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    targetEndpoint: string,
    body: IChatCompletionRequest,
  ): Promise<void> {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), API_SERVER.REQUEST_TIMEOUT_MS);

    let response: Response;
    try {
      response = await fetch(`${targetEndpoint}/v1/chat/completions`, {
        method: 'POST',
        headers: this.buildForwardHeaders(req),
        body: JSON.stringify(body),
        signal: controller.signal,
      });
    } catch (error: unknown) {
      if (error instanceof Error && error.name === 'AbortError') {
        throw new UpstreamTimeoutError();
      }
      throw error;
    } finally {
      // The timeout only bounds the wait for response headers; a streamed
      // body may legitimately take longer.
      clearTimeout(timeout);
    }

    if (body.stream) {
      res.writeHead(response.status, {
        'Content-Type': response.headers.get('content-type') || 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      });

      const reader = response.body?.getReader();
      if (!reader) {
        res.end();
        return;
      }

      // Stop pulling from the upstream node once the client is gone. Cancelling
      // the reader resolves a pending read() with `done: true`, so the loop
      // below exits normally.
      const cancelUpstream = (): void => {
        reader.cancel().catch(() => {
          // The upstream stream is already closed or errored; nothing to do.
        });
      };
      res.once('close', cancelUpstream);

      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            break;
          }
          res.write(value);
        }
      } finally {
        res.off('close', cancelUpstream);
      }

      res.end();
      return;
    }

    const text = await response.text();
    res.writeHead(response.status, {
      'Content-Type': response.headers.get('content-type') || 'application/json',
    });
    res.end(text);
  }

  /**
   * Builds the headers sent to the upstream node: a JSON content type plus the
   * caller's `Authorization` and `X-Request-Id` headers when present.
   */
  private buildForwardHeaders(req: http.IncomingMessage): Record<string, string> {
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };

    if (typeof req.headers.authorization === 'string') {
      headers.Authorization = req.headers.authorization;
    }

    if (typeof req.headers['x-request-id'] === 'string') {
      headers['X-Request-Id'] = req.headers['x-request-id'];
    }

    return headers;
  }

  /**
   * Writes a JSON error payload with the given status code.
   *
   * When the response headers were already sent (a stream failed midway) the
   * status can no longer be changed, so the response is only terminated.
   */
  private sendError(
    res: http.ServerResponse,
    statusCode: number,
    message: string,
    type: string,
    param?: string,
  ): void {
    if (res.headersSent) {
      // Calling writeHead again would throw ERR_HTTP_HEADERS_SENT and turn the
      // original failure into an unhandled rejection.
      if (!res.writableEnded) {
        res.end();
      }
      return;
    }

    const error: IApiError = {
      error: {
        message,
        type,
        param,
      },
    };

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(error));
  }
}