316 lines
10 KiB
TypeScript
316 lines
10 KiB
TypeScript
import * as plugins from './plugins.js';
|
|
import { VirtualStream } from './classes.virtualstream.js';
|
|
|
|
import { TypedHandler } from './classes.typedhandler.js';
|
|
import { TypedRequest } from './classes.typedrequest.js';
|
|
|
|
/**
 * Log entry for TypedRequest traffic monitoring.
 *
 * One entry is handed to a hook each time a request or response passes
 * through a router.
 */
export interface ITypedRequestLogEntry {
  /** Correlation id of the typed request; the router falls back to 'unknown' when none is present. */
  correlationId: string;
  /** Method name of the typed request. */
  method: string;
  /** Whether the message is leaving ('outgoing') or arriving at ('incoming') this side. */
  direction: 'outgoing' | 'incoming';
  /** Whether the message is the request or the response half of the exchange. */
  phase: 'request' | 'response';
  /** Epoch milliseconds (Date.now()) at which the entry was created. */
  timestamp: number;
  /** Elapsed milliseconds since the request was received; only set on outgoing responses by the router. */
  durationMs?: number;
  /** The request or response payload, depending on `phase`. */
  payload: any;
  /** Error text, if the typed request carries an error. */
  error?: string;
}
|
|
|
|
/**
 * Hooks for intercepting TypedRequest traffic.
 *
 * Every hook is optional and receives a log entry describing the message.
 */
export interface ITypedRouterHooks {
  /** Hook for requests leaving this side. */
  onOutgoingRequest?: (entry: ITypedRequestLogEntry) => void;
  /** Called by the router when a response-phase message is routed back to a waiting request. */
  onIncomingResponse?: (entry: ITypedRequestLogEntry) => void;
  /** Called by the router when a request-phase (or local) message is about to be dispatched to a handler. */
  onIncomingRequest?: (entry: ITypedRequestLogEntry) => void;
  /** Called by the router after a handler has produced a response, or when no handler was found. */
  onOutgoingResponse?: (entry: ITypedRequestLogEntry) => void;
}
|
|
|
|
/**
 * A typed router decides which typed handler to call based on the method
 * specified in the typed request.
 * This is intended for reusing the same url endpoint for different methods.
 */
export class TypedRouter {
  /**
   * Global hooks, stored on globalThis so they are shared across bundles.
   * The backing object is created lazily on first access.
   */
  public static get globalHooks(): ITypedRouterHooks {
    if (!(globalThis as any).__typedRouterGlobalHooks) {
      (globalThis as any).__typedRouterGlobalHooks = {};
    }
    return (globalThis as any).__typedRouterGlobalHooks;
  }

  public static set globalHooks(value: ITypedRouterHooks) {
    (globalThis as any).__typedRouterGlobalHooks = value;
  }

  /**
   * Set global hooks for monitoring all TypedRequest traffic.
   * The given hooks are merged over the currently registered ones.
   * Hooks are shared across all bundles via globalThis.
   * @param hooks hooks to add or replace
   */
  public static setGlobalHooks(hooks: ITypedRouterHooks): void {
    const current = TypedRouter.globalHooks;
    TypedRouter.globalHooks = { ...current, ...hooks };
  }

  /**
   * Clear all global hooks
   */
  public static clearGlobalHooks(): void {
    (globalThis as any).__typedRouterGlobalHooks = {};
  }

  // Instance-level hooks (for per-router monitoring)
  public hooks: ITypedRouterHooks = {};

  /**
   * Set instance-level hooks for monitoring traffic through this router.
   * The given hooks are merged over the ones already set on this instance.
   * @param hooks hooks to add or replace
   */
  public setHooks(hooks: ITypedRouterHooks): void {
    this.hooks = { ...this.hooks, ...hooks };
  }

  /**
   * Helper to call both global and instance hooks.
   *
   * The global hook runs first, then the instance hook. Each call is
   * isolated: an exception thrown by one hook is logged and does not
   * prevent the other hook from running, nor does it reach the caller.
   * @param hookName which hook to invoke
   * @param entry the log entry handed to the hook
   */
  private callHook(hookName: keyof ITypedRouterHooks, entry: ITypedRequestLogEntry): void {
    for (const hookSet of [TypedRouter.globalHooks, this.hooks]) {
      try {
        hookSet[hookName]?.(entry);
      } catch (err) {
        console.error(`TypedRouter hook error (${hookName}):`, err);
      }
    }
  }

  /**
   * Reports a response that is about to be handed back through the
   * 'onOutgoingResponse' hook.
   * @param typedRequestArg the (already encoded) response-phase typed request
   * @param requestStartTime Date.now() value captured when the request arrived
   */
  private emitOutgoingResponse(
    typedRequestArg: plugins.typedRequestInterfaces.ITypedRequest,
    requestStartTime: number
  ): void {
    // read the clock once so that timestamp and durationMs agree
    const now = Date.now();
    this.callHook('onOutgoingResponse', {
      correlationId: typedRequestArg.correlation?.id || 'unknown',
      method: typedRequestArg.method,
      direction: 'outgoing',
      phase: 'response',
      timestamp: now,
      durationMs: now - requestStartTime,
      payload: typedRequestArg.response,
      error: typedRequestArg.error?.text,
    });
  }

  public routerMap = new plugins.lik.ObjectMap<TypedRouter>();
  public handlerMap = new plugins.lik.ObjectMap<
    TypedHandler<any & plugins.typedRequestInterfaces.ITypedRequest>
  >();
  public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream<any>>();

  public fireEventInterestMap = new plugins.lik.InterestMap<
    string,
    plugins.typedRequestInterfaces.ITypedRequest
  >((correlationId: string) => correlationId);

  /**
   * adds the handler to the routing map
   * @param typedHandlerArg the handler to register
   * @throws Error when a handler for the same method is already reachable
   * from this router (including linked routers)
   */
  public addTypedHandler<T extends plugins.typedRequestInterfaces.ITypedRequest>(
    typedHandlerArg: TypedHandler<T>
  ) {
    // lets check for deduplication
    const existingTypedHandler = this.getTypedHandlerForMethod(typedHandlerArg.method);
    if (existingTypedHandler) {
      throw new Error(
        `a TypedHandler for ${typedHandlerArg.method} alredy exists! Can't add another one.`
      );
    }

    this.handlerMap.add(typedHandlerArg);
  }

  /**
   * adds another sub typedRouter
   *
   * The link is made in both directions: this router is also added to the
   * given router. Adding an already linked router is a no-op, which is what
   * terminates the mutual registration.
   * @param typedRouterArg the router to link with this one
   */
  public addTypedRouter(typedRouterArg: TypedRouter) {
    const routerExists = this.routerMap.findSync((routerArg) => routerArg === typedRouterArg);
    if (!routerExists) {
      this.routerMap.add(typedRouterArg);
      typedRouterArg.addTypedRouter(this);
    }
  }

  /**
   * checks whether a handler for the given method is reachable from this router
   * @param methodArg the method name to look up
   */
  public checkForTypedHandler(methodArg: string): boolean {
    return !!this.getTypedHandlerForMethod(methodArg);
  }

  /**
   * gets a typed handler from the router chain, upstream and downstream
   *
   * This router's own handlers are searched first, then the linked routers
   * are searched recursively.
   * @param methodArg the method name to look up
   * @param checkedRouters routers that were already visited; used to avoid
   * endless recursion, since routers are linked in both directions. This
   * array is appended to.
   * @returns the first matching handler, or a falsy value if none is found
   */
  public getTypedHandlerForMethod(
    methodArg: string,
    checkedRouters: TypedRouter[] = []
  ): TypedHandler<any> {
    checkedRouters.push(this);

    let typedHandler: TypedHandler<any> = this.handlerMap.findSync((handler) => {
      return handler.method === methodArg;
    });

    if (!typedHandler) {
      for (const typedRouterArg of this.routerMap.getArray()) {
        if (checkedRouters.includes(typedRouterArg)) {
          continue;
        }
        typedHandler = typedRouterArg.getTypedHandlerForMethod(methodArg, checkedRouters);
        if (typedHandler) {
          // stop at the first match
          break;
        }
      }
    }

    return typedHandler;
  }

  /**
   * Options for routeAndAddResponse
   */
  public static defaultRouteOptions = {
    localRequest: false,
    skipHooks: false,
  };

  /**
   * if typedrequest object has correlation.phase === 'request' -> routes a typed request object to a handler
   * if typedrequest object has correlation.phase === 'response' -> routes a typed request object to request fire event
   * @param typedRequestArg
   * @param optionsArg - Options object with:
   *   - localRequest: treat as local request (default: false)
   *   - skipHooks: skip calling hooks for this routing (default: false, use for broadcast-received messages)
   * @returns the typed request with the response added (or with `error` set
   * when no handler is found) for request-phase/local messages and for
   * virtual stream messages; null for response-phase messages and for
   * messages that cannot be routed
   */
  public async routeAndAddResponse<
    T extends plugins.typedRequestInterfaces.ITypedRequest = plugins.typedRequestInterfaces.ITypedRequest
  >(typedRequestArg: T, optionsArg: { localRequest?: boolean; skipHooks?: boolean } = {}): Promise<T> {
    const options = { ...TypedRouter.defaultRouteOptions, ...optionsArg };
    // decoding first
    typedRequestArg = VirtualStream.decodePayloadFromNetwork(typedRequestArg, {
      typedrouter: this,
    });

    // a nullish or non-object payload cannot carry localData; treat it like
    // any other malformed message instead of crashing on property access
    if (typedRequestArg == null || typeof typedRequestArg !== 'object') {
      console.log('received weirdly shaped request');
      console.log(typedRequestArg);
      return null;
    }

    // localdata second
    typedRequestArg.localData = typedRequestArg.localData || {};
    typedRequestArg.localData.firstTypedrouter = this;

    // lets do stream processing
    if (typedRequestArg.method === '##VirtualStream##') {
      const result: any = await this.handleStreamTypedRequest(
        typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest
      );
      result.localData = null;
      return result as T;
    }

    // lets do normal routing
    if (typedRequestArg.correlation?.phase === 'request' || options.localRequest) {
      const requestStartTime = Date.now();

      // Hook: incoming request (skip if routing broadcast-received messages)
      if (!options.skipHooks) {
        this.callHook('onIncomingRequest', {
          correlationId: typedRequestArg.correlation?.id || 'unknown',
          method: typedRequestArg.method,
          direction: 'incoming',
          phase: 'request',
          timestamp: requestStartTime,
          payload: typedRequestArg.request,
        });
      }

      const typedHandler = this.getTypedHandlerForMethod(typedRequestArg.method);

      if (!typedHandler) {
        console.log(`Cannot find handler for methodname ${typedRequestArg.method}`);
        typedRequestArg.error = {
          text: 'There is no available method for this call on the server side',
          data: {},
        };
        // local requests may arrive without a correlation object
        if (typedRequestArg.correlation) {
          typedRequestArg.correlation.phase = 'response';
        }

        // encode again before handing back
        typedRequestArg.localData = null;
        typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
          typedrouter: this,
        });

        // Hook: outgoing response (error - no handler)
        if (!options.skipHooks) {
          this.emitOutgoingResponse(typedRequestArg, requestStartTime);
        }

        return typedRequestArg;
      }

      typedRequestArg = await typedHandler.addResponse(typedRequestArg);
      typedRequestArg.localData = null;
      // encode again before handing back
      typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
        typedrouter: this,
      });

      // Hook: outgoing response (success)
      if (!options.skipHooks) {
        this.emitOutgoingResponse(typedRequestArg, requestStartTime);
      }

      return typedRequestArg;
    } else if (typedRequestArg.correlation?.phase === 'response') {
      // Hook: incoming response
      if (!options.skipHooks) {
        this.callHook('onIncomingResponse', {
          correlationId: typedRequestArg.correlation?.id || 'unknown',
          method: typedRequestArg.method,
          direction: 'incoming',
          phase: 'response',
          timestamp: Date.now(),
          payload: typedRequestArg.response,
          error: typedRequestArg.error?.text,
        });
      }

      // hand the response to whoever registered interest under this correlation id
      this.fireEventInterestMap
        .findInterest(typedRequestArg.correlation.id)
        ?.fullfillInterest(typedRequestArg);
      return null;
    } else {
      console.log('received weirdly shaped request');
      console.log(typedRequestArg);
      return null;
    }
  }

  /**
   * handle streaming
   *
   * Looks up the registered virtual stream whose streamId matches the one in
   * the request and lets it handle the message.
   * @param streamTrArg the stream typed request
   * @returns the result of the virtual stream's handler, or the unmodified
   * request when no matching virtual stream is registered
   */
  public async handleStreamTypedRequest(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
    const relevantVirtualStream = await this.registeredVirtualStreams.find(async (virtualStreamArg) => {
      return virtualStreamArg.streamId === streamTrArg.request.streamId;
    });
    if (!relevantVirtualStream) {
      console.log(`no relevant virtual stream found for stream with id ${streamTrArg.request.streamId}`);
      console.log(this.registeredVirtualStreams.getArray());
      return streamTrArg;
    } else {
      console.log(`success: found relevant virtual stream with id ${streamTrArg.request.streamId}`);
    }
    const result = await relevantVirtualStream.handleStreamTr(streamTrArg);
    return result;
  }
}
|