Files
smartserve/ts/adapters/adapter.node.ts
2025-11-29 15:24:00 +00:00

318 lines
9.2 KiB
TypeScript

import * as plugins from '../plugins.js';
import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js';
import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js';
/**
* Node.js adapter - converts IncomingMessage/ServerResponse to web standards
*/
export class NodeAdapter extends BaseAdapter {
private server: plugins.http.Server | plugins.https.Server | null = null;
get name(): 'node' {
return 'node';
}
get characteristics(): IAdapterCharacteristics {
return {
zeroCopyStreaming: false, // Requires conversion
http2Support: true,
maxConnections: 16384,
nativeWebSocket: false, // Requires ws library
};
}
isSupported(): boolean {
return typeof process !== 'undefined' && !!process.versions?.node;
}
async start(handler: TRequestHandler): Promise<ISmartServeInstance> {
this.handler = handler;
this.startTime = Date.now();
const requestListener = this.createRequestListener(handler);
if (this.options.tls) {
// Convert Uint8Array to Buffer if needed
const toBuffer = (data: string | Uint8Array | undefined): string | Buffer | undefined => {
if (data === undefined) return undefined;
if (typeof data === 'string') return data;
return Buffer.from(data);
};
const tlsOptions: plugins.https.ServerOptions = {
cert: toBuffer(this.options.tls.cert),
key: toBuffer(this.options.tls.key),
ca: toBuffer(this.options.tls.ca),
passphrase: this.options.tls.passphrase,
};
if (this.options.tls.alpnProtocols) {
tlsOptions.ALPNProtocols = this.options.tls.alpnProtocols;
}
if (this.options.tls.minVersion) {
tlsOptions.minVersion = this.options.tls.minVersion;
}
this.server = plugins.https.createServer(tlsOptions, requestListener);
} else {
this.server = plugins.http.createServer(requestListener);
}
// Configure keep-alive
if (this.options.keepAlive?.enabled) {
this.server.keepAliveTimeout = this.options.keepAlive.timeout ?? 5000;
(this.server as any).maxRequestsPerSocket = this.options.keepAlive.maxRequests ?? 1000;
}
// Set up connection tracking
this.server.on('connection', (socket) => {
this.stats.connectionsTotal++;
this.stats.connectionsActive++;
socket.on('close', () => {
this.stats.connectionsActive--;
});
});
// WebSocket upgrade handling (if ws library available)
if (this.options.websocket) {
await this.setupWebSocket();
}
return new Promise((resolve, reject) => {
this.server!.listen(
this.options.port,
this.options.hostname ?? '0.0.0.0',
() => {
resolve(this.createInstance());
}
);
this.server!.on('error', reject);
});
}
async stop(): Promise<void> {
return new Promise((resolve, reject) => {
if (this.server) {
this.server.close((err) => {
if (err) reject(err);
else resolve();
});
this.server = null;
} else {
resolve();
}
});
}
/**
* Create Node.js request listener that converts to web standards
*/
private createRequestListener(handler: TRequestHandler) {
return async (
req: plugins.http.IncomingMessage,
res: plugins.http.ServerResponse
) => {
this.stats.requestsTotal++;
this.stats.requestsActive++;
try {
// Convert to web standard Request
const request = this.toWebRequest(req);
// Create connection info
const connectionInfo: IConnectionInfo = {
remoteAddr: req.socket.remoteAddress ?? 'unknown',
remotePort: req.socket.remotePort ?? 0,
localAddr: req.socket.localAddress ?? '0.0.0.0',
localPort: req.socket.localPort ?? this.options.port,
encrypted: !!(req.socket as any).encrypted,
tlsVersion: (req.socket as any).getCipher?.()?.version,
};
// Call handler and send response
const response = await handler(request, connectionInfo);
await this.sendResponse(res, response);
} catch (error) {
if (this.options.onError) {
try {
const errorResponse = await this.options.onError(error as Error);
await this.sendResponse(res, errorResponse);
} catch {
res.statusCode = 500;
res.end('Internal Server Error');
}
} else {
res.statusCode = 500;
res.end('Internal Server Error');
}
} finally {
this.stats.requestsActive--;
}
};
}
/**
* Convert Node.js IncomingMessage to Web Standard Request
*/
private toWebRequest(req: plugins.http.IncomingMessage): Request {
const protocol = (req.socket as any).encrypted ? 'https' : 'http';
const host = req.headers.host ?? 'localhost';
const url = new URL(req.url ?? '/', `${protocol}://${host}`);
// Convert headers
const headers = new Headers();
for (const [key, value] of Object.entries(req.headers)) {
if (value !== undefined) {
if (Array.isArray(value)) {
value.forEach(v => headers.append(key, v));
} else {
headers.set(key, value);
}
}
}
// Create body stream for non-GET/HEAD requests
let body: ReadableStream<Uint8Array> | null = null;
const method = req.method?.toUpperCase() ?? 'GET';
if (method !== 'GET' && method !== 'HEAD') {
body = new ReadableStream({
start(controller) {
req.on('data', (chunk: Buffer) => {
controller.enqueue(new Uint8Array(chunk));
});
req.on('end', () => {
controller.close();
});
req.on('error', (err) => {
controller.error(err);
});
},
cancel() {
req.destroy();
},
});
}
// Use proper init object for Request
const init: RequestInit = {
method,
headers,
};
if (body) {
init.body = body;
// @ts-ignore - duplex is needed for streaming body in Node.js
init.duplex = 'half';
}
return new Request(url.toString(), init);
}
/**
* Send Web Standard Response via Node.js ServerResponse
*/
private async sendResponse(
res: plugins.http.ServerResponse,
response: Response
): Promise<void> {
res.statusCode = response.status;
res.statusMessage = response.statusText;
// Set headers
response.headers.forEach((value, key) => {
res.setHeader(key, value);
});
// Stream body
if (response.body) {
const reader = response.body.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
res.write(value);
this.stats.bytesSent += value.byteLength;
}
} finally {
reader.releaseLock();
}
}
res.end();
}
/**
* Set up WebSocket support using ws library
*/
private async setupWebSocket(): Promise<void> {
const hooks = this.options.websocket;
if (!hooks || !this.server) return;
try {
// Dynamic import of ws library
const { WebSocketServer } = await import('ws');
const wss = new WebSocketServer({ noServer: true });
this.server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
});
wss.on('connection', (ws: any, request: any) => {
const peer = this.wrapNodeWebSocket(ws, request);
hooks.onOpen?.(peer);
ws.on('message', (data: Buffer | string) => {
const message = {
type: typeof data === 'string' ? 'text' as const : 'binary' as const,
text: typeof data === 'string' ? data : undefined,
data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined,
size: typeof data === 'string' ? data.length : data.length,
};
hooks.onMessage?.(peer, message);
});
ws.on('close', (code: number, reason: Buffer) => {
hooks.onClose?.(peer, code, reason.toString());
});
ws.on('error', (error: Error) => {
hooks.onError?.(peer, error);
});
ws.on('ping', (data: Buffer) => {
hooks.onPing?.(peer, new Uint8Array(data));
});
ws.on('pong', (data: Buffer) => {
hooks.onPong?.(peer, new Uint8Array(data));
});
});
} catch {
console.warn('WebSocket support requires the "ws" package. Install with: pnpm add ws');
}
}
private wrapNodeWebSocket(ws: any, request: any): any {
return {
id: crypto.randomUUID(),
url: request.url ?? '',
get readyState() { return ws.readyState; },
protocol: ws.protocol ?? '',
extensions: ws.extensions ?? '',
send: (data: string) => ws.send(data),
sendBinary: (data: Uint8Array | ArrayBuffer) => ws.send(data),
close: (code?: number, reason?: string) => ws.close(code, reason),
ping: (data?: Uint8Array) => ws.ping(data),
terminate: () => ws.terminate(),
context: {} as any,
data: new Map(),
};
}
}