initial
This commit is contained in:
317
ts/adapters/adapter.node.ts
Normal file
317
ts/adapters/adapter.node.ts
Normal file
@@ -0,0 +1,317 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js';
|
||||
import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js';
|
||||
|
||||
/**
|
||||
* Node.js adapter - converts IncomingMessage/ServerResponse to web standards
|
||||
*/
|
||||
export class NodeAdapter extends BaseAdapter {
|
||||
private server: plugins.http.Server | plugins.https.Server | null = null;
|
||||
|
||||
get name(): 'node' {
|
||||
return 'node';
|
||||
}
|
||||
|
||||
get characteristics(): IAdapterCharacteristics {
|
||||
return {
|
||||
zeroCopyStreaming: false, // Requires conversion
|
||||
http2Support: true,
|
||||
maxConnections: 16384,
|
||||
nativeWebSocket: false, // Requires ws library
|
||||
};
|
||||
}
|
||||
|
||||
isSupported(): boolean {
|
||||
return typeof process !== 'undefined' && !!process.versions?.node;
|
||||
}
|
||||
|
||||
async start(handler: TRequestHandler): Promise<ISmartServeInstance> {
|
||||
this.handler = handler;
|
||||
this.startTime = Date.now();
|
||||
|
||||
const requestListener = this.createRequestListener(handler);
|
||||
|
||||
if (this.options.tls) {
|
||||
// Convert Uint8Array to Buffer if needed
|
||||
const toBuffer = (data: string | Uint8Array | undefined): string | Buffer | undefined => {
|
||||
if (data === undefined) return undefined;
|
||||
if (typeof data === 'string') return data;
|
||||
return Buffer.from(data);
|
||||
};
|
||||
|
||||
const tlsOptions: plugins.https.ServerOptions = {
|
||||
cert: toBuffer(this.options.tls.cert),
|
||||
key: toBuffer(this.options.tls.key),
|
||||
ca: toBuffer(this.options.tls.ca),
|
||||
passphrase: this.options.tls.passphrase,
|
||||
};
|
||||
|
||||
if (this.options.tls.alpnProtocols) {
|
||||
tlsOptions.ALPNProtocols = this.options.tls.alpnProtocols;
|
||||
}
|
||||
|
||||
if (this.options.tls.minVersion) {
|
||||
tlsOptions.minVersion = this.options.tls.minVersion;
|
||||
}
|
||||
|
||||
this.server = plugins.https.createServer(tlsOptions, requestListener);
|
||||
} else {
|
||||
this.server = plugins.http.createServer(requestListener);
|
||||
}
|
||||
|
||||
// Configure keep-alive
|
||||
if (this.options.keepAlive?.enabled) {
|
||||
this.server.keepAliveTimeout = this.options.keepAlive.timeout ?? 5000;
|
||||
(this.server as any).maxRequestsPerSocket = this.options.keepAlive.maxRequests ?? 1000;
|
||||
}
|
||||
|
||||
// Set up connection tracking
|
||||
this.server.on('connection', (socket) => {
|
||||
this.stats.connectionsTotal++;
|
||||
this.stats.connectionsActive++;
|
||||
socket.on('close', () => {
|
||||
this.stats.connectionsActive--;
|
||||
});
|
||||
});
|
||||
|
||||
// WebSocket upgrade handling (if ws library available)
|
||||
if (this.options.websocket) {
|
||||
await this.setupWebSocket();
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.server!.listen(
|
||||
this.options.port,
|
||||
this.options.hostname ?? '0.0.0.0',
|
||||
() => {
|
||||
resolve(this.createInstance());
|
||||
}
|
||||
);
|
||||
this.server!.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.server) {
|
||||
this.server.close((err) => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
this.server = null;
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Node.js request listener that converts to web standards
|
||||
*/
|
||||
private createRequestListener(handler: TRequestHandler) {
|
||||
return async (
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse
|
||||
) => {
|
||||
this.stats.requestsTotal++;
|
||||
this.stats.requestsActive++;
|
||||
|
||||
try {
|
||||
// Convert to web standard Request
|
||||
const request = this.toWebRequest(req);
|
||||
|
||||
// Create connection info
|
||||
const connectionInfo: IConnectionInfo = {
|
||||
remoteAddr: req.socket.remoteAddress ?? 'unknown',
|
||||
remotePort: req.socket.remotePort ?? 0,
|
||||
localAddr: req.socket.localAddress ?? '0.0.0.0',
|
||||
localPort: req.socket.localPort ?? this.options.port,
|
||||
encrypted: !!(req.socket as any).encrypted,
|
||||
tlsVersion: (req.socket as any).getCipher?.()?.version,
|
||||
};
|
||||
|
||||
// Call handler and send response
|
||||
const response = await handler(request, connectionInfo);
|
||||
await this.sendResponse(res, response);
|
||||
} catch (error) {
|
||||
if (this.options.onError) {
|
||||
try {
|
||||
const errorResponse = await this.options.onError(error as Error);
|
||||
await this.sendResponse(res, errorResponse);
|
||||
} catch {
|
||||
res.statusCode = 500;
|
||||
res.end('Internal Server Error');
|
||||
}
|
||||
} else {
|
||||
res.statusCode = 500;
|
||||
res.end('Internal Server Error');
|
||||
}
|
||||
} finally {
|
||||
this.stats.requestsActive--;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert Node.js IncomingMessage to Web Standard Request
|
||||
*/
|
||||
private toWebRequest(req: plugins.http.IncomingMessage): Request {
|
||||
const protocol = (req.socket as any).encrypted ? 'https' : 'http';
|
||||
const host = req.headers.host ?? 'localhost';
|
||||
const url = new URL(req.url ?? '/', `${protocol}://${host}`);
|
||||
|
||||
// Convert headers
|
||||
const headers = new Headers();
|
||||
for (const [key, value] of Object.entries(req.headers)) {
|
||||
if (value !== undefined) {
|
||||
if (Array.isArray(value)) {
|
||||
value.forEach(v => headers.append(key, v));
|
||||
} else {
|
||||
headers.set(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create body stream for non-GET/HEAD requests
|
||||
let body: ReadableStream<Uint8Array> | null = null;
|
||||
const method = req.method?.toUpperCase() ?? 'GET';
|
||||
|
||||
if (method !== 'GET' && method !== 'HEAD') {
|
||||
body = new ReadableStream({
|
||||
start(controller) {
|
||||
req.on('data', (chunk: Buffer) => {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
});
|
||||
req.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
req.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
req.destroy();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Use proper init object for Request
|
||||
const init: RequestInit = {
|
||||
method,
|
||||
headers,
|
||||
};
|
||||
|
||||
if (body) {
|
||||
init.body = body;
|
||||
// @ts-ignore - duplex is needed for streaming body in Node.js
|
||||
init.duplex = 'half';
|
||||
}
|
||||
|
||||
return new Request(url.toString(), init);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send Web Standard Response via Node.js ServerResponse
|
||||
*/
|
||||
private async sendResponse(
|
||||
res: plugins.http.ServerResponse,
|
||||
response: Response
|
||||
): Promise<void> {
|
||||
res.statusCode = response.status;
|
||||
res.statusMessage = response.statusText;
|
||||
|
||||
// Set headers
|
||||
response.headers.forEach((value, key) => {
|
||||
res.setHeader(key, value);
|
||||
});
|
||||
|
||||
// Stream body
|
||||
if (response.body) {
|
||||
const reader = response.body.getReader();
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
res.write(value);
|
||||
this.stats.bytesSent += value.byteLength;
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
res.end();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up WebSocket support using ws library
|
||||
*/
|
||||
private async setupWebSocket(): Promise<void> {
|
||||
const hooks = this.options.websocket;
|
||||
if (!hooks || !this.server) return;
|
||||
|
||||
try {
|
||||
// Dynamic import of ws library
|
||||
const { WebSocketServer } = await import('ws');
|
||||
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
this.server.on('upgrade', (request, socket, head) => {
|
||||
wss.handleUpgrade(request, socket, head, (ws) => {
|
||||
wss.emit('connection', ws, request);
|
||||
});
|
||||
});
|
||||
|
||||
wss.on('connection', (ws: any, request: any) => {
|
||||
const peer = this.wrapNodeWebSocket(ws, request);
|
||||
|
||||
hooks.onOpen?.(peer);
|
||||
|
||||
ws.on('message', (data: Buffer | string) => {
|
||||
const message = {
|
||||
type: typeof data === 'string' ? 'text' as const : 'binary' as const,
|
||||
text: typeof data === 'string' ? data : undefined,
|
||||
data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined,
|
||||
size: typeof data === 'string' ? data.length : data.length,
|
||||
};
|
||||
hooks.onMessage?.(peer, message);
|
||||
});
|
||||
|
||||
ws.on('close', (code: number, reason: Buffer) => {
|
||||
hooks.onClose?.(peer, code, reason.toString());
|
||||
});
|
||||
|
||||
ws.on('error', (error: Error) => {
|
||||
hooks.onError?.(peer, error);
|
||||
});
|
||||
|
||||
ws.on('ping', (data: Buffer) => {
|
||||
hooks.onPing?.(peer, new Uint8Array(data));
|
||||
});
|
||||
|
||||
ws.on('pong', (data: Buffer) => {
|
||||
hooks.onPong?.(peer, new Uint8Array(data));
|
||||
});
|
||||
});
|
||||
} catch {
|
||||
console.warn('WebSocket support requires the "ws" package. Install with: pnpm add ws');
|
||||
}
|
||||
}
|
||||
|
||||
private wrapNodeWebSocket(ws: any, request: any): any {
|
||||
return {
|
||||
id: crypto.randomUUID(),
|
||||
url: request.url ?? '',
|
||||
get readyState() { return ws.readyState; },
|
||||
protocol: ws.protocol ?? '',
|
||||
extensions: ws.extensions ?? '',
|
||||
send: (data: string) => ws.send(data),
|
||||
sendBinary: (data: Uint8Array | ArrayBuffer) => ws.send(data),
|
||||
close: (code?: number, reason?: string) => ws.close(code, reason),
|
||||
ping: (data?: Uint8Array) => ws.ping(data),
|
||||
terminate: () => ws.terminate(),
|
||||
context: {} as any,
|
||||
data: new Map(),
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user