feat(websocket): Add TypedRouter WebSocket integration, connection registry, peer tagging and broadcast APIs

This commit is contained in:
2025-12-02 12:13:46 +00:00
parent fddba44a5f
commit 643a6cec55
12 changed files with 387 additions and 40 deletions

View File

@@ -1,4 +1,4 @@
import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js';
import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js';
import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js';
// Bun types (for type checking without requiring Bun runtime)
@@ -46,9 +46,17 @@ export class BunAdapter extends BaseAdapter {
this.stats.requestsActive++;
try {
// Handle WebSocket upgrade
// Handle WebSocket upgrade - store data for persistence across events
if (this.options.websocket && request.headers.get('upgrade') === 'websocket') {
const upgraded = server.upgrade(request);
const peerId = crypto.randomUUID();
const upgraded = server.upgrade(request, {
data: {
id: peerId,
url: new URL(request.url).pathname,
customData: new Map(),
tags: new Set<string>(),
},
});
if (upgraded) {
return undefined; // Bun handles the upgrade
}
@@ -108,33 +116,65 @@ export class BunAdapter extends BaseAdapter {
const hooks = this.options.websocket;
if (!hooks) return undefined;
// Get internal callbacks if typedRouter mode
const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined;
const typedRouter = hooks.typedRouter;
return {
open: (ws: any) => {
const peer = this.wrapBunWebSocket(ws);
// Register connection if typedRouter mode
if (callbacks) {
callbacks.onRegister(peer);
}
hooks.onOpen?.(peer);
},
message: (ws: any, message: string | ArrayBuffer) => {
message: async (ws: any, message: string | ArrayBuffer) => {
const peer = this.wrapBunWebSocket(ws);
const msg = {
type: typeof message === 'string' ? 'text' as const : 'binary' as const,
text: typeof message === 'string' ? message : undefined,
data: message instanceof ArrayBuffer ? new Uint8Array(message) : undefined,
size: typeof message === 'string' ? message.length : (message as ArrayBuffer).byteLength,
};
hooks.onMessage?.(peer, msg);
// If typedRouter is configured, route through it
if (typedRouter && typeof message === 'string') {
try {
const requestObj = JSON.parse(message);
const response = await typedRouter.routeAndAddResponse(requestObj);
if (response) {
peer.send(JSON.stringify(response));
}
} catch (error) {
console.error('TypedRouter message handling error:', error);
}
} else {
// Legacy mode: use onMessage hook
const msg = {
type: typeof message === 'string' ? 'text' as const : 'binary' as const,
text: typeof message === 'string' ? message : undefined,
data: message instanceof ArrayBuffer ? new Uint8Array(message) : undefined,
size: typeof message === 'string' ? message.length : (message as ArrayBuffer).byteLength,
};
hooks.onMessage?.(peer, msg);
}
},
close: (ws: any, code: number, reason: string) => {
const peer = this.wrapBunWebSocket(ws);
// Unregister connection if typedRouter mode
if (callbacks) {
callbacks.onUnregister(peer.id);
}
hooks.onClose?.(peer, code, reason);
},
error: (ws: any, error: Error) => {
const peer = this.wrapBunWebSocket(ws);
hooks.onError?.(peer, error);
},
ping: (ws: any, data: ArrayBuffer) => {
const peer = this.wrapBunWebSocket(ws);
hooks.onPing?.(peer, new Uint8Array(data));
},
pong: (ws: any, data: ArrayBuffer) => {
const peer = this.wrapBunWebSocket(ws);
hooks.onPong?.(peer, new Uint8Array(data));
@@ -142,10 +182,18 @@ export class BunAdapter extends BaseAdapter {
};
}
private wrapBunWebSocket(ws: any): any {
private wrapBunWebSocket(ws: any): IWebSocketPeer {
// IMPORTANT: Use persistent data from ws.data since Bun re-wraps on each event
const wsData = ws.data || {
id: crypto.randomUUID(),
url: '',
customData: new Map(),
tags: new Set<string>(),
};
return {
id: ws.data?.id ?? crypto.randomUUID(),
url: ws.data?.url ?? '',
id: wsData.id,
url: wsData.url,
get readyState() { return ws.readyState; },
protocol: ws.protocol ?? '',
extensions: ws.extensions ?? '',
@@ -155,7 +203,8 @@ export class BunAdapter extends BaseAdapter {
ping: (data?: Uint8Array) => ws.ping(data),
terminate: () => ws.terminate(),
context: {} as any,
data: ws.data?.customData ?? new Map(),
data: wsData.customData,
tags: wsData.tags, // Reference to persistent Set
};
}
}

View File

@@ -1,4 +1,4 @@
import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js';
import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js';
import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js';
// Deno types (for type checking without requiring Deno runtime)
@@ -98,21 +98,47 @@ export class DenoAdapter extends BaseAdapter {
const peer = this.createWebSocketPeer(socket, request);
// Get internal callbacks if typedRouter mode
const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined;
const typedRouter = hooks.typedRouter;
socket.onopen = () => {
// Register connection if typedRouter mode
if (callbacks) {
callbacks.onRegister(peer);
}
hooks.onOpen?.(peer);
};
socket.onmessage = (event) => {
const message = {
type: typeof event.data === 'string' ? 'text' as const : 'binary' as const,
text: typeof event.data === 'string' ? event.data : undefined,
data: event.data instanceof Uint8Array ? event.data : undefined,
size: typeof event.data === 'string' ? event.data.length : (event.data as ArrayBuffer).byteLength,
};
hooks.onMessage?.(peer, message);
socket.onmessage = async (event) => {
// If typedRouter is configured, route through it
if (typedRouter && typeof event.data === 'string') {
try {
const requestObj = JSON.parse(event.data);
const response = await typedRouter.routeAndAddResponse(requestObj);
if (response) {
peer.send(JSON.stringify(response));
}
} catch (error) {
console.error('TypedRouter message handling error:', error);
}
} else {
// Legacy mode: use onMessage hook
const message = {
type: typeof event.data === 'string' ? 'text' as const : 'binary' as const,
text: typeof event.data === 'string' ? event.data : undefined,
data: event.data instanceof Uint8Array ? event.data : undefined,
size: typeof event.data === 'string' ? event.data.length : (event.data as ArrayBuffer).byteLength,
};
hooks.onMessage?.(peer, message);
}
};
socket.onclose = (event) => {
// Unregister connection if typedRouter mode
if (callbacks) {
callbacks.onUnregister(peer.id);
}
hooks.onClose?.(peer, event.code, event.reason);
};
@@ -121,12 +147,12 @@ export class DenoAdapter extends BaseAdapter {
};
}
private createWebSocketPeer(socket: WebSocket, request: Request): any {
private createWebSocketPeer(socket: WebSocket, request: Request): IWebSocketPeer {
const url = this.parseUrl(request);
return {
id: crypto.randomUUID(),
url: url.pathname,
get readyState() { return socket.readyState; },
get readyState() { return socket.readyState as 0 | 1 | 2 | 3; },
protocol: socket.protocol,
extensions: socket.extensions,
send: (data: string) => socket.send(data),
@@ -134,8 +160,9 @@ export class DenoAdapter extends BaseAdapter {
close: (code?: number, reason?: string) => socket.close(code, reason),
ping: () => { /* Deno handles ping/pong automatically */ },
terminate: () => socket.close(),
context: {} as any, // Will be populated with IRequestContext
context: {} as any,
data: new Map(),
tags: new Set<string>(),
};
}
}

View File

@@ -1,5 +1,5 @@
import * as plugins from '../plugins.js';
import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js';
import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js';
import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js';
/**
@@ -256,6 +256,10 @@ export class NodeAdapter extends BaseAdapter {
const wss = new WebSocketServer({ noServer: true });
// Get internal callbacks if typedRouter mode
const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined;
const typedRouter = hooks.typedRouter;
this.server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
@@ -265,19 +269,44 @@ export class NodeAdapter extends BaseAdapter {
wss.on('connection', (ws: any, request: any) => {
const peer = this.wrapNodeWebSocket(ws, request);
// Register connection if typedRouter mode
if (callbacks) {
callbacks.onRegister(peer);
}
// Call user's onOpen hook
hooks.onOpen?.(peer);
ws.on('message', (data: Buffer | string) => {
const message = {
type: typeof data === 'string' ? 'text' as const : 'binary' as const,
text: typeof data === 'string' ? data : undefined,
data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined,
size: typeof data === 'string' ? data.length : data.length,
};
hooks.onMessage?.(peer, message);
ws.on('message', async (data: Buffer | string) => {
// If typedRouter is configured, route through it
if (typedRouter) {
try {
const messageText = typeof data === 'string' ? data : data.toString('utf8');
const requestObj = JSON.parse(messageText);
const response = await typedRouter.routeAndAddResponse(requestObj);
if (response) {
peer.send(JSON.stringify(response));
}
} catch (error) {
console.error('TypedRouter message handling error:', error);
}
} else {
// Legacy mode: use onMessage hook
const message = {
type: typeof data === 'string' ? 'text' as const : 'binary' as const,
text: typeof data === 'string' ? data : undefined,
data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined,
size: typeof data === 'string' ? data.length : data.length,
};
hooks.onMessage?.(peer, message);
}
});
ws.on('close', (code: number, reason: Buffer) => {
// Unregister connection if typedRouter mode
if (callbacks) {
callbacks.onUnregister(peer.id);
}
hooks.onClose?.(peer, code, reason.toString());
});
@@ -298,7 +327,7 @@ export class NodeAdapter extends BaseAdapter {
}
}
private wrapNodeWebSocket(ws: any, request: any): any {
private wrapNodeWebSocket(ws: any, request: any): IWebSocketPeer {
return {
id: crypto.randomUUID(),
url: request.url ?? '',
@@ -312,6 +341,7 @@ export class NodeAdapter extends BaseAdapter {
terminate: () => ws.terminate(),
context: {} as any,
data: new Map(),
tags: new Set<string>(),
};
}
}