diff --git a/changelog.md b/changelog.md index f7c5b71..1587290 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,17 @@ # Changelog +## 2025-12-02 - 1.1.0 - feat(websocket) +Add TypedRouter WebSocket integration, connection registry, peer tagging and broadcast APIs + +- Add dependency on @api.global/typedrequest and re-export it via plugins +- Introduce typedRouter support in IWebSocketHooks and adapters (Node, Bun, Deno) to route JSON RPC messages through TypedRouter.routeAndAddResponse +- Add internal IWebSocketConnectionCallbacks to register/unregister peers; adapters receive these via a _connectionCallbacks property on websocket options +- Persist per-peer tags and data (peer.tags: Set) across adapters; Bun adapter stores persistent ws.data so tags survive re-wraps +- Add WebSocketConfigError and validate websocket config to prevent using typedRouter together with onMessage (throws if both are set) +- Expose connection-management APIs on SmartServe: getWebSocketConnections(), getWebSocketConnectionsByTag(tag), broadcastWebSocket(data) and broadcastWebSocketByTag(tag, data) +- Update README/hints to document TypedRouter mode, connection registry, peer tagging, and broadcast methods +- Legacy onMessage mode remains supported; typedRouter mode enables automatic JSON routing and connection registry + ## 2025-11-29 - 1.0.2 - fix(package) Update package metadata, scripts and dependency pins diff --git a/package.json b/package.json index 46c850b..1e7490d 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "@types/ws": "^8.18.1" }, "dependencies": { + "@api.global/typedrequest": "^3.0.0", "@push.rocks/lik": "^6.2.2", "@push.rocks/smartenv": "^6.0.0", "@push.rocks/smartlog": "^3.1.10", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 98d4c64..a6b8539 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,6 +8,9 @@ importers: .: dependencies: + '@api.global/typedrequest': + specifier: ^3.0.0 + version: 3.1.10 '@push.rocks/lik': specifier: ^6.2.2 version: 6.2.2 diff --git a/readme.hints.md b/readme.hints.md index c2a719c..ded55b8 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -93,8 +93,65 @@ await server.start(); - LOCK/UNLOCK - Exclusive write locking - GET/HEAD/PUT/DELETE - Standard file operations +## TypedRouter WebSocket Integration + +SmartServe supports first-class TypedRouter integration for type-safe RPC over WebSockets. + +### Usage + +```typescript +import { SmartServe } from '@push.rocks/smartserve'; +import { TypedRouter, TypedHandler } from '@api.global/typedrequest'; + +const router = new TypedRouter(); +router.addTypedHandler(new TypedHandler('echo', async (data) => { + return { echoed: data.message }; +})); + +const server = new SmartServe({ + port: 3000, + websocket: { + typedRouter: router, + onConnectionOpen: (peer) => { + peer.tags.add('authenticated'); + console.log(`Client connected: ${peer.id}`); + }, + onConnectionClose: (peer) => { + console.log(`Client disconnected: ${peer.id}`); + }, + }, +}); + +await server.start(); + +// Broadcast to all connections +server.broadcastWebSocket({ type: 'notification', message: 'Hello!' }); + +// Broadcast to specific tag +server.broadcastWebSocketByTag('authenticated', { type: 'secure-message' }); + +// Get all connections +const connections = server.getWebSocketConnections(); +``` + +### Key Features + +- **TypedRouter mode**: Set `typedRouter` for automatic JSON-RPC routing (mutually exclusive with `onMessage`) +- **Connection registry**: Active only when `typedRouter` is set +- **Peer tags**: Use `peer.tags.add/has/delete` for connection filtering +- **Broadcast methods**: `broadcastWebSocket()` and `broadcastWebSocketByTag()` +- **Lifecycle hooks**: `onConnectionOpen` and `onConnectionClose` (alongside existing `onOpen`/`onClose`) + +### Architecture Notes + +- `typedRouter` and `onMessage` are mutually exclusive (throws `WebSocketConfigError` if both set) +- Connection registry only active when `typedRouter` is configured +- Bun adapter stores peer ID/tags in `ws.data` for persistence across events +- Internal `_connectionCallbacks` passed to adapters for registry communication + ## TODO - [x] WebDAV protocol support (PROPFIND, MKCOL, COPY, MOVE, LOCK, UNLOCK) +- [x] TypedRouter WebSocket integration - [ ] HTTP/2 support investigation - [ ] Performance benchmarks diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 7c45a29..7c4197c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartserve', - version: '1.0.2', + version: '1.1.0', description: 'a cross platform server module for Node, Deno and Bun' } diff --git a/ts/adapters/adapter.bun.ts b/ts/adapters/adapter.bun.ts index 7405125..14d5f16 100644 --- a/ts/adapters/adapter.bun.ts +++ b/ts/adapters/adapter.bun.ts @@ -1,4 +1,4 @@ -import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js'; +import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js'; import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js'; // Bun types (for type checking without requiring Bun runtime) @@ -46,9 +46,17 @@ export class BunAdapter extends BaseAdapter { this.stats.requestsActive++; try { - // Handle WebSocket upgrade + // Handle WebSocket upgrade - store data for persistence across events if (this.options.websocket && request.headers.get('upgrade') === 'websocket') { - const upgraded = server.upgrade(request); + const peerId = crypto.randomUUID(); + const upgraded = server.upgrade(request, { + data: { + id: peerId, + url: new URL(request.url).pathname, + customData: new Map(), + tags: new Set(), + }, + }); if (upgraded) { return undefined; // Bun handles the upgrade } @@ -108,33 +116,65 @@ export class BunAdapter extends BaseAdapter { const hooks = this.options.websocket; if (!hooks) return undefined; + // Get internal callbacks if typedRouter mode + const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined; + const typedRouter = hooks.typedRouter; + return { open: (ws: any) => { const peer = this.wrapBunWebSocket(ws); + // Register connection if typedRouter mode + if (callbacks) { + callbacks.onRegister(peer); + } hooks.onOpen?.(peer); }, - message: (ws: any, message: string | ArrayBuffer) => { + + message: async (ws: any, message: string | ArrayBuffer) => { const peer = this.wrapBunWebSocket(ws); - const msg = { - type: typeof message === 'string' ? 'text' as const : 'binary' as const, - text: typeof message === 'string' ? message : undefined, - data: message instanceof ArrayBuffer ? new Uint8Array(message) : undefined, - size: typeof message === 'string' ? message.length : (message as ArrayBuffer).byteLength, - }; - hooks.onMessage?.(peer, msg); + + // If typedRouter is configured, route through it + if (typedRouter && typeof message === 'string') { + try { + const requestObj = JSON.parse(message); + const response = await typedRouter.routeAndAddResponse(requestObj); + if (response) { + peer.send(JSON.stringify(response)); + } + } catch (error) { + console.error('TypedRouter message handling error:', error); + } + } else { + // Legacy mode: use onMessage hook + const msg = { + type: typeof message === 'string' ? 'text' as const : 'binary' as const, + text: typeof message === 'string' ? message : undefined, + data: message instanceof ArrayBuffer ? new Uint8Array(message) : undefined, + size: typeof message === 'string' ? message.length : (message as ArrayBuffer).byteLength, + }; + hooks.onMessage?.(peer, msg); + } }, + close: (ws: any, code: number, reason: string) => { const peer = this.wrapBunWebSocket(ws); + // Unregister connection if typedRouter mode + if (callbacks) { + callbacks.onUnregister(peer.id); + } hooks.onClose?.(peer, code, reason); }, + error: (ws: any, error: Error) => { const peer = this.wrapBunWebSocket(ws); hooks.onError?.(peer, error); }, + ping: (ws: any, data: ArrayBuffer) => { const peer = this.wrapBunWebSocket(ws); hooks.onPing?.(peer, new Uint8Array(data)); }, + pong: (ws: any, data: ArrayBuffer) => { const peer = this.wrapBunWebSocket(ws); hooks.onPong?.(peer, new Uint8Array(data)); @@ -142,10 +182,18 @@ export class BunAdapter extends BaseAdapter { }; } - private wrapBunWebSocket(ws: any): any { + private wrapBunWebSocket(ws: any): IWebSocketPeer { + // IMPORTANT: Use persistent data from ws.data since Bun re-wraps on each event + const wsData = ws.data || { + id: crypto.randomUUID(), + url: '', + customData: new Map(), + tags: new Set(), + }; + return { - id: ws.data?.id ?? crypto.randomUUID(), - url: ws.data?.url ?? '', + id: wsData.id, + url: wsData.url, get readyState() { return ws.readyState; }, protocol: ws.protocol ?? '', extensions: ws.extensions ?? '', @@ -155,7 +203,8 @@ export class BunAdapter extends BaseAdapter { ping: (data?: Uint8Array) => ws.ping(data), terminate: () => ws.terminate(), context: {} as any, - data: ws.data?.customData ?? new Map(), + data: wsData.customData, + tags: wsData.tags, // Reference to persistent Set }; } } diff --git a/ts/adapters/adapter.deno.ts b/ts/adapters/adapter.deno.ts index 37d6d80..8942a2b 100644 --- a/ts/adapters/adapter.deno.ts +++ b/ts/adapters/adapter.deno.ts @@ -1,4 +1,4 @@ -import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js'; +import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js'; import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js'; // Deno types (for type checking without requiring Deno runtime) @@ -98,21 +98,47 @@ export class DenoAdapter extends BaseAdapter { const peer = this.createWebSocketPeer(socket, request); + // Get internal callbacks if typedRouter mode + const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined; + const typedRouter = hooks.typedRouter; + socket.onopen = () => { + // Register connection if typedRouter mode + if (callbacks) { + callbacks.onRegister(peer); + } hooks.onOpen?.(peer); }; - socket.onmessage = (event) => { - const message = { - type: typeof event.data === 'string' ? 'text' as const : 'binary' as const, - text: typeof event.data === 'string' ? event.data : undefined, - data: event.data instanceof Uint8Array ? event.data : undefined, - size: typeof event.data === 'string' ? event.data.length : (event.data as ArrayBuffer).byteLength, - }; - hooks.onMessage?.(peer, message); + socket.onmessage = async (event) => { + // If typedRouter is configured, route through it + if (typedRouter && typeof event.data === 'string') { + try { + const requestObj = JSON.parse(event.data); + const response = await typedRouter.routeAndAddResponse(requestObj); + if (response) { + peer.send(JSON.stringify(response)); + } + } catch (error) { + console.error('TypedRouter message handling error:', error); + } + } else { + // Legacy mode: use onMessage hook + const message = { + type: typeof event.data === 'string' ? 'text' as const : 'binary' as const, + text: typeof event.data === 'string' ? event.data : undefined, + data: event.data instanceof Uint8Array ? event.data : undefined, + size: typeof event.data === 'string' ? event.data.length : (event.data as ArrayBuffer).byteLength, + }; + hooks.onMessage?.(peer, message); + } }; socket.onclose = (event) => { + // Unregister connection if typedRouter mode + if (callbacks) { + callbacks.onUnregister(peer.id); + } hooks.onClose?.(peer, event.code, event.reason); }; @@ -121,12 +147,12 @@ export class DenoAdapter extends BaseAdapter { }; } - private createWebSocketPeer(socket: WebSocket, request: Request): any { + private createWebSocketPeer(socket: WebSocket, request: Request): IWebSocketPeer { const url = this.parseUrl(request); return { id: crypto.randomUUID(), url: url.pathname, - get readyState() { return socket.readyState; }, + get readyState() { return socket.readyState as 0 | 1 | 2 | 3; }, protocol: socket.protocol, extensions: socket.extensions, send: (data: string) => socket.send(data), @@ -134,8 +160,9 @@ export class DenoAdapter extends BaseAdapter { close: (code?: number, reason?: string) => socket.close(code, reason), ping: () => { /* Deno handles ping/pong automatically */ }, terminate: () => socket.close(), - context: {} as any, // Will be populated with IRequestContext + context: {} as any, data: new Map(), + tags: new Set(), }; } } diff --git a/ts/adapters/adapter.node.ts b/ts/adapters/adapter.node.ts index a5d70ed..a671bd2 100644 --- a/ts/adapters/adapter.node.ts +++ b/ts/adapters/adapter.node.ts @@ -1,5 +1,5 @@ import * as plugins from '../plugins.js'; -import type { ISmartServeInstance, IConnectionInfo } from '../core/smartserve.interfaces.js'; +import type { ISmartServeInstance, IConnectionInfo, IWebSocketPeer, IWebSocketConnectionCallbacks } from '../core/smartserve.interfaces.js'; import { BaseAdapter, type IAdapterCharacteristics, type TRequestHandler } from './adapter.base.js'; /** @@ -256,6 +256,10 @@ export class NodeAdapter extends BaseAdapter { const wss = new WebSocketServer({ noServer: true }); + // Get internal callbacks if typedRouter mode + const callbacks = (hooks as any)._connectionCallbacks as IWebSocketConnectionCallbacks | undefined; + const typedRouter = hooks.typedRouter; + this.server.on('upgrade', (request, socket, head) => { wss.handleUpgrade(request, socket, head, (ws) => { wss.emit('connection', ws, request); @@ -265,19 +269,44 @@ export class NodeAdapter extends BaseAdapter { wss.on('connection', (ws: any, request: any) => { const peer = this.wrapNodeWebSocket(ws, request); + // Register connection if typedRouter mode + if (callbacks) { + callbacks.onRegister(peer); + } + + // Call user's onOpen hook hooks.onOpen?.(peer); - ws.on('message', (data: Buffer | string) => { - const message = { - type: typeof data === 'string' ? 'text' as const : 'binary' as const, - text: typeof data === 'string' ? data : undefined, - data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined, - size: typeof data === 'string' ? data.length : data.length, - }; - hooks.onMessage?.(peer, message); + ws.on('message', async (data: Buffer | string) => { + // If typedRouter is configured, route through it + if (typedRouter) { + try { + const messageText = typeof data === 'string' ? data : data.toString('utf8'); + const requestObj = JSON.parse(messageText); + const response = await typedRouter.routeAndAddResponse(requestObj); + if (response) { + peer.send(JSON.stringify(response)); + } + } catch (error) { + console.error('TypedRouter message handling error:', error); + } + } else { + // Legacy mode: use onMessage hook + const message = { + type: typeof data === 'string' ? 'text' as const : 'binary' as const, + text: typeof data === 'string' ? data : undefined, + data: Buffer.isBuffer(data) ? new Uint8Array(data) : undefined, + size: typeof data === 'string' ? data.length : data.length, + }; + hooks.onMessage?.(peer, message); + } }); ws.on('close', (code: number, reason: Buffer) => { + // Unregister connection if typedRouter mode + if (callbacks) { + callbacks.onUnregister(peer.id); + } hooks.onClose?.(peer, code, reason.toString()); }); @@ -298,7 +327,7 @@ export class NodeAdapter extends BaseAdapter { } } - private wrapNodeWebSocket(ws: any, request: any): any { + private wrapNodeWebSocket(ws: any, request: any): IWebSocketPeer { return { id: crypto.randomUUID(), url: request.url ?? '', @@ -312,6 +341,7 @@ export class NodeAdapter extends BaseAdapter { terminate: () => ws.terminate(), context: {} as any, data: new Map(), + tags: new Set(), }; } } diff --git a/ts/core/smartserve.classes.smartserve.ts b/ts/core/smartserve.classes.smartserve.ts index 8348b37..178124a 100644 --- a/ts/core/smartserve.classes.smartserve.ts +++ b/ts/core/smartserve.classes.smartserve.ts @@ -13,8 +13,10 @@ import type { IInterceptOptions, TRequestInterceptor, TResponseInterceptor, + IWebSocketPeer, + IWebSocketConnectionCallbacks, } from './smartserve.interfaces.js'; -import { HttpError, RouteNotFoundError, ServerAlreadyRunningError } from './smartserve.errors.js'; +import { HttpError, RouteNotFoundError, ServerAlreadyRunningError, WebSocketConfigError } from './smartserve.errors.js'; import { AdapterFactory, type BaseAdapter, type TRequestHandler } from '../adapters/index.js'; import { ControllerRegistry, type ICompiledRoute } from '../decorators/index.js'; import { FileServer } from '../files/index.js'; @@ -42,12 +44,31 @@ export class SmartServe { private fileServer: FileServer | null = null; private webdavHandler: WebDAVHandler | null = null; + /** WebSocket connection registry (only active when typedRouter is set) */ + private wsConnections: Map | null = null; + constructor(options: ISmartServeOptions) { + // Validate websocket configuration - mutual exclusivity + if (options.websocket) { + const { typedRouter, onMessage } = options.websocket; + if (typedRouter && onMessage) { + throw new WebSocketConfigError( + 'Cannot use both typedRouter and onMessage. ' + + 'typedRouter handles message routing automatically.' + ); + } + } + this.options = { hostname: '0.0.0.0', ...options, }; + // Initialize connection registry only when typedRouter is configured + if (this.options.websocket?.typedRouter) { + this.wsConnections = new Map(); + } + // Initialize file server if static options provided if (this.options.static) { this.fileServer = new FileServer(this.options.static); @@ -90,8 +111,37 @@ export class SmartServe { throw new ServerAlreadyRunningError(); } + // Prepare options with internal callbacks if typedRouter is configured + let adapterOptions = this.options; + + if (this.options.websocket?.typedRouter && this.wsConnections) { + // Clone options and add internal callbacks for adapter communication + const connectionCallbacks: IWebSocketConnectionCallbacks = { + onRegister: (peer: IWebSocketPeer) => { + this.wsConnections!.set(peer.id, peer); + this.options.websocket?.onConnectionOpen?.(peer); + }, + onUnregister: (peerId: string) => { + const peer = this.wsConnections!.get(peerId); + if (peer) { + this.wsConnections!.delete(peerId); + this.options.websocket?.onConnectionClose?.(peer); + } + }, + }; + + adapterOptions = { + ...this.options, + websocket: { + ...this.options.websocket, + // Internal property for adapter communication (not part of public API) + _connectionCallbacks: connectionCallbacks, + } as typeof this.options.websocket & { _connectionCallbacks: IWebSocketConnectionCallbacks }, + }; + } + // Create adapter for current runtime - this.adapter = await AdapterFactory.createAdapter(this.options); + this.adapter = await AdapterFactory.createAdapter(adapterOptions); // Create request handler const handler = this.createRequestHandler(); @@ -127,6 +177,70 @@ export class SmartServe { return this.instance !== null; } + // =========================================================================== + // WebSocket Connection Management (only available with typedRouter) + // =========================================================================== + + /** + * Get all active WebSocket connections + * Only available when typedRouter is configured + */ + getWebSocketConnections(): IWebSocketPeer[] { + if (!this.wsConnections) { + return []; + } + return Array.from(this.wsConnections.values()); + } + + /** + * Get WebSocket connections filtered by tag + * Only available when typedRouter is configured + */ + getWebSocketConnectionsByTag(tag: string): IWebSocketPeer[] { + if (!this.wsConnections) { + return []; + } + return Array.from(this.wsConnections.values()).filter((peer) => peer.tags.has(tag)); + } + + /** + * Broadcast message to all WebSocket connections + * Only available when typedRouter is configured + */ + broadcastWebSocket(data: string | object): void { + if (!this.wsConnections) { + return; + } + const message = typeof data === 'string' ? data : JSON.stringify(data); + for (const peer of this.wsConnections.values()) { + try { + peer.send(message); + } catch (error) { + console.error(`Failed to send to peer ${peer.id}:`, error); + } + } + } + + /** + * Broadcast message to WebSocket connections with specific tag + * Only available when typedRouter is configured + */ + broadcastWebSocketByTag(tag: string, data: string | object): void { + if (!this.wsConnections) { + return; + } + const message = typeof data === 'string' ? data : JSON.stringify(data); + for (const peer of this.wsConnections.values()) { + if (peer.tags.has(tag)) { + try { + peer.send(message); + } catch (error) { + console.error(`Failed to send to peer ${peer.id}:`, error); + } + } + } + } + /** * Create the main request handler */ diff --git a/ts/core/smartserve.errors.ts b/ts/core/smartserve.errors.ts index 375f880..ebabe72 100644 --- a/ts/core/smartserve.errors.ts +++ b/ts/core/smartserve.errors.ts @@ -127,3 +127,13 @@ export class ServerNotRunningError extends Error { this.name = 'ServerNotRunningError'; } } + +/** + * Error thrown when WebSocket configuration is invalid + */ +export class WebSocketConfigError extends Error { + constructor(message: string) { + super(message); + this.name = 'WebSocketConfigError'; + } +} diff --git a/ts/core/smartserve.interfaces.ts b/ts/core/smartserve.interfaces.ts index 2c2ecbc..e8588b2 100644 --- a/ts/core/smartserve.interfaces.ts +++ b/ts/core/smartserve.interfaces.ts @@ -3,6 +3,8 @@ * Uses Web Standards API (Request/Response) for cross-platform compatibility */ +import type { TypedRouter } from '@api.global/typedrequest'; + // ============================================================================= // HTTP Types // ============================================================================= @@ -168,18 +170,55 @@ export interface IWebSocketPeer { context: IRequestContext; /** Custom per-peer data storage */ data: Map; + /** Tags for connection filtering/grouping */ + tags: Set; } /** * WebSocket event hooks */ export interface IWebSocketHooks { + /** Called when WebSocket connection opens */ onOpen?: (peer: IWebSocketPeer) => void | Promise; + /** Called when message received. Mutually exclusive with typedRouter. */ onMessage?: (peer: IWebSocketPeer, message: IWebSocketMessage) => void | Promise; + /** Called when WebSocket connection closes */ onClose?: (peer: IWebSocketPeer, code: number, reason: string) => void | Promise; + /** Called on WebSocket error */ onError?: (peer: IWebSocketPeer, error: Error) => void | Promise; + /** Called when ping received */ onPing?: (peer: IWebSocketPeer, data: Uint8Array) => void | Promise; + /** Called when pong received */ onPong?: (peer: IWebSocketPeer, data: Uint8Array) => void | Promise; + + /** + * TypedRouter for type-safe RPC over WebSocket. + * Mutually exclusive with onMessage - cannot use both. + * When set, enables connection registry and broadcast methods. + */ + typedRouter?: TypedRouter; + + /** + * Called when connection is established and registered. + * Only available when typedRouter is configured. + * Use this to tag connections for filtering. + */ + onConnectionOpen?: (peer: IWebSocketPeer) => void | Promise; + + /** + * Called when connection is closed and unregistered. + * Only available when typedRouter is configured. + */ + onConnectionClose?: (peer: IWebSocketPeer) => void | Promise; +} + +/** + * Internal callbacks for adapter-to-SmartServe communication + * @internal + */ +export interface IWebSocketConnectionCallbacks { + onRegister: (peer: IWebSocketPeer) => void; + onUnregister: (peerId: string) => void; } // ============================================================================= diff --git a/ts/plugins.ts b/ts/plugins.ts index 35e1ac4..f8dc606 100644 --- a/ts/plugins.ts +++ b/ts/plugins.ts @@ -13,3 +13,8 @@ import * as smartlog from '@push.rocks/smartlog'; import * as lik from '@push.rocks/lik'; export { smartpath, smartenv, smartlog, lik }; + +// @api.global scope +import * as typedrequest from '@api.global/typedrequest'; + +export { typedrequest };