Files
typedsocket/ts/typedsocket.classes.typedsocket.ts

614 lines
19 KiB
TypeScript

import * as plugins from './typedsocket.plugins.js';
export type TTypedSocketSide = 'server' | 'client';
export type TConnectionStatus = 'new' | 'connecting' | 'connected' | 'disconnected' | 'reconnecting';
const TAG_PREFIX = '__typedsocket_tag__';
/**
* Internal TypedRequest interfaces for tag management
*/
interface IReq_SetClientTag extends plugins.typedrequestInterfaces.ITypedRequest {
method: '__typedsocket_setTag';
request: { name: string; payload: any };
response: { success: boolean };
}
interface IReq_RemoveClientTag extends plugins.typedrequestInterfaces.ITypedRequest {
method: '__typedsocket_removeTag';
request: { name: string };
response: { success: boolean };
}
/**
* Options for creating a TypedSocket client
*/
export interface ITypedSocketClientOptions {
autoReconnect?: boolean;
maxRetries?: number;
initialBackoffMs?: number;
maxBackoffMs?: number;
}
/**
* Wrapper for SmartServe's IWebSocketPeer to provide tag compatibility
*/
export interface ISmartServeConnectionWrapper {
peer: plugins.IWebSocketPeer;
getTagById(tagId: string): Promise<{ id: string; payload: any } | undefined>;
}
/**
* Creates a wrapper around IWebSocketPeer for tag compatibility
*/
function wrapSmartServePeer(peer: plugins.IWebSocketPeer): ISmartServeConnectionWrapper {
return {
peer,
async getTagById(tagId: string): Promise<{ id: string; payload: any } | undefined> {
if (!peer.tags.has(tagId)) {
return undefined;
}
const payload = peer.data.get(`${TAG_PREFIX}${tagId}`);
return { id: tagId, payload };
},
};
}
export class TypedSocket {
// ============================================================================
// STATIC METHODS
// ============================================================================
/**
* Creates a TypedSocket client using native WebSocket.
* Works in both browser and Node.js environments.
*
* @param typedrouterArg - TypedRouter for handling server-initiated requests
* @param serverUrlArg - Server URL (e.g., 'http://localhost:3000' or 'wss://example.com')
* @param options - Connection options
*
* @example
* ```typescript
* const typedRouter = new TypedRouter();
* const client = await TypedSocket.createClient(
* typedRouter,
* 'http://localhost:3000',
* { autoReconnect: true }
* );
* ```
*/
public static async createClient(
typedrouterArg: plugins.typedrequest.TypedRouter,
serverUrlArg: string,
options: ITypedSocketClientOptions = {}
): Promise<TypedSocket> {
const defaultOptions: Required<ITypedSocketClientOptions> = {
autoReconnect: true,
maxRetries: 100,
initialBackoffMs: 1000,
maxBackoffMs: 60000,
};
const opts = { ...defaultOptions, ...options };
const typedSocket = new TypedSocket('client', typedrouterArg);
typedSocket.clientOptions = opts;
typedSocket.serverUrl = serverUrlArg;
typedSocket.currentBackoff = opts.initialBackoffMs;
await typedSocket.connect();
return typedSocket;
}
/**
* Returns the current window location origin URL.
* Useful in browser environments for connecting to the same origin.
*/
public static useWindowLocationOriginUrl = (): string => {
return plugins.smarturl.Smarturl.createFromUrl(globalThis.location.origin).toString();
};
/**
* Creates a TypedSocket server from an existing SmartServe instance.
* This is the only way to create a server-side TypedSocket.
*
* @param smartServeArg - SmartServe instance with typedRouter configured in websocket options
* @param typedRouterArg - TypedRouter for handling requests (must match SmartServe's typedRouter)
*
* @example
* ```typescript
* const typedRouter = new TypedRouter();
* const smartServe = new SmartServe({
* port: 3000,
* websocket: {
* typedRouter,
* onConnectionOpen: (peer) => peer.tags.add('client')
* }
* });
* await smartServe.start();
* const typedSocket = TypedSocket.fromSmartServe(smartServe, typedRouter);
* ```
*/
public static fromSmartServe(
smartServeArg: plugins.SmartServe,
typedRouterArg: plugins.typedrequest.TypedRouter
): TypedSocket {
const connectionWrappers = new Map<string, ISmartServeConnectionWrapper>();
// Register built-in tag handlers
TypedSocket.registerTagHandlers(typedRouterArg);
const typedSocket = new TypedSocket('server', typedRouterArg);
typedSocket.smartServeRef = smartServeArg;
typedSocket.smartServeConnectionWrappers = connectionWrappers;
return typedSocket;
}
/**
* Registers built-in TypedHandlers for tag management
*/
private static registerTagHandlers(typedRouter: plugins.typedrequest.TypedRouter): void {
// Set tag handler
typedRouter.addTypedHandler<IReq_SetClientTag>(
new plugins.typedrequest.TypedHandler('__typedsocket_setTag', async (data, meta) => {
const peer = meta?.localData?.peer as plugins.IWebSocketPeer;
if (!peer) {
console.warn('setTag: No peer found in request context');
return { success: false };
}
peer.tags.add(data.name);
peer.data.set(`${TAG_PREFIX}${data.name}`, data.payload);
return { success: true };
})
);
// Remove tag handler
typedRouter.addTypedHandler<IReq_RemoveClientTag>(
new plugins.typedrequest.TypedHandler('__typedsocket_removeTag', async (data, meta) => {
const peer = meta?.localData?.peer as plugins.IWebSocketPeer;
if (!peer) {
console.warn('removeTag: No peer found in request context');
return { success: false };
}
peer.tags.delete(data.name);
peer.data.delete(`${TAG_PREFIX}${data.name}`);
return { success: true };
})
);
}
// ============================================================================
// INSTANCE PROPERTIES
// ============================================================================
public readonly side: TTypedSocketSide;
public readonly typedrouter: plugins.typedrequest.TypedRouter;
// Connection status observable
public statusSubject = new plugins.smartrx.rxjs.Subject<TConnectionStatus>();
private connectionStatus: TConnectionStatus = 'new';
// Client-specific properties
private websocket: WebSocket | null = null;
private clientOptions: Required<ITypedSocketClientOptions> | null = null;
private serverUrl: string = '';
private retryCount = 0;
private currentBackoff = 1000;
private pendingRequests = new Map<string, {
resolve: (response: any) => void;
reject: (error: Error) => void;
}>();
// Server-specific properties (SmartServe mode)
private smartServeRef: plugins.SmartServe | null = null;
private smartServeConnectionWrappers = new Map<string, ISmartServeConnectionWrapper>();
// ============================================================================
// CONSTRUCTOR
// ============================================================================
private constructor(
sideArg: TTypedSocketSide,
typedrouterArg: plugins.typedrequest.TypedRouter
) {
this.side = sideArg;
this.typedrouter = typedrouterArg;
}
// ============================================================================
// CLIENT METHODS
// ============================================================================
/**
* Connects the client to the server using native WebSocket
*/
private async connect(): Promise<void> {
const done = plugins.smartpromise.defer<void>();
this.updateStatus('connecting');
// Convert HTTP URL to WebSocket URL
const wsUrl = this.toWebSocketUrl(this.serverUrl);
console.log(`TypedSocket connecting to ${wsUrl}...`);
this.websocket = new WebSocket(wsUrl);
const connectionTimeout = setTimeout(() => {
if (this.connectionStatus !== 'connected') {
console.warn('TypedSocket connection timeout');
this.websocket?.close();
done.reject(new Error('Connection timeout'));
}
}, 10000);
this.websocket.onopen = () => {
clearTimeout(connectionTimeout);
console.log('TypedSocket connected!');
this.updateStatus('connected');
this.retryCount = 0;
this.currentBackoff = this.clientOptions?.initialBackoffMs ?? 1000;
done.resolve();
};
this.websocket.onmessage = async (event) => {
await this.handleMessage(event.data);
};
this.websocket.onclose = () => {
clearTimeout(connectionTimeout);
this.handleDisconnect();
};
this.websocket.onerror = (error) => {
console.error('TypedSocket WebSocket error:', error);
};
try {
await done.promise;
} catch (err) {
clearTimeout(connectionTimeout);
if (this.clientOptions?.autoReconnect) {
await this.scheduleReconnect();
} else {
throw err;
}
}
}
/**
* Converts an HTTP(S) URL to a WebSocket URL
*/
private toWebSocketUrl(url: string): string {
const parsed = new URL(url);
const wsProtocol = parsed.protocol === 'https:' ? 'wss:' : 'ws:';
return `${wsProtocol}//${parsed.host}${parsed.pathname}`;
}
/**
* Handles incoming WebSocket messages
*/
private async handleMessage(data: string | ArrayBuffer): Promise<void> {
try {
const messageText = typeof data === 'string' ? data : new TextDecoder().decode(data);
const message = plugins.smartjson.parse(messageText) as plugins.typedrequestInterfaces.ITypedRequest;
// Check if this is a response to a pending request
if (message.correlation?.id && this.pendingRequests.has(message.correlation.id)) {
const pending = this.pendingRequests.get(message.correlation.id)!;
this.pendingRequests.delete(message.correlation.id);
pending.resolve(message);
return;
}
// Server-initiated request - route through TypedRouter
const response = await this.typedrouter.routeAndAddResponse(message);
if (response && this.websocket?.readyState === WebSocket.OPEN) {
this.websocket.send(plugins.smartjson.stringify(response));
}
} catch (err) {
console.error('TypedSocket failed to process message:', err);
}
}
/**
* Handles WebSocket disconnection
*/
private handleDisconnect(): void {
if (this.connectionStatus === 'disconnected') {
return; // Already handled
}
this.updateStatus('disconnected');
if (this.clientOptions?.autoReconnect && this.retryCount < this.clientOptions.maxRetries) {
this.scheduleReconnect();
}
}
/**
* Schedules a reconnection attempt with exponential backoff
*/
private async scheduleReconnect(): Promise<void> {
if (!this.clientOptions) return;
this.updateStatus('reconnecting');
this.retryCount++;
// Exponential backoff with jitter
const jitter = this.currentBackoff * 0.2 * (Math.random() * 2 - 1);
const delay = Math.min(this.currentBackoff + jitter, this.clientOptions.maxBackoffMs);
console.log(`TypedSocket reconnecting in ${Math.round(delay)}ms (attempt ${this.retryCount}/${this.clientOptions.maxRetries})`);
await plugins.smartdelay.delayFor(delay);
// Increase backoff for next time
this.currentBackoff = Math.min(this.currentBackoff * 2, this.clientOptions.maxBackoffMs);
try {
await this.connect();
} catch (err) {
console.error('TypedSocket reconnection failed:', err);
}
}
/**
* Updates connection status and notifies subscribers
*/
private updateStatus(status: TConnectionStatus): void {
if (this.connectionStatus !== status) {
this.connectionStatus = status;
this.statusSubject.next(status);
}
}
/**
* Sends a request to the server and waits for response (client-side)
*/
private async sendRequest<T extends plugins.typedrequestInterfaces.ITypedRequest>(
request: T
): Promise<T> {
if (!this.websocket || this.websocket.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket not connected');
}
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
this.pendingRequests.delete(request.correlation.id);
reject(new Error('Request timeout'));
}, 30000);
this.pendingRequests.set(request.correlation.id, {
resolve: (response) => {
clearTimeout(timeout);
resolve(response);
},
reject: (error) => {
clearTimeout(timeout);
reject(error);
},
});
this.websocket!.send(plugins.smartjson.stringify(request));
});
}
// ============================================================================
// PUBLIC API - SHARED
// ============================================================================
/**
* Creates a TypedRequest for the specified method.
* On clients, sends to the server.
* On servers, sends to the specified target connection.
*/
public createTypedRequest<T extends plugins.typedrequestInterfaces.ITypedRequest>(
methodName: T['method'],
targetConnection?: ISmartServeConnectionWrapper
): plugins.typedrequest.TypedRequest<T> {
const postMethod = async (requestDataArg: T): Promise<T> => {
if (this.side === 'client') {
return this.sendRequest(requestDataArg);
}
// Server-side: send to target connection
if (!this.smartServeRef) {
throw new Error('Server not initialized');
}
let target = targetConnection;
if (!target) {
const allConnections = this.smartServeRef.getWebSocketConnections();
if (allConnections.length === 1) {
const peer = allConnections[0];
target = this.getOrCreateWrapper(peer);
} else if (allConnections.length === 0) {
throw new Error('No WebSocket connections available');
} else {
throw new Error('Multiple connections available - specify targetConnection');
}
}
// Register interest for response
const interest = await this.typedrouter.fireEventInterestMap.addInterest(
requestDataArg.correlation.id,
requestDataArg
);
// Send request
target.peer.send(plugins.smartjson.stringify(requestDataArg));
// Wait for response
return await interest.interestFullfilled as T;
};
return new plugins.typedrequest.TypedRequest<T>(
new plugins.typedrequest.TypedTarget({ postMethod }),
methodName
);
}
/**
* Gets the current connection status
*/
public getStatus(): TConnectionStatus {
return this.connectionStatus;
}
/**
* Stops the TypedSocket client or cleans up server state
*/
public async stop(): Promise<void> {
if (this.side === 'client') {
if (this.clientOptions) {
this.clientOptions.autoReconnect = false;
}
if (this.websocket) {
this.websocket.close();
this.websocket = null;
}
this.pendingRequests.clear();
} else {
// Server mode - just clear wrappers (SmartServe manages its own lifecycle)
this.smartServeConnectionWrappers.clear();
}
}
// ============================================================================
// CLIENT-ONLY METHODS
// ============================================================================
/**
* Sets a tag on this client connection.
* Tags are stored on the server and can be used for filtering.
* @client-only
*/
public async setTag<T extends plugins.typedrequestInterfaces.ITag>(
name: T['name'],
payload: T['payload']
): Promise<void> {
if (this.side !== 'client') {
throw new Error('setTag is only available on clients');
}
const request = this.createTypedRequest<IReq_SetClientTag>('__typedsocket_setTag');
const response = await request.fire({ name, payload });
if (!response.success) {
throw new Error('Failed to set tag on server');
}
}
/**
* Removes a tag from this client connection.
* @client-only
*/
public async removeTag(name: string): Promise<void> {
if (this.side !== 'client') {
throw new Error('removeTag is only available on clients');
}
const request = this.createTypedRequest<IReq_RemoveClientTag>('__typedsocket_removeTag');
const response = await request.fire({ name });
if (!response.success) {
throw new Error('Failed to remove tag on server');
}
}
// ============================================================================
// SERVER-ONLY METHODS
// ============================================================================
/**
* Gets or creates a connection wrapper for a peer
*/
private getOrCreateWrapper(peer: plugins.IWebSocketPeer): ISmartServeConnectionWrapper {
let wrapper = this.smartServeConnectionWrappers.get(peer.id);
if (!wrapper) {
wrapper = wrapSmartServePeer(peer);
this.smartServeConnectionWrappers.set(peer.id, wrapper);
}
return wrapper;
}
/**
* Finds all connections matching the filter function.
* @server-only
*/
public async findAllTargetConnections(
asyncFindFuncArg: (connectionArg: ISmartServeConnectionWrapper) => Promise<boolean>
): Promise<ISmartServeConnectionWrapper[]> {
if (this.side !== 'server' || !this.smartServeRef) {
throw new Error('findAllTargetConnections is only available on servers');
}
const matchingConnections: ISmartServeConnectionWrapper[] = [];
for (const peer of this.smartServeRef.getWebSocketConnections()) {
const wrapper = this.getOrCreateWrapper(peer);
if (await asyncFindFuncArg(wrapper)) {
matchingConnections.push(wrapper);
}
}
return matchingConnections;
}
/**
* Finds the first connection matching the filter function.
* @server-only
*/
public async findTargetConnection(
asyncFindFuncArg: (connectionArg: ISmartServeConnectionWrapper) => Promise<boolean>
): Promise<ISmartServeConnectionWrapper | undefined> {
const allMatching = await this.findAllTargetConnections(asyncFindFuncArg);
return allMatching[0];
}
/**
* Finds all connections with the specified tag.
* @server-only
*/
public async findAllTargetConnectionsByTag<TTag extends plugins.typedrequestInterfaces.ITag = any>(
keyArg: TTag['name'],
payloadArg?: TTag['payload']
): Promise<ISmartServeConnectionWrapper[]> {
if (this.side !== 'server' || !this.smartServeRef) {
throw new Error('findAllTargetConnectionsByTag is only available on servers');
}
const peers = this.smartServeRef.getWebSocketConnectionsByTag(keyArg);
const results: ISmartServeConnectionWrapper[] = [];
for (const peer of peers) {
const wrapper = this.getOrCreateWrapper(peer);
// If payload specified, also filter by payload
if (payloadArg !== undefined) {
const tag = await wrapper.getTagById(keyArg);
if (plugins.smartjson.stringify(tag?.payload) !== plugins.smartjson.stringify(payloadArg)) {
continue;
}
}
results.push(wrapper);
}
return results;
}
/**
* Finds the first connection with the specified tag.
* @server-only
*/
public async findTargetConnectionByTag<TTag extends plugins.typedrequestInterfaces.ITag = any>(
keyArg: TTag['name'],
payloadArg?: TTag['payload']
): Promise<ISmartServeConnectionWrapper | undefined> {
const allResults = await this.findAllTargetConnectionsByTag(keyArg, payloadArg);
return allResults[0];
}
}