235 lines
6.7 KiB
TypeScript
235 lines
6.7 KiB
TypeScript
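/**
 * Chat completions handler.
 *
 * Serves chat completion requests from a local model container when one is
 * available and otherwise forwards them to the cluster node that hosts the
 * model.
 */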
|
|
|
|
import * as http from 'node:http';
|
|
import type { IApiError, IChatCompletionRequest } from '../../interfaces/api.ts';
|
|
import { ClusterCoordinator } from '../../cluster/coordinator.ts';
|
|
import { ContainerManager } from '../../containers/container-manager.ts';
|
|
import { logger } from '../../logger.ts';
|
|
import { ModelRegistry } from '../../models/registry.ts';
|
|
import { ModelLoader } from '../../models/loader.ts';
|
|
|
|
/**
 * Handles chat completion requests.
 *
 * A request is answered by a container on this node when one already serves
 * the model (or can be loaded locally); otherwise the cluster coordinator is
 * asked to place the model and the request is either served locally or
 * forwarded to the node that was chosen.
 */
export class ChatHandler {
  private readonly containerManager: ContainerManager;
  private readonly modelRegistry: ModelRegistry;
  private readonly modelLoader: ModelLoader;
  private readonly clusterCoordinator: ClusterCoordinator;

  constructor(
    containerManager: ContainerManager,
    modelRegistry: ModelRegistry,
    modelLoader: ModelLoader,
    clusterCoordinator: ClusterCoordinator,
  ) {
    this.containerManager = containerManager;
    this.modelRegistry = modelRegistry;
    this.modelLoader = modelLoader;
    this.clusterCoordinator = clusterCoordinator;
  }

  /**
   * Entry point for a chat completion request.
   *
   * Any failure (including a failure to resolve the model name) is logged and
   * reported to the client through {@link sendError}; the returned promise
   * does not reject for those failures.
   *
   * @param req - Incoming request; only its headers are read, when proxying.
   * @param res - Response the completion (or error) is written to.
   * @param body - Already-parsed request payload.
   */
  public async handleChatCompletion(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    body: IChatCompletionRequest,
  ): Promise<void> {
    try {
      // Resolved inside the try block so that a registry failure still
      // produces an error response instead of an unanswered request.
      const canonicalModel = await this.resolveCanonicalModel(body.model);
      const requestBody: IChatCompletionRequest = {
        ...body,
        model: canonicalModel,
      };

      logger.dim(`Chat completion request for model: ${canonicalModel}`);

      const container = await this.findOrLoadLocalModel(canonicalModel);
      if (container) {
        await this.completeLocally(res, container, requestBody);
        return;
      }

      const ensured = await this.clusterCoordinator.ensureModelViaControlPlane(canonicalModel);
      if (!ensured) {
        this.sendError(
          res,
          404,
          `Model "${canonicalModel}" not found or could not be deployed`,
          'model_not_found',
        );
        return;
      }

      if (ensured.location.nodeName === this.clusterCoordinator.getLocalNodeName()) {
        // The control plane picked this node; the container must exist by now.
        const localContainer = await this.findLocalContainer(canonicalModel);
        if (!localContainer) {
          this.sendError(
            res,
            503,
            `Model "${canonicalModel}" was scheduled locally but is not ready`,
            'server_error',
          );
          return;
        }

        await this.completeLocally(res, localContainer, requestBody);
        return;
      }

      await this.proxyChatRequest(req, res, ensured.location.endpoint, requestBody);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      logger.error(`Chat completion error: ${message}`);
      this.sendError(res, 500, `Chat completion failed: ${message}`, 'server_error');
    }
  }

  /**
   * Maps a requested model name to the registry entry's `id`, falling back to
   * the name as given when the registry has no entry (or the id is empty).
   */
  private async resolveCanonicalModel(modelName: string): Promise<string> {
    const model = await this.modelRegistry.getModel(modelName);
    return model?.id || modelName;
  }

  /** Asks the container manager for a container serving `modelName`. */
  private async findLocalContainer(
    modelName: string,
  ): Promise<import('../../containers/base-container.ts').BaseContainer | null> {
    return this.containerManager.findContainerForModel(modelName);
  }

  /**
   * Returns a local container for the model, loading the model first when the
   * coordinator reports that local deployment should be tried first.
   *
   * @returns The container, or `null` when none exists and local deployment
   *   is either not preferred or did not succeed.
   */
  private async findOrLoadLocalModel(
    modelName: string,
  ): Promise<import('../../containers/base-container.ts').BaseContainer | null> {
    const existing = await this.findLocalContainer(modelName);
    if (existing) {
      return existing;
    }

    if (!this.clusterCoordinator.shouldDeployLocallyFirst()) {
      return null;
    }

    logger.info(`Model ${modelName} not loaded, attempting local deploy...`);
    const loadResult = await this.modelLoader.loadModel(modelName);
    if (!loadResult.success) {
      return null;
    }

    // Look up by the name the loader reports, not the requested one.
    return this.findLocalContainer(loadResult.model);
  }

  /** Runs the request on a local container, streaming when `body.stream` is set. */
  private async completeLocally(
    res: http.ServerResponse,
    container: import('../../containers/base-container.ts').BaseContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    if (body.stream) {
      await this.handleStreamingCompletion(res, container, body);
    } else {
      await this.handleNonStreamingCompletion(res, container, body);
    }
  }

  /** Awaits the full completion and writes it as a single JSON response. */
  private async handleNonStreamingCompletion(
    res: http.ServerResponse,
    container: import('../../containers/base-container.ts').BaseContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    const response = await container.chatCompletion(body);

    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(response));
  }

  /**
   * Writes event-stream headers, then forwards every chunk the container
   * emits and ends the response once the container's stream call resolves.
   */
  private async handleStreamingCompletion(
    res: http.ServerResponse,
    container: import('../../containers/base-container.ts').BaseContainer,
    body: IChatCompletionRequest,
  ): Promise<void> {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      'X-Accel-Buffering': 'no',
    });

    await container.chatCompletionStream(body, (chunk) => {
      res.write(chunk);
    });
    res.end();
  }

  /**
   * Forwards the request to `${targetEndpoint}/v1/chat/completions` and relays
   * the upstream status, content type and body to the client.
   *
   * For streaming requests the upstream body is relayed chunk by chunk, and
   * the upstream read is cancelled if the client connection closes first.
   */
  private async proxyChatRequest(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    targetEndpoint: string,
    body: IChatCompletionRequest,
  ): Promise<void> {
    const response = await fetch(`${targetEndpoint}/v1/chat/completions`, {
      method: 'POST',
      headers: this.buildForwardHeaders(req),
      body: JSON.stringify(body),
    });

    if (!body.stream) {
      const text = await response.text();
      res.writeHead(response.status, {
        'Content-Type': response.headers.get('content-type') || 'application/json',
      });
      res.end(text);
      return;
    }

    res.writeHead(response.status, {
      'Content-Type': response.headers.get('content-type') || 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      // Same header the local streaming path sends.
      'X-Accel-Buffering': 'no',
    });

    const reader = response.body?.getReader();
    if (!reader) {
      res.end();
      return;
    }

    // Cancelling makes the pending read() resolve with done=true, so the loop
    // below exits instead of draining a stream nobody is listening to.
    const stopUpstream = (): void => {
      void reader.cancel().catch(() => undefined);
    };
    res.once('close', stopUpstream);

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          break;
        }

        res.write(value);
      }
    } finally {
      res.off('close', stopUpstream);
    }

    res.end();
  }

  /**
   * Builds the header set sent upstream: a JSON content type plus the
   * client's `Authorization` and `X-Request-Id` headers when present as
   * single string values. No other client headers are forwarded.
   */
  private buildForwardHeaders(req: http.IncomingMessage): Record<string, string> {
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };

    if (typeof req.headers.authorization === 'string') {
      headers.Authorization = req.headers.authorization;
    }

    if (typeof req.headers['x-request-id'] === 'string') {
      headers['X-Request-Id'] = req.headers['x-request-id'];
    }

    return headers;
  }

  /**
   * Sends a JSON error body of the form `{ error: { message, type, param } }`.
   *
   * When the response headers have already been sent (a failure in the middle
   * of a stream), the status can no longer be changed and `writeHead` would
   * throw, so the response is simply ended if it is still open.
   */
  private sendError(
    res: http.ServerResponse,
    statusCode: number,
    message: string,
    type: string,
    param?: string,
  ): void {
    if (res.headersSent) {
      if (!res.writableEnded) {
        res.end();
      }
      return;
    }

    const error: IApiError = {
      error: {
        message,
        type,
        param,
      },
    };

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(error));
  }
}
|