Files
isotransport/ts/index.ts
T

301 lines
9.0 KiB
TypeScript

import type { RawData, WebSocket, WebSocketServer } from 'ws';
export type TIsotransportMessage = string | ArrayBuffer | Uint8Array;
export interface IIsotransportServerOptions {
port: number;
host?: string;
path?: string;
}
export interface IIsotransportClientOptions {
url: string;
protocols?: string | string[];
}
interface IIsotransportConnectionEvents {
message: TIsotransportMessage;
close: void;
error: Error;
}
interface IIsotransportServerEvents {
connection: IsotransportConnection;
close: void;
error: Error;
}
interface IIsotransportClientEvents {
open: IsotransportConnection;
message: TIsotransportMessage;
close: void;
error: Error;
}
type TEventListener<TPayload> = (payload: TPayload) => void;
class TypedEventEmitter<TEvents extends object> {
private listeners = new Map<keyof TEvents, Set<TEventListener<TEvents[keyof TEvents]>>>();
public on<TKey extends keyof TEvents>(
eventName: TKey,
listener: TEventListener<TEvents[TKey]>
): () => void {
const listenersForEvent = this.listeners.get(eventName) ?? new Set<TEventListener<TEvents[keyof TEvents]>>();
listenersForEvent.add(listener as TEventListener<TEvents[keyof TEvents]>);
this.listeners.set(eventName, listenersForEvent);
return () => this.off(eventName, listener);
}
public off<TKey extends keyof TEvents>(
eventName: TKey,
listener: TEventListener<TEvents[TKey]>
): void {
const listenersForEvent = this.listeners.get(eventName);
listenersForEvent?.delete(listener as TEventListener<TEvents[keyof TEvents]>);
}
protected emit<TKey extends keyof TEvents>(eventName: TKey, payload: TEvents[TKey]): void {
const listenersForEvent = this.listeners.get(eventName);
if (!listenersForEvent) {
return;
}
for (const listener of listenersForEvent) {
listener(payload);
}
}
}
const normalizeError = (errorArg: unknown): Error => {
return errorArg instanceof Error ? errorArg : new Error(String(errorArg));
};
const normalizeMessage = (messageArg: unknown): TIsotransportMessage => {
if (typeof messageArg === 'string') {
return messageArg;
}
if (messageArg instanceof ArrayBuffer) {
return messageArg;
}
if (messageArg instanceof Uint8Array) {
return messageArg;
}
if (typeof messageArg === 'object' && messageArg !== null && 'data' in messageArg) {
return normalizeMessage((messageArg as { data: unknown }).data);
}
return String(messageArg);
};
const randomId = (): string => {
return Math.random().toString(36).slice(2);
};
export class IsotransportConnection extends TypedEventEmitter<IIsotransportConnectionEvents> {
public readonly id: string;
private sendFunction: (messageArg: TIsotransportMessage) => void;
private closeFunction: () => void;
private readyStateFunction: () => number;
constructor(optionsArg: {
id?: string;
sendFunction: (messageArg: TIsotransportMessage) => void;
closeFunction: () => void;
readyStateFunction: () => number;
}) {
super();
this.id = optionsArg.id ?? randomId();
this.sendFunction = optionsArg.sendFunction;
this.closeFunction = optionsArg.closeFunction;
this.readyStateFunction = optionsArg.readyStateFunction;
}
public send(messageArg: TIsotransportMessage): void {
if (this.readyStateFunction() !== 1) {
throw new Error('Cannot send on a closed isotransport connection.');
}
this.sendFunction(messageArg);
}
public close(): void {
this.closeFunction();
}
public handleMessage(messageArg: unknown): void {
this.emit('message', normalizeMessage(messageArg));
}
public handleClose(): void {
this.emit('close', undefined);
}
public handleError(errorArg: unknown): void {
this.emit('error', normalizeError(errorArg));
}
}
export class IsotransportServer extends TypedEventEmitter<IIsotransportServerEvents> {
public options: IIsotransportServerOptions;
public connections = new Set<IsotransportConnection>();
private webSocketServer?: WebSocketServer;
constructor(optionsArg: IIsotransportServerOptions) {
super();
this.options = optionsArg;
}
public async listen(): Promise<void> {
if (this.webSocketServer) {
return;
}
const wsModule = await import('ws');
this.webSocketServer = new wsModule.WebSocketServer({
port: this.options.port,
host: this.options.host,
path: this.options.path,
});
this.webSocketServer.on('connection', (socketArg: WebSocket) => {
const connection = new IsotransportConnection({
sendFunction: (messageArg) => socketArg.send(messageArg),
closeFunction: () => socketArg.close(),
readyStateFunction: () => socketArg.readyState,
});
this.connections.add(connection);
socketArg.on('message', (messageArg: RawData, isBinaryArg: boolean) => {
connection.handleMessage(isBinaryArg ? messageArg : messageArg.toString());
});
socketArg.on('close', () => {
this.connections.delete(connection);
connection.handleClose();
});
socketArg.on('error', (errorArg: Error) => connection.handleError(errorArg));
this.emit('connection', connection);
});
this.webSocketServer.on('error', (errorArg: Error) => this.emit('error', errorArg));
await new Promise<void>((resolve, reject) => {
this.webSocketServer!.once('listening', resolve);
this.webSocketServer!.once('error', reject);
});
}
public async close(): Promise<void> {
if (!this.webSocketServer) {
return;
}
for (const connection of this.connections) {
connection.close();
}
await new Promise<void>((resolve, reject) => {
this.webSocketServer!.close((errorArg?: Error) => {
if (errorArg) {
reject(errorArg);
return;
}
resolve();
});
});
this.connections.clear();
this.webSocketServer = undefined;
this.emit('close', undefined);
}
}
type TWebSocketClientSocket = {
readyState: number;
binaryType?: BinaryType;
send: (messageArg: TIsotransportMessage) => void;
close: () => void;
addEventListener?: (eventNameArg: string, listenerArg: (eventArg: unknown) => void) => void;
on?: (eventNameArg: string, listenerArg: (...args: unknown[]) => void) => void;
};
type TWebSocketClientConstructor = new (
urlArg: string,
protocolsArg?: string | string[]
) => TWebSocketClientSocket;
export class IsotransportClient extends TypedEventEmitter<IIsotransportClientEvents> {
public options: IIsotransportClientOptions;
public connection?: IsotransportConnection;
private socket?: TWebSocketClientSocket;
constructor(optionsArg: IIsotransportClientOptions) {
super();
this.options = optionsArg;
}
public async connect(): Promise<void> {
if (this.connection) {
return;
}
const WebSocketConstructor = await this.getWebSocketConstructor();
const socket = new WebSocketConstructor(this.options.url, this.options.protocols);
if (socket.binaryType !== undefined) {
socket.binaryType = 'arraybuffer';
}
this.socket = socket;
const connection = new IsotransportConnection({
sendFunction: (messageArg) => socket.send(messageArg),
closeFunction: () => socket.close(),
readyStateFunction: () => socket.readyState,
});
this.connection = connection;
connection.on('message', (messageArg) => this.emit('message', messageArg));
connection.on('close', () => this.emit('close', undefined));
connection.on('error', (errorArg) => this.emit('error', errorArg));
await new Promise<void>((resolve, reject) => {
this.addSocketListener(socket, 'open', () => {
this.emit('open', connection);
resolve();
});
this.addSocketListener(socket, 'message', (messageArg) => connection.handleMessage(messageArg));
this.addSocketListener(socket, 'close', () => {
connection.handleClose();
this.connection = undefined;
this.socket = undefined;
});
this.addSocketListener(socket, 'error', (errorArg) => {
const normalizedError = normalizeError(errorArg);
connection.handleError(normalizedError);
reject(normalizedError);
});
});
}
public send(messageArg: TIsotransportMessage): void {
if (!this.connection) {
throw new Error('Cannot send before isotransport client is connected.');
}
this.connection.send(messageArg);
}
public close(): void {
this.connection?.close();
}
private async getWebSocketConstructor(): Promise<TWebSocketClientConstructor> {
if (globalThis.WebSocket) {
return globalThis.WebSocket as unknown as TWebSocketClientConstructor;
}
const wsModule = await import('ws');
return wsModule.WebSocket as unknown as TWebSocketClientConstructor;
}
private addSocketListener(
socketArg: TWebSocketClientSocket,
eventNameArg: string,
listenerArg: (...args: unknown[]) => void
): void {
if (socketArg.addEventListener) {
socketArg.addEventListener(eventNameArg, (eventArg) => listenerArg(eventArg));
return;
}
socketArg.on?.(eventNameArg, listenerArg);
}
}