Compare commits
24 Commits
Author | SHA1 | Date | |
---|---|---|---|
8b4befc828 | |||
77dddd9157 | |||
737f413324 | |||
e613937c43 | |||
9c66752f8b | |||
5c6922c710 | |||
c8e4343ac7 | |||
924bc2c5a7 | |||
2274afcd38 | |||
23aab2adf8 | |||
90311ad65e | |||
407e1383f8 | |||
ad106909e2 | |||
b346da01f1 | |||
51fedb270b | |||
fd26b48ff6 | |||
1bfe10691a | |||
bf81c34dbc | |||
f837bb5230 | |||
1b76e6882f | |||
83b43f501d | |||
9b626a562d | |||
c216f97bfd | |||
71453877d7 |
22
package.json
22
package.json
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@api.global/typedrequest",
|
||||
"version": "3.0.2",
|
||||
"version": "3.0.14",
|
||||
"private": false,
|
||||
"description": "make typed requests towards apis",
|
||||
"main": "dist_ts/index.js",
|
||||
@ -14,22 +14,24 @@
|
||||
"buildDocs": "tsdoc"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@api.global/typedserver": "^3.0.5",
|
||||
"@git.zone/tsbuild": "^2.1.66",
|
||||
"@git.zone/tsbundle": "^2.0.8",
|
||||
"@api.global/typedserver": "^3.0.25",
|
||||
"@git.zone/tsbuild": "^2.1.72",
|
||||
"@git.zone/tsbundle": "^2.0.15",
|
||||
"@git.zone/tsrun": "^1.2.44",
|
||||
"@git.zone/tstest": "^1.0.77",
|
||||
"@push.rocks/smartenv": "^5.0.10",
|
||||
"@git.zone/tstest": "^1.0.86",
|
||||
"@push.rocks/smartenv": "^5.0.12",
|
||||
"@push.rocks/tapbundle": "^5.0.15",
|
||||
"@types/node": "^20.8.7"
|
||||
"@types/node": "^20.11.24"
|
||||
},
|
||||
"dependencies": {
|
||||
"@api.global/typedrequest-interfaces": "^3.0.1",
|
||||
"@api.global/typedrequest-interfaces": "^3.0.18",
|
||||
"@push.rocks/isounique": "^1.0.5",
|
||||
"@push.rocks/lik": "^6.0.5",
|
||||
"@push.rocks/lik": "^6.0.14",
|
||||
"@push.rocks/smartbuffer": "^1.0.7",
|
||||
"@push.rocks/smartdelay": "^3.0.5",
|
||||
"@push.rocks/smartpromise": "^4.0.3",
|
||||
"@push.rocks/webrequest": "^3.0.33"
|
||||
"@push.rocks/webrequest": "^3.0.34",
|
||||
"@push.rocks/webstream": "^1.0.8"
|
||||
},
|
||||
"files": [
|
||||
"ts/**/*",
|
||||
|
2177
pnpm-lock.yaml
generated
2177
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@ -2,8 +2,10 @@ import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as typedserver from '@api.global/typedserver';
|
||||
|
||||
import * as typedrequest from '../ts/index.js';
|
||||
import * as typedrequestInterfaces from '@api.global/typedrequest-interfaces';
|
||||
|
||||
let testServer: typedserver.servertools.Server;
|
||||
let testTypedRouter: typedrequest.TypedRouter;
|
||||
let testTypedHandler: typedrequest.TypedHandler<ITestReqRes>;
|
||||
|
||||
// lets define an interface
|
||||
@ -17,6 +19,16 @@ interface ITestReqRes {
|
||||
};
|
||||
}
|
||||
|
||||
interface ITestStream {
|
||||
method: 'handleStream';
|
||||
request: {
|
||||
requestStream: typedrequestInterfaces.IVirtualStream;
|
||||
};
|
||||
response: {
|
||||
responseStream: typedrequestInterfaces.IVirtualStream;
|
||||
};
|
||||
}
|
||||
|
||||
tap.test('should create a typedHandler', async () => {
|
||||
// lets use the interface in a TypedHandler
|
||||
testTypedHandler = new typedrequest.TypedHandler<ITestReqRes>('hi', async (reqArg) => {
|
||||
@ -35,7 +47,7 @@ tap.test('should spawn a server to test with', async () => {
|
||||
});
|
||||
|
||||
tap.test('should define a testHandler', async () => {
|
||||
const testTypedRouter = new typedrequest.TypedRouter(); // typed routers can broker typedrequests between handlers
|
||||
testTypedRouter = new typedrequest.TypedRouter(); // typed routers can broker typedrequests between handlers
|
||||
testTypedRouter.addTypedHandler(testTypedHandler);
|
||||
testServer.addRoute(
|
||||
'/testroute',
|
||||
@ -60,7 +72,36 @@ tap.test('should fire a request', async () => {
|
||||
expect(response.surname).toEqual('wow');
|
||||
});
|
||||
|
||||
tap.test('should end the server', async () => {
|
||||
tap.test('should allow VirtualStreams', async () => {
|
||||
const newRequestingVS = new typedrequest.VirtualStream();
|
||||
const newRespondingVS = new typedrequest.VirtualStream();
|
||||
let generatedRequestingVS: typedrequestInterfaces.IVirtualStream;
|
||||
let generatedRespondingVS: typedrequestInterfaces.IVirtualStream;
|
||||
testTypedRouter.addTypedHandler(new typedrequest.TypedHandler<ITestStream>('handleStream', async (reqArg) => {
|
||||
console.log('hey there');
|
||||
console.log(reqArg.requestStream);
|
||||
generatedRequestingVS = reqArg.requestStream;
|
||||
return {
|
||||
responseStream: newRespondingVS,
|
||||
};
|
||||
}));
|
||||
const typedRequest = new typedrequest.TypedRequest<ITestStream>(
|
||||
'http://localhost:3000/testroute',
|
||||
'handleStream'
|
||||
);
|
||||
const response = await typedRequest.fire({
|
||||
requestStream: newRequestingVS,
|
||||
});
|
||||
console.log(response.responseStream);
|
||||
|
||||
newRequestingVS.sendData(Buffer.from('hello'));
|
||||
const data = await generatedRequestingVS.fetchData();
|
||||
const decodedData = data.toString();
|
||||
expect(data.toString()).toEqual('hello');
|
||||
});
|
||||
|
||||
tap.test('should end the server', async (toolsArg) => {
|
||||
await toolsArg.delayFor(5000);
|
||||
await testServer.stop();
|
||||
});
|
||||
|
@ -31,9 +31,15 @@ tap.test('should define a testHandler', async () => {
|
||||
|
||||
tap.test('should fire a request', async () => {
|
||||
const typedRequest = new typedrequest.TypedRequest<ITestReqRes>(
|
||||
'http://localhost:3000/testroute',
|
||||
'http://localhost:3000/typedrequest',
|
||||
'hi'
|
||||
);
|
||||
const result = await typedRequest.fire({
|
||||
name: 'yes',
|
||||
}).catch(err => {
|
||||
console.log(err);
|
||||
});
|
||||
console.log(result);
|
||||
});
|
||||
|
||||
tap.start();
|
||||
|
@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@api.global/typedrequest',
|
||||
version: '3.0.2',
|
||||
version: '3.0.14',
|
||||
description: 'make typed requests towards apis'
|
||||
}
|
||||
|
@ -3,3 +3,4 @@ export * from './typedrequest.classes.typedhandler.js';
|
||||
export * from './typedrequest.classes.typedrouter.js';
|
||||
export * from './typedrequest.classes.typedresponseerror.js';
|
||||
export * from './typedrequest.classes.typedtarget.js';
|
||||
export * from './typedrequest.classes.virtualstream.js';
|
@ -6,8 +6,10 @@ export { typedRequestInterfaces };
|
||||
// pushrocks scope
|
||||
import * as isounique from '@push.rocks/isounique';
|
||||
import * as lik from '@push.rocks/lik';
|
||||
import * as smartbuffer from '@push.rocks/smartbuffer';
|
||||
import * as smartdelay from '@push.rocks/smartdelay';
|
||||
import * as smartpromise from '@push.rocks/smartpromise';
|
||||
import * as webrequest from '@push.rocks/webrequest';
|
||||
import * as webstream from '@push.rocks/webstream';
|
||||
|
||||
export { isounique, lik, smartdelay, smartpromise, webrequest };
|
||||
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest, webstream };
|
@ -1,4 +1,4 @@
|
||||
import * as plugins from './typedrequest.plugins.js';
|
||||
import * as plugins from './plugins.js';
|
||||
import { TypedResponseError } from './typedrequest.classes.typedresponseerror.js';
|
||||
|
||||
export type THandlerFunction<T extends plugins.typedRequestInterfaces.ITypedRequest> = (
|
||||
|
@ -1,4 +1,5 @@
|
||||
import * as plugins from './typedrequest.plugins.js';
|
||||
import * as plugins from './plugins.js';
|
||||
import { VirtualStream } from './typedrequest.classes.virtualstream.js';
|
||||
import { TypedResponseError } from './typedrequest.classes.typedresponseerror.js';
|
||||
import { TypedRouter } from './typedrequest.classes.typedrouter.js';
|
||||
import { TypedTarget } from './typedrequest.classes.typedtarget.js';
|
||||
@ -19,7 +20,6 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
public method: string;
|
||||
|
||||
/**
|
||||
* note the overloading is thought to deal with promises
|
||||
* @param postEndPointArg
|
||||
* @param methodArg
|
||||
*/
|
||||
@ -36,7 +36,7 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
* fires the request
|
||||
*/
|
||||
public async fire(fireArg: T['request'], useCacheArg: boolean = false): Promise<T['response']> {
|
||||
const payload: plugins.typedRequestInterfaces.ITypedRequest = {
|
||||
let payloadSending: plugins.typedRequestInterfaces.ITypedRequest = {
|
||||
method: this.method,
|
||||
request: fireArg,
|
||||
response: null,
|
||||
@ -46,30 +46,58 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
|
||||
},
|
||||
};
|
||||
|
||||
let responseBody: plugins.typedRequestInterfaces.ITypedRequest;
|
||||
// lets preprocess the payload
|
||||
payloadSending = VirtualStream.encodePayloadForNetwork(payloadSending, {
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
|
||||
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
}
|
||||
});
|
||||
|
||||
let payloadReceiving: plugins.typedRequestInterfaces.ITypedRequest;
|
||||
payloadReceiving = await this.postTrObject(payloadSending, useCacheArg);
|
||||
|
||||
// lets preprocess the response
|
||||
payloadReceiving = VirtualStream.decodePayloadFromNetwork(payloadReceiving, {
|
||||
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
|
||||
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
}
|
||||
});
|
||||
return payloadReceiving.response;
|
||||
}
|
||||
|
||||
private async postTrObject(payloadSendingArg: plugins.typedRequestInterfaces.ITypedRequest, useCacheArg: boolean = false) {
|
||||
let payloadReceiving: plugins.typedRequestInterfaces.ITypedRequest;
|
||||
if (this.urlEndPoint) {
|
||||
const response = await webrequestInstance.postJson(this.urlEndPoint, payload, useCacheArg);
|
||||
responseBody = response;
|
||||
const response = await webrequestInstance.postJson(
|
||||
this.urlEndPoint,
|
||||
payloadSendingArg,
|
||||
useCacheArg
|
||||
);
|
||||
payloadReceiving = response;
|
||||
} else {
|
||||
responseBody = await this.typedTarget.post(payload);
|
||||
payloadReceiving = await this.typedTarget.post(payloadSendingArg);
|
||||
}
|
||||
if (responseBody.error) {
|
||||
if (payloadReceiving.error) {
|
||||
console.error(
|
||||
`Got an error ${responseBody.error.text} with data ${JSON.stringify(
|
||||
responseBody.error.data
|
||||
`Got an error ${payloadReceiving.error.text} with data ${JSON.stringify(
|
||||
payloadReceiving.error.data,
|
||||
null,
|
||||
2
|
||||
)}`
|
||||
);
|
||||
if (!responseBody.retry) {
|
||||
throw new TypedResponseError(responseBody.error.text, responseBody.error.data);
|
||||
if (!payloadReceiving.retry) {
|
||||
throw new TypedResponseError(payloadReceiving.error.text, payloadReceiving.error.data);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
if (responseBody.retry) {
|
||||
console.log(`server requested retry for the following reason: ${responseBody.retry.reason}`);
|
||||
await plugins.smartdelay.delayFor(responseBody.retry.waitForMs);
|
||||
if (payloadReceiving.retry) {
|
||||
console.log(
|
||||
`server requested retry for the following reason: ${payloadReceiving.retry.reason}`
|
||||
);
|
||||
await plugins.smartdelay.delayFor(payloadReceiving.retry.waitForMs);
|
||||
// tslint:disable-next-line: no-return-await
|
||||
return await this.fire(fireArg);
|
||||
payloadReceiving = await this.postTrObject(payloadSendingArg, useCacheArg);
|
||||
}
|
||||
return responseBody.response;
|
||||
return payloadReceiving;
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
import * as plugins from './typedrequest.plugins.js';
|
||||
import * as plugins from './plugins.js';
|
||||
|
||||
export class TypedResponseError {
|
||||
public errorText: string;
|
||||
|
@ -1,4 +1,5 @@
|
||||
import * as plugins from './typedrequest.plugins.js';
|
||||
import * as plugins from './plugins.js';
|
||||
import { VirtualStream } from './typedrequest.classes.virtualstream.js';
|
||||
|
||||
import { TypedHandler } from './typedrequest.classes.typedhandler.js';
|
||||
import { TypedRequest } from './typedrequest.classes.typedrequest.js';
|
||||
@ -13,6 +14,7 @@ export class TypedRouter {
|
||||
public handlerMap = new plugins.lik.ObjectMap<
|
||||
TypedHandler<any & plugins.typedRequestInterfaces.ITypedRequest>
|
||||
>();
|
||||
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream<any>>();
|
||||
|
||||
public fireEventInterestMap = new plugins.lik.InterestMap<
|
||||
string,
|
||||
@ -89,6 +91,23 @@ export class TypedRouter {
|
||||
public async routeAndAddResponse<
|
||||
T extends plugins.typedRequestInterfaces.ITypedRequest = plugins.typedRequestInterfaces.ITypedRequest
|
||||
>(typedRequestArg: T, localRequestArg = false): Promise<T> {
|
||||
// decoding first
|
||||
typedRequestArg = VirtualStream.decodePayloadFromNetwork(typedRequestArg, {
|
||||
typedrouter: this,
|
||||
});
|
||||
|
||||
// localdata second
|
||||
typedRequestArg.localData = typedRequestArg.localData || {};
|
||||
typedRequestArg.localData.firstTypedrouter = this;
|
||||
|
||||
// lets do stream processing
|
||||
if (typedRequestArg.method === '##VirtualStream##') {
|
||||
const result: any = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
|
||||
result.localData = null;
|
||||
return result as T;
|
||||
}
|
||||
|
||||
// lets do normal routing
|
||||
if (typedRequestArg?.correlation?.phase === 'request' || localRequestArg) {
|
||||
const typedHandler = this.getTypedHandlerForMethod(typedRequestArg.method);
|
||||
|
||||
@ -99,10 +118,21 @@ export class TypedRouter {
|
||||
data: {},
|
||||
};
|
||||
typedRequestArg.correlation.phase = 'response';
|
||||
|
||||
// encode again before handing back
|
||||
typedRequestArg.localData = null;
|
||||
typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
|
||||
typedrouter: this,
|
||||
});
|
||||
return typedRequestArg;
|
||||
}
|
||||
|
||||
typedRequestArg = await typedHandler.addResponse(typedRequestArg);
|
||||
typedRequestArg.localData = null;
|
||||
// encode again before handing back
|
||||
typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
|
||||
typedrouter: this,
|
||||
});
|
||||
return typedRequestArg;
|
||||
} else if (typedRequestArg?.correlation?.phase === 'response') {
|
||||
this.fireEventInterestMap
|
||||
@ -115,4 +145,23 @@ export class TypedRouter {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* handle streaming
|
||||
* @param streamTrArg
|
||||
*/
|
||||
public async handleStreamTypedRequest(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
|
||||
const relevantVirtualStream = await this.registeredVirtualStreams.find(async virtualStreamArg => {
|
||||
return virtualStreamArg.streamId === streamTrArg.request.streamId;
|
||||
});
|
||||
if (!relevantVirtualStream) {
|
||||
console.log(`no relevant virtual stream found for stream with id ${streamTrArg.request.streamId}`);
|
||||
console.log(this.registeredVirtualStreams.getArray());
|
||||
return streamTrArg;
|
||||
} else {
|
||||
console.log(`success: found relevant virtual stream with id ${streamTrArg.request.streamId}`);
|
||||
}
|
||||
const result = await relevantVirtualStream.handleStreamTr(streamTrArg);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { TypedRouter } from './typedrequest.classes.typedrouter.js';
|
||||
import * as plugins from './typedrequest.plugins.js';
|
||||
import * as plugins from './plugins.js';
|
||||
|
||||
export type IPostMethod = (
|
||||
typedRequestPostObject: plugins.typedRequestInterfaces.ITypedRequest
|
||||
|
376
ts/typedrequest.classes.virtualstream.ts
Normal file
376
ts/typedrequest.classes.virtualstream.ts
Normal file
@ -0,0 +1,376 @@
|
||||
import * as plugins from './plugins.js';
|
||||
import type { TypedRouter } from './typedrequest.classes.typedrouter.js';
|
||||
|
||||
export interface ICommFunctions {
|
||||
sendMethod?: (
|
||||
sendPayload: plugins.typedRequestInterfaces.IStreamRequest
|
||||
) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
|
||||
typedrouter?: TypedRouter;
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. A VirtualStream connects over the network
|
||||
* 2. It is always paired to one other VirtualStream
|
||||
* on the other side with the same streamId.
|
||||
* 3. It has a Readable and Writable side.
|
||||
* 4. The Writable side is Readable on the other side and vice versa.
|
||||
*/
|
||||
export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
|
||||
// STATIC
|
||||
public static encodePayloadForNetwork(
|
||||
objectPayload: any,
|
||||
commFunctions: ICommFunctions,
|
||||
originalPayload?: any,
|
||||
path = []
|
||||
): any {
|
||||
if (!objectPayload) {
|
||||
return objectPayload;
|
||||
}
|
||||
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
|
||||
return objectPayload;
|
||||
}
|
||||
if (objectPayload instanceof VirtualStream) {
|
||||
if (!objectPayload.side && commFunctions.sendMethod) {
|
||||
objectPayload.side = 'requesting';
|
||||
objectPayload.sendMethod = commFunctions.sendMethod;
|
||||
}
|
||||
if (!objectPayload.side && commFunctions.typedrouter) {
|
||||
objectPayload.side = 'responding';
|
||||
objectPayload.typedrouter = commFunctions.typedrouter;
|
||||
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
|
||||
}
|
||||
if (!originalPayload.response || path.includes('response')) {
|
||||
objectPayload.startKeepAliveLoop();
|
||||
return {
|
||||
_isVirtualStream: true,
|
||||
streamId: objectPayload.streamId,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
_OBMITTED_VIRTUAL_STREAM: true,
|
||||
reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
|
||||
};
|
||||
}
|
||||
} else if (Array.isArray(objectPayload)) {
|
||||
// For arrays, we recurse over each item.
|
||||
return objectPayload.map((item, index) =>
|
||||
VirtualStream.encodePayloadForNetwork(
|
||||
item,
|
||||
commFunctions,
|
||||
originalPayload || objectPayload,
|
||||
path.concat(String(index)) // Convert index to string and concatenate to path
|
||||
)
|
||||
);
|
||||
} else if (objectPayload !== null && typeof objectPayload === 'object') {
|
||||
// For objects, we recurse over each key-value pair.
|
||||
return Object.entries(objectPayload).reduce((acc, [key, value]) => {
|
||||
const newPath = path.concat(key); // Concatenate the new key to the path
|
||||
acc[key] = VirtualStream.encodePayloadForNetwork(
|
||||
value,
|
||||
commFunctions,
|
||||
originalPayload || objectPayload,
|
||||
newPath
|
||||
);
|
||||
return acc;
|
||||
}, {});
|
||||
} else {
|
||||
return objectPayload;
|
||||
}
|
||||
}
|
||||
|
||||
public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
|
||||
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
|
||||
return objectPayload;
|
||||
}
|
||||
if (objectPayload !== null && typeof objectPayload === 'object') {
|
||||
if (objectPayload._isVirtualStream) {
|
||||
const virtualStream = new VirtualStream();
|
||||
virtualStream.streamId = objectPayload.streamId;
|
||||
if (!virtualStream.side && commFunctions.sendMethod) {
|
||||
virtualStream.side = 'requesting';
|
||||
virtualStream.sendMethod = commFunctions.sendMethod;
|
||||
}
|
||||
if (!virtualStream.side && commFunctions.typedrouter) {
|
||||
virtualStream.side = 'responding';
|
||||
virtualStream.typedrouter = commFunctions.typedrouter;
|
||||
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
|
||||
}
|
||||
virtualStream.startKeepAliveLoop();
|
||||
return virtualStream;
|
||||
} else if (Array.isArray(objectPayload)) {
|
||||
const returnArray = [];
|
||||
for (const item of objectPayload) {
|
||||
returnArray.push(VirtualStream.decodePayloadFromNetwork(item, commFunctions));
|
||||
}
|
||||
return returnArray;
|
||||
} else {
|
||||
return Object.keys(objectPayload).reduce((acc, key) => {
|
||||
acc[key] = VirtualStream.decodePayloadFromNetwork(objectPayload[key], commFunctions);
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
} else {
|
||||
return objectPayload;
|
||||
}
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
|
||||
public side: 'requesting' | 'responding';
|
||||
public streamId: string = plugins.isounique.uni();
|
||||
|
||||
// integration with typedrequest mechanics
|
||||
public sendMethod: ICommFunctions['sendMethod'];
|
||||
public typedrouter: TypedRouter;
|
||||
|
||||
// wether to keep the stream alive
|
||||
private keepAlive = true;
|
||||
private lastKeepAliveEvent: number;
|
||||
|
||||
// backpressured arrays
|
||||
private sendBackpressuredArray =
|
||||
new plugins.lik.BackpressuredArray<T>(
|
||||
16
|
||||
);
|
||||
private receiveBackpressuredArray =
|
||||
new plugins.lik.BackpressuredArray<T>(
|
||||
16
|
||||
);
|
||||
|
||||
constructor() {}
|
||||
|
||||
/**
|
||||
* takes care of sending
|
||||
*/
|
||||
private async workOnQueue() {
|
||||
if(this.side === 'requesting') {
|
||||
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
|
||||
let otherSideHasNext = false;
|
||||
let otherSideIsBackpressured = false;
|
||||
|
||||
// helper functions
|
||||
const getFeedback = async () => {
|
||||
const streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
console.log('stream ended immaturely');
|
||||
this.keepAlive = false;
|
||||
});
|
||||
if (streamTr && streamTr.response) {
|
||||
otherSideIsBackpressured = streamTr.response.backpressure
|
||||
otherSideHasNext = streamTr.response.next;
|
||||
}
|
||||
}
|
||||
await getFeedback();
|
||||
|
||||
// do work loop
|
||||
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
|
||||
let dataArg: typeof this.sendBackpressuredArray.data[0];
|
||||
if (this.sendBackpressuredArray.data.length > 0) {
|
||||
dataArg = this.sendBackpressuredArray.shift();
|
||||
}
|
||||
let streamTr: plugins.typedRequestInterfaces.IStreamRequest;
|
||||
streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: dataArg ? 'chunk' : 'read',
|
||||
backpressure: thisSideIsBackpressured,
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
...dataArg ? { chunkData: dataArg } : {},
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
console.log('stream ended immaturely');
|
||||
this.keepAlive = false;
|
||||
return null;
|
||||
});
|
||||
|
||||
if (streamTr && streamTr.response && streamTr.response.chunkData) {
|
||||
this.receiveBackpressuredArray.push(streamTr.response.chunkData);
|
||||
}
|
||||
thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
|
||||
|
||||
// lets care about looping
|
||||
otherSideHasNext = streamTr && streamTr.response && streamTr.response.next;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method handles the stream only on the responding side
|
||||
* @param streamTrArg
|
||||
* @returns
|
||||
*/
|
||||
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
|
||||
if (streamTrArg.request.keepAlive === true && this.keepAlive === true) {
|
||||
this.lastKeepAliveEvent = Date.now();
|
||||
} else if (streamTrArg.request.keepAlive === false) {
|
||||
this.keepAlive = false;
|
||||
}
|
||||
|
||||
// keepAlive handling
|
||||
if (streamTrArg.request.mainPurpose === 'keepAlive') {
|
||||
// if the main purpose is keepAlive, we answer with a keepAlive
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'keepAlive',
|
||||
keepAlive: this.keepAlive,
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
};
|
||||
}
|
||||
|
||||
// feedback handling
|
||||
if (streamTrArg.request.mainPurpose === 'feedback') {
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
};
|
||||
}
|
||||
|
||||
// chunk handling
|
||||
if (streamTrArg.request.mainPurpose === 'chunk') {
|
||||
this.receiveBackpressuredArray.push(streamTrArg.request.chunkData);
|
||||
if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) {
|
||||
const dataArg = this.sendBackpressuredArray.shift();
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'chunk',
|
||||
next: this.sendBackpressuredArray.data.length > 1,
|
||||
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
|
||||
chunkData: this.sendBackpressuredArray.shift(),
|
||||
};
|
||||
} else {
|
||||
streamTrArg.response = {
|
||||
streamId: this.streamId,
|
||||
cycleId: streamTrArg.request.cycleId,
|
||||
cycle: 'response',
|
||||
mainPurpose: 'feedback',
|
||||
next: this.sendBackpressuredArray.data.length > 0,
|
||||
};
|
||||
}
|
||||
streamTrArg.request = null;
|
||||
}
|
||||
|
||||
return streamTrArg;
|
||||
}
|
||||
|
||||
// lifecycle methods
|
||||
/**
|
||||
* closes the virtual stream
|
||||
*/
|
||||
public async cleanup() {
|
||||
if (this.typedrouter) {
|
||||
this.typedrouter.registeredVirtualStreams.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* a keepAlive loop that works across technologies
|
||||
*/
|
||||
private async startKeepAliveLoop() {
|
||||
// initially wait for a second
|
||||
await plugins.smartdelay.delayFor(0);
|
||||
let counter = 0;
|
||||
keepAliveLoop: while (this.keepAlive) {
|
||||
const triggerResult = await this.triggerKeepAlive();
|
||||
await plugins.smartdelay.delayFor(1000);
|
||||
}
|
||||
await plugins.smartdelay.delayFor(1000);
|
||||
await this.cleanup();
|
||||
console.log(`cleaned up for stream ${this.streamId}`);
|
||||
}
|
||||
|
||||
private async triggerKeepAlive() {
|
||||
if (this.side === 'requesting') {
|
||||
console.log(`keepalive sent.`);
|
||||
const streamTr = await this.sendMethod({
|
||||
method: '##VirtualStream##',
|
||||
request: {
|
||||
streamId: this.streamId,
|
||||
cycleId: plugins.isounique.uni(),
|
||||
cycle: 'request',
|
||||
mainPurpose: 'keepAlive',
|
||||
keepAlive: true,
|
||||
},
|
||||
response: null,
|
||||
}).catch(() => {
|
||||
this.keepAlive = false;
|
||||
});
|
||||
|
||||
// lets handle keepAlive
|
||||
if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
|
||||
this.keepAlive = false;
|
||||
} else {
|
||||
this.lastKeepAliveEvent = Date.now();
|
||||
}
|
||||
if (streamTr && streamTr.response && streamTr.response.next) {
|
||||
this.workOnQueue();
|
||||
}
|
||||
}
|
||||
if (Date.now() - this.lastKeepAliveEvent > 10000) {
|
||||
console.log(`closing stream for ${this.streamId}`);
|
||||
this.keepAlive = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Data sending and receiving
|
||||
public async sendData(dataArg: T): Promise<void> {
|
||||
this.sendBackpressuredArray.push(dataArg);
|
||||
this.workOnQueue();
|
||||
await this.sendBackpressuredArray.waitForSpace();
|
||||
}
|
||||
|
||||
public async fetchData(): Promise<T> {
|
||||
if (this.receiveBackpressuredArray.hasSpace) {
|
||||
// do something maybe?
|
||||
}
|
||||
await this.receiveBackpressuredArray.waitForItems();
|
||||
const dataPackage = this.receiveBackpressuredArray.shift();
|
||||
return dataPackage;
|
||||
}
|
||||
|
||||
/**
|
||||
* reads from a Readable and sends it to the other side
|
||||
* @param readableStreamArg
|
||||
*/
|
||||
public async readFromWebstream(readableStreamArg: ReadableStream<T>) {
|
||||
const reader = readableStreamArg.getReader();
|
||||
let streamIsDone = false;
|
||||
while(!streamIsDone) {
|
||||
const { value, done } = await reader.read();
|
||||
if(value) {
|
||||
await this.sendData(value);
|
||||
}
|
||||
streamIsDone = done;
|
||||
}
|
||||
}
|
||||
|
||||
public async writeToWebstream(writableStreamArg: WritableStream<T>) {
|
||||
const writer = writableStreamArg.getWriter();
|
||||
while(this.keepAlive) {
|
||||
await writer.write(await this.fetchData());
|
||||
}
|
||||
}
|
||||
}
|
@ -3,8 +3,12 @@
|
||||
"experimentalDecorators": true,
|
||||
"useDefineForClassFields": false,
|
||||
"target": "ES2022",
|
||||
"module": "ES2022",
|
||||
"moduleResolution": "nodenext",
|
||||
"esModuleInterop": true
|
||||
}
|
||||
"module": "NodeNext",
|
||||
"moduleResolution": "NodeNext",
|
||||
"esModuleInterop": true,
|
||||
"verbatimModuleSyntax": true
|
||||
},
|
||||
"exclude": [
|
||||
"dist_*/**/*.d.ts"
|
||||
]
|
||||
}
|
||||
|
Reference in New Issue
Block a user