fix(core): update
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@api.global/typedrequest',
|
||||
version: '3.0.6',
|
||||
version: '3.0.7',
|
||||
description: 'make typed requests towards apis'
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
public method: string;
|
||||
|
||||
/**
|
||||
* note the overloading is thought to deal with promises
|
||||
* @param postEndPointArg
|
||||
* @param methodArg
|
||||
*/
|
||||
@@ -49,8 +48,8 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
|
||||
// lets preprocess the payload
|
||||
payloadSending = VirtualStream.encodePayloadForNetwork(payloadSending, {
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.ITypedRequest) => {
|
||||
return this.postTrObject(payloadArg);
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
|
||||
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -59,8 +58,8 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
|
||||
// lets preprocess the response
|
||||
payloadReceiving = VirtualStream.decodePayloadFromNetwork(payloadReceiving, {
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.ITypedRequest) => {
|
||||
return this.postTrObject(payloadArg);
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
|
||||
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
}
|
||||
});
|
||||
return payloadReceiving.response;
|
||||
|
||||
@@ -14,7 +14,7 @@ export class TypedRouter {
|
||||
public handlerMap = new plugins.lik.ObjectMap<
|
||||
TypedHandler<any & plugins.typedRequestInterfaces.ITypedRequest>
|
||||
>();
|
||||
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream>();
|
||||
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream<any>>();
|
||||
|
||||
public fireEventInterestMap = new plugins.lik.InterestMap<
|
||||
string,
|
||||
@@ -102,7 +102,8 @@ export class TypedRouter {
|
||||
|
||||
// lets do stream processing
|
||||
if (typedRequestArg.method === '##VirtualStream##') {
|
||||
const result = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
|
||||
const result: any = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
|
||||
result.localData = null;
|
||||
return result as T;
|
||||
}
|
||||
|
||||
@@ -152,6 +153,13 @@ export class TypedRouter {
|
||||
const relevantVirtualStream = await this.registeredVirtualStreams.find(async virtualStreamArg => {
|
||||
return virtualStreamArg.streamId === streamTrArg.request.streamId;
|
||||
});
|
||||
if (!relevantVirtualStream) {
|
||||
console.log(`no relevant virtual stream found for stream with id ${streamTrArg.request.streamId}`);
|
||||
console.log(this.registeredVirtualStreams.getArray());
|
||||
return streamTrArg;
|
||||
} else {
|
||||
console.log(`success: found relevant virtual stream with id ${streamTrArg.request.streamId}`);
|
||||
}
|
||||
const result = await relevantVirtualStream.handleStreamTr(streamTrArg);
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -3,16 +3,25 @@ import type { TypedRouter } from './typedrequest.classes.typedrouter.js';
|
||||
|
||||
export interface ICommFunctions {
|
||||
sendMethod?: (
|
||||
sendPayload: plugins.typedRequestInterfaces.ITypedRequest
|
||||
) => Promise<plugins.typedRequestInterfaces.ITypedRequest>;
|
||||
sendPayload: plugins.typedRequestInterfaces.IStreamRequest
|
||||
) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
typedrouter?: TypedRouter;
|
||||
}
|
||||
|
||||
export class VirtualStream {
|
||||
/**
|
||||
* 1. A VirtualStream connects over the network
|
||||
* 2. It is always paired to one other VirtualStream
|
||||
* on the other side with the same streamId.
|
||||
* 3. It has a Readable and Writable side.
|
||||
* 4. The Writable side is Readable on the other side and vice versa.
|
||||
*/
|
||||
export class VirtualStream<T = ArrayBufferLike> {
|
||||
// STATIC
|
||||
public static encodePayloadForNetwork(
|
||||
objectPayload: any,
|
||||
commFunctions: ICommFunctions
|
||||
commFunctions: ICommFunctions,
|
||||
originalPayload?: any,
|
||||
path = []
|
||||
): any {
|
||||
if (objectPayload instanceof VirtualStream) {
|
||||
if (!objectPayload.side && commFunctions.sendMethod) {
|
||||
@@ -22,20 +31,41 @@ export class VirtualStream {
|
||||
if (!objectPayload.side && commFunctions.typedrouter) {
|
||||
objectPayload.side = 'responding';
|
||||
objectPayload.typedrouter = commFunctions.typedrouter;
|
||||
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
|
||||
}
|
||||
if (!originalPayload.response || path.includes('response')) {
|
||||
return {
|
||||
_isVirtualStream: true,
|
||||
streamId: objectPayload.streamId,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
_OBMITTED_VIRTUAL_STREAM: true,
|
||||
reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
|
||||
};
|
||||
}
|
||||
return {
|
||||
_isVirtualStream: true,
|
||||
streamId: objectPayload.streamId,
|
||||
};
|
||||
} else if (Array.isArray(objectPayload)) {
|
||||
const returnArray = [];
|
||||
for (const item of objectPayload) {
|
||||
returnArray.push(VirtualStream.encodePayloadForNetwork(item, commFunctions));
|
||||
returnArray.push(
|
||||
VirtualStream.encodePayloadForNetwork(
|
||||
item,
|
||||
commFunctions,
|
||||
originalPayload || objectPayload,
|
||||
path
|
||||
)
|
||||
);
|
||||
}
|
||||
return returnArray;
|
||||
} else if (objectPayload !== null && typeof objectPayload === 'object') {
|
||||
return Object.keys(objectPayload).reduce((acc, key) => {
|
||||
acc[key] = VirtualStream.encodePayloadForNetwork(objectPayload[key], commFunctions);
|
||||
path.push(key);
|
||||
acc[key] = VirtualStream.encodePayloadForNetwork(
|
||||
objectPayload[key],
|
||||
commFunctions,
|
||||
originalPayload || objectPayload,
|
||||
path
|
||||
);
|
||||
return acc;
|
||||
}, {});
|
||||
} else {
|
||||
@@ -55,6 +85,7 @@ export class VirtualStream {
|
||||
if (!virtualStream.side && commFunctions.typedrouter) {
|
||||
virtualStream.side = 'responding';
|
||||
virtualStream.typedrouter = commFunctions.typedrouter;
|
||||
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
|
||||
}
|
||||
return virtualStream;
|
||||
} else if (Array.isArray(objectPayload)) {
|
||||
@@ -78,23 +109,238 @@ export class VirtualStream {
|
||||
|
||||
public side: 'requesting' | 'responding';
|
||||
public streamId: string = plugins.isounique.uni();
|
||||
|
||||
// integration with typedrequest mechanics
|
||||
public sendMethod: ICommFunctions['sendMethod'];
|
||||
public typedrouter: TypedRouter;
|
||||
public sendMethod: (
|
||||
sendPayloadArg: plugins.typedRequestInterfaces.ITypedRequest
|
||||
) => Promise<plugins.typedRequestInterfaces.ITypedRequest>;
|
||||
|
||||
constructor() {}
|
||||
// wether to keep the stream alive
|
||||
private keepAlive = true;
|
||||
private lastKeepAliveEvent = Date.now();
|
||||
|
||||
// backpressured arrays
|
||||
private sendBackpressuredArray =
|
||||
new plugins.lik.BackpressuredArray<T>(
|
||||
16
|
||||
);
|
||||
private receiveBackpressuredArray =
|
||||
new plugins.lik.BackpressuredArray<T>(
|
||||
16
|
||||
);
|
||||
|
||||
constructor() {
|
||||
this.startKeepAliveLoop();
|
||||
}
|
||||
|
||||
/**
|
||||
* takes care of sending
|
||||
*/
|
||||
private async workOnQueue() {
|
||||
if(this.side === 'requesting') {
|
||||
// helper functions
|
||||
const getFeedback = async () => {
|
||||
const streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
console.log('stream ended immaturely');
|
||||
this.keepAlive = false;
|
||||
});
|
||||
if (streamTr && streamTr.response) {
|
||||
otherSideIsBackpressured = streamTr.response.backpressure
|
||||
otherSideHasNext = streamTr.response.next;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// do work loop
|
||||
let thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
|
||||
let otherSideHasNext = false;
|
||||
let otherSideIsBackpressured = false;
|
||||
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
|
||||
const dataArg = this.sendBackpressuredArray.shift();
|
||||
const streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: 'chunk',
|
||||
backpressure: thisSideIsBackpressured,
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
chunkData: dataArg,
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
console.log('stream ended immaturely');
|
||||
this.keepAlive = false;
|
||||
});
|
||||
|
||||
if (streamTr && streamTr.response && streamTr.response.chunkData) {
|
||||
this.receiveBackpressuredArray.push(streamTr.response.chunkData);
|
||||
}
|
||||
thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
|
||||
|
||||
// lets care about looping
|
||||
otherSideHasNext = streamTr && streamTr.response && streamTr.response.next;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method handles the stream only on the responding side
|
||||
* @param streamTrArg
|
||||
* @returns
|
||||
*/
|
||||
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
|
||||
if (streamTrArg.request.keepAlive === true && this.keepAlive === true) {
|
||||
this.lastKeepAliveEvent = Date.now();
|
||||
} else if (streamTrArg.request.keepAlive === false) {
|
||||
this.keepAlive = false;
|
||||
}
|
||||
|
||||
// keepAlive handling
|
||||
if (streamTrArg.request.mainPurpose === 'keepAlive') {
|
||||
// if the main purpose is keepAlive, we answer with a keepAlive
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'keepAlive',
|
||||
keepAlive: this.keepAlive,
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
};
|
||||
}
|
||||
|
||||
// feedback handling
|
||||
if (streamTrArg.request.mainPurpose === 'feedback') {
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
};
|
||||
streamTrArg.request = null;
|
||||
}
|
||||
|
||||
// chunk handling
|
||||
if (streamTrArg.request.mainPurpose === 'chunk') {
|
||||
this.receiveBackpressuredArray.push(streamTrArg.request.chunkData);
|
||||
if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) {
|
||||
const dataArg = this.sendBackpressuredArray.shift();
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'chunk',
|
||||
next: this.sendBackpressuredArray.data.length > 1,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
chunkData: this.sendBackpressuredArray.shift(),
|
||||
};
|
||||
} else {
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
};
|
||||
}
|
||||
streamTrArg.request = null;
|
||||
}
|
||||
|
||||
return streamTrArg;
|
||||
}
|
||||
|
||||
/**
|
||||
* closes the virtual stream
|
||||
*/
|
||||
close() {
|
||||
public async cleanup() {
|
||||
if (this.typedrouter) {
|
||||
this.typedrouter.registeredVirtualStreams.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* a keepAlive loop that works across technologies
|
||||
*/
|
||||
private async startKeepAliveLoop() {
|
||||
// initially wait for a second
|
||||
await plugins.smartdelay.delayFor(0);
|
||||
let counter = 0;
|
||||
keepAliveLoop: while (this.keepAlive) {
|
||||
const triggerResult = await this.triggerKeepAlive();
|
||||
await plugins.smartdelay.delayFor(1000);
|
||||
}
|
||||
await plugins.smartdelay.delayFor(1000);
|
||||
await this.cleanup();
|
||||
console.log(`cleaned up for stream ${this.streamId}`);
|
||||
}
|
||||
|
||||
private async triggerKeepAlive() {
|
||||
if (this.side === 'requesting') {
|
||||
console.log(`keepalive sent.`);
|
||||
const streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: 'keepAlive',
|
||||
keepAlive: true,
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
this.keepAlive = false;
|
||||
});
|
||||
|
||||
// lets handle keepAlive
|
||||
if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
|
||||
this.keepAlive = false;
|
||||
} else {
|
||||
this.lastKeepAliveEvent = Date.now();
|
||||
}
|
||||
if (streamTr && streamTr.response && streamTr.response.next) {
|
||||
this.workOnQueue();
|
||||
}
|
||||
}
|
||||
if (Date.now() - this.lastKeepAliveEvent > 10000) {
|
||||
console.log(`closing stream for ${this.streamId}`);
|
||||
this.keepAlive = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Data sending and receiving
|
||||
public async sendData(dataArg: T): Promise<void> {
|
||||
this.sendBackpressuredArray.push(dataArg);
|
||||
this.workOnQueue();
|
||||
await this.sendBackpressuredArray.waitForSpace();
|
||||
}
|
||||
public async fetchData(): Promise<T> {
|
||||
if (this.receiveBackpressuredArray.hasSpace) {
|
||||
// do something maybe?
|
||||
}
|
||||
await this.receiveBackpressuredArray.waitForItems();
|
||||
const dataPackage = this.receiveBackpressuredArray.shift();
|
||||
return dataPackage;
|
||||
}
|
||||
|
||||
public pipeWebStream(webStream: any) {
|
||||
// lets do the piping
|
||||
webStream.on('data', (data: any) => {});
|
||||
webStream.on('end', () => {});
|
||||
webStream.on('error', (error: any) => {});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user