Compare commits

...

16 Commits

Author SHA1 Message Date
90311ad65e 3.0.9 2024-02-29 19:50:26 +01:00
407e1383f8 fix(core): update 2024-02-29 19:50:25 +01:00
ad106909e2 3.0.8 2024-02-25 01:54:01 +01:00
b346da01f1 fix(core): update 2024-02-25 01:54:01 +01:00
51fedb270b 3.0.7 2024-02-24 12:20:13 +01:00
fd26b48ff6 fix(core): update 2024-02-24 12:20:13 +01:00
1bfe10691a 3.0.6 2024-02-21 18:29:36 +01:00
bf81c34dbc fix(core): update 2024-02-21 18:29:35 +01:00
f837bb5230 3.0.5 2024-02-20 17:40:31 +01:00
1b76e6882f fix(core): update 2024-02-20 17:40:30 +01:00
83b43f501d 3.0.4 2023-12-08 19:36:25 +01:00
9b626a562d fix(core): update 2023-12-08 19:36:24 +01:00
c216f97bfd 3.0.3 2023-12-05 23:42:44 +01:00
71453877d7 fix(core): update 2023-12-05 23:42:43 +01:00
d1a601d006 3.0.2 2023-10-20 17:39:09 +02:00
0a9236a605 fix(core): update 2023-10-20 17:39:08 +02:00
13 changed files with 2922 additions and 1319 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@api.global/typedrequest",
"version": "3.0.1",
"version": "3.0.9",
"private": false,
"description": "make typed requests towards apis",
"main": "dist_ts/index.js",
@ -14,22 +14,24 @@
"buildDocs": "tsdoc"
},
"devDependencies": {
"@api.global/typedserver": "^3.0.1",
"@gitzone/tsbuild": "^2.1.66",
"@gitzone/tsbundle": "^2.0.8",
"@gitzone/tsrun": "^1.2.44",
"@gitzone/tstest": "^1.0.77",
"@push.rocks/smartenv": "^5.0.3",
"@push.rocks/tapbundle": "^5.0.12",
"@types/node": "^20.4.7"
"@api.global/typedserver": "^3.0.25",
"@git.zone/tsbuild": "^2.1.72",
"@git.zone/tsbundle": "^2.0.15",
"@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.86",
"@push.rocks/smartenv": "^5.0.12",
"@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.11.24"
},
"dependencies": {
"@api.global/typedrequest-interfaces": "^3.0.1",
"@api.global/typedrequest-interfaces": "^3.0.18",
"@push.rocks/isounique": "^1.0.5",
"@push.rocks/lik": "^6.0.3",
"@push.rocks/lik": "^6.0.14",
"@push.rocks/smartbuffer": "^1.0.6",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/webrequest": "^3.0.32"
"@push.rocks/webrequest": "^3.0.34",
"@push.rocks/webstream": "^1.0.8"
},
"files": [
"ts/**/*",

3658
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -2,8 +2,10 @@ import { expect, tap } from '@push.rocks/tapbundle';
import * as typedserver from '@api.global/typedserver';
import * as typedrequest from '../ts/index.js';
import * as typedrequestInterfaces from '@api.global/typedrequest-interfaces';
let testServer: typedserver.servertools.Server;
let testTypedRouter: typedrequest.TypedRouter;
let testTypedHandler: typedrequest.TypedHandler<ITestReqRes>;
// lets define an interface
@ -17,6 +19,16 @@ interface ITestReqRes {
};
}
interface ITestStream {
method: 'handleStream';
request: {
requestStream: typedrequestInterfaces.IVirtualStream;
};
response: {
responseStream: typedrequestInterfaces.IVirtualStream;
};
}
tap.test('should create a typedHandler', async () => {
// lets use the interface in a TypedHandler
testTypedHandler = new typedrequest.TypedHandler<ITestReqRes>('hi', async (reqArg) => {
@ -35,7 +47,7 @@ tap.test('should spawn a server to test with', async () => {
});
tap.test('should define a testHandler', async () => {
const testTypedRouter = new typedrequest.TypedRouter(); // typed routers can broker typedrequests between handlers
testTypedRouter = new typedrequest.TypedRouter(); // typed routers can broker typedrequests between handlers
testTypedRouter.addTypedHandler(testTypedHandler);
testServer.addRoute(
'/testroute',
@ -60,7 +72,36 @@ tap.test('should fire a request', async () => {
expect(response.surname).toEqual('wow');
});
tap.test('should end the server', async () => {
tap.test('should allow VirtualStreams', async () => {
const newRequestingVS = new typedrequest.VirtualStream();
const newRespondingVS = new typedrequest.VirtualStream();
let generatedRequestingVS: typedrequestInterfaces.IVirtualStream;
let generatedRespondingVS: typedrequestInterfaces.IVirtualStream;
testTypedRouter.addTypedHandler(new typedrequest.TypedHandler<ITestStream>('handleStream', async (reqArg) => {
console.log('hey there');
console.log(reqArg.requestStream);
generatedRequestingVS = reqArg.requestStream;
return {
responseStream: newRespondingVS,
};
}));
const typedRequest = new typedrequest.TypedRequest<ITestStream>(
'http://localhost:3000/testroute',
'handleStream'
);
const response = await typedRequest.fire({
requestStream: newRequestingVS,
});
console.log(response.responseStream);
newRequestingVS.sendData(Buffer.from('hello'));
const data = await generatedRequestingVS.fetchData();
const decodedData = data.toString();
expect(data.toString()).toEqual('hello');
});
tap.test('should end the server', async (toolsArg) => {
await toolsArg.delayFor(5000);
await testServer.stop();
});

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@api.global/typedrequest',
version: '3.0.1',
version: '3.0.9',
description: 'make typed requests towards apis'
}

View File

@ -3,3 +3,4 @@ export * from './typedrequest.classes.typedhandler.js';
export * from './typedrequest.classes.typedrouter.js';
export * from './typedrequest.classes.typedresponseerror.js';
export * from './typedrequest.classes.typedtarget.js';
export * from './typedrequest.classes.virtualstream.js';

View File

@ -6,8 +6,10 @@ export { typedRequestInterfaces };
// pushrocks scope
import * as isounique from '@push.rocks/isounique';
import * as lik from '@push.rocks/lik';
import * as smartbuffer from '@push.rocks/smartbuffer';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartpromise from '@push.rocks/smartpromise';
import * as webrequest from '@push.rocks/webrequest';
import * as webstream from '@push.rocks/webstream';
export { isounique, lik, smartdelay, smartpromise, webrequest };
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest, webstream };

View File

@ -1,4 +1,4 @@
import * as plugins from './typedrequest.plugins.js';
import * as plugins from './plugins.js';
import { TypedResponseError } from './typedrequest.classes.typedresponseerror.js';
export type THandlerFunction<T extends plugins.typedRequestInterfaces.ITypedRequest> = (

View File

@ -1,4 +1,5 @@
import * as plugins from './typedrequest.plugins.js';
import * as plugins from './plugins.js';
import { VirtualStream } from './typedrequest.classes.virtualstream.js';
import { TypedResponseError } from './typedrequest.classes.typedresponseerror.js';
import { TypedRouter } from './typedrequest.classes.typedrouter.js';
import { TypedTarget } from './typedrequest.classes.typedtarget.js';
@ -19,7 +20,6 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
public method: string;
/**
* note the overloading is thought to deal with promises
* @param postEndPointArg
* @param methodArg
*/
@ -36,7 +36,7 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
* fires the request
*/
public async fire(fireArg: T['request'], useCacheArg: boolean = false): Promise<T['response']> {
const payload: plugins.typedRequestInterfaces.ITypedRequest = {
let payloadSending: plugins.typedRequestInterfaces.ITypedRequest = {
method: this.method,
request: fireArg,
response: null,
@ -46,30 +46,58 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
},
};
let responseBody: plugins.typedRequestInterfaces.ITypedRequest;
// lets preprocess the payload
payloadSending = VirtualStream.encodePayloadForNetwork(payloadSending, {
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
}
});
let payloadReceiving: plugins.typedRequestInterfaces.ITypedRequest;
payloadReceiving = await this.postTrObject(payloadSending, useCacheArg);
// lets preprocess the response
payloadReceiving = VirtualStream.decodePayloadFromNetwork(payloadReceiving, {
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
}
});
return payloadReceiving.response;
}
private async postTrObject(payloadSendingArg: plugins.typedRequestInterfaces.ITypedRequest, useCacheArg: boolean = false) {
let payloadReceiving: plugins.typedRequestInterfaces.ITypedRequest;
if (this.urlEndPoint) {
const response = await webrequestInstance.postJson(this.urlEndPoint, payload, useCacheArg);
responseBody = response;
const response = await webrequestInstance.postJson(
this.urlEndPoint,
payloadSendingArg,
useCacheArg
);
payloadReceiving = response;
} else {
responseBody = await this.typedTarget.post(payload);
payloadReceiving = await this.typedTarget.post(payloadSendingArg);
}
if (responseBody.error) {
if (payloadReceiving.error) {
console.error(
`Got an error ${responseBody.error.text} with data ${JSON.stringify(
responseBody.error.data
`Got an error ${payloadReceiving.error.text} with data ${JSON.stringify(
payloadReceiving.error.data,
null,
2
)}`
);
if (!responseBody.retry) {
throw new TypedResponseError(responseBody.error.text, responseBody.error.data);
if (!payloadReceiving.retry) {
throw new TypedResponseError(payloadReceiving.error.text, payloadReceiving.error.data);
}
return null;
}
if (responseBody.retry) {
console.log(`server requested retry for the following reason: ${responseBody.retry.reason}`);
await plugins.smartdelay.delayFor(responseBody.retry.waitForMs);
if (payloadReceiving.retry) {
console.log(
`server requested retry for the following reason: ${payloadReceiving.retry.reason}`
);
await plugins.smartdelay.delayFor(payloadReceiving.retry.waitForMs);
// tslint:disable-next-line: no-return-await
return await this.fire(fireArg);
payloadReceiving = await this.postTrObject(payloadSendingArg, useCacheArg);
}
return responseBody.response;
return payloadReceiving;
}
}

View File

@ -1,4 +1,4 @@
import * as plugins from './typedrequest.plugins.js';
import * as plugins from './plugins.js';
export class TypedResponseError {
public errorText: string;

View File

@ -1,4 +1,5 @@
import * as plugins from './typedrequest.plugins.js';
import * as plugins from './plugins.js';
import { VirtualStream } from './typedrequest.classes.virtualstream.js';
import { TypedHandler } from './typedrequest.classes.typedhandler.js';
import { TypedRequest } from './typedrequest.classes.typedrequest.js';
@ -13,6 +14,7 @@ export class TypedRouter {
public handlerMap = new plugins.lik.ObjectMap<
TypedHandler<any & plugins.typedRequestInterfaces.ITypedRequest>
>();
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream<any>>();
public fireEventInterestMap = new plugins.lik.InterestMap<
string,
@ -89,6 +91,23 @@ export class TypedRouter {
public async routeAndAddResponse<
T extends plugins.typedRequestInterfaces.ITypedRequest = plugins.typedRequestInterfaces.ITypedRequest
>(typedRequestArg: T, localRequestArg = false): Promise<T> {
// decoding first
typedRequestArg = VirtualStream.decodePayloadFromNetwork(typedRequestArg, {
typedrouter: this,
});
// localdata second
typedRequestArg.localData = typedRequestArg.localData || {};
typedRequestArg.localData.firstTypedrouter = this;
// lets do stream processing
if (typedRequestArg.method === '##VirtualStream##') {
const result: any = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
result.localData = null;
return result as T;
}
// lets do normal routing
if (typedRequestArg?.correlation?.phase === 'request' || localRequestArg) {
const typedHandler = this.getTypedHandlerForMethod(typedRequestArg.method);
@ -99,10 +118,20 @@ export class TypedRouter {
data: {},
};
typedRequestArg.correlation.phase = 'response';
// encode again before handing back
typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
typedrouter: this,
});
return typedRequestArg;
}
typedRequestArg = await typedHandler.addResponse(typedRequestArg);
typedRequestArg.localData = null;
// encode again before handing back
typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
typedrouter: this,
});
return typedRequestArg;
} else if (typedRequestArg?.correlation?.phase === 'response') {
this.fireEventInterestMap
@ -115,4 +144,23 @@ export class TypedRouter {
return null;
}
}
/**
* handle streaming
* @param streamTrArg
*/
public async handleStreamTypedRequest(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
const relevantVirtualStream = await this.registeredVirtualStreams.find(async virtualStreamArg => {
return virtualStreamArg.streamId === streamTrArg.request.streamId;
});
if (!relevantVirtualStream) {
console.log(`no relevant virtual stream found for stream with id ${streamTrArg.request.streamId}`);
console.log(this.registeredVirtualStreams.getArray());
return streamTrArg;
} else {
console.log(`success: found relevant virtual stream with id ${streamTrArg.request.streamId}`);
}
const result = await relevantVirtualStream.handleStreamTr(streamTrArg);
return result;
}
}

View File

@ -1,5 +1,5 @@
import { TypedRouter } from './typedrequest.classes.typedrouter.js';
import * as plugins from './typedrequest.plugins.js';
import * as plugins from './plugins.js';
export type IPostMethod = (
typedRequestPostObject: plugins.typedRequestInterfaces.ITypedRequest

View File

@ -0,0 +1,375 @@
import * as plugins from './plugins.js';
import type { TypedRouter } from './typedrequest.classes.typedrouter.js';
export interface ICommFunctions {
sendMethod?: (
sendPayload: plugins.typedRequestInterfaces.IStreamRequest
) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
typedrouter?: TypedRouter;
}
/**
* 1. A VirtualStream connects over the network
* 2. It is always paired to one other VirtualStream
* on the other side with the same streamId.
* 3. It has a Readable and Writable side.
* 4. The Writable side is Readable on the other side and vice versa.
*/
export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
// STATIC
public static encodePayloadForNetwork(
objectPayload: any,
commFunctions: ICommFunctions,
originalPayload?: any,
path = []
): any {
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload instanceof VirtualStream) {
if (!objectPayload.side && commFunctions.sendMethod) {
objectPayload.side = 'requesting';
objectPayload.sendMethod = commFunctions.sendMethod;
}
if (!objectPayload.side && commFunctions.typedrouter) {
objectPayload.side = 'responding';
objectPayload.typedrouter = commFunctions.typedrouter;
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
}
if (!originalPayload.response || path.includes('response')) {
objectPayload.startKeepAliveLoop();
return {
_isVirtualStream: true,
streamId: objectPayload.streamId,
};
} else {
return {
_OBMITTED_VIRTUAL_STREAM: true,
reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
};
}
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
for (const item of objectPayload) {
returnArray.push(
VirtualStream.encodePayloadForNetwork(
item,
commFunctions,
originalPayload || objectPayload,
path
)
);
}
return returnArray;
} else if (objectPayload !== null && typeof objectPayload === 'object') {
return Object.keys(objectPayload).reduce((acc, key) => {
path.push(key);
acc[key] = VirtualStream.encodePayloadForNetwork(
objectPayload[key],
commFunctions,
originalPayload || objectPayload,
path
);
return acc;
}, {});
} else {
return objectPayload;
}
}
public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload !== null && typeof objectPayload === 'object') {
if (objectPayload._isVirtualStream) {
const virtualStream = new VirtualStream();
virtualStream.streamId = objectPayload.streamId;
if (!virtualStream.side && commFunctions.sendMethod) {
virtualStream.side = 'requesting';
virtualStream.sendMethod = commFunctions.sendMethod;
}
if (!virtualStream.side && commFunctions.typedrouter) {
virtualStream.side = 'responding';
virtualStream.typedrouter = commFunctions.typedrouter;
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
}
virtualStream.startKeepAliveLoop();
return virtualStream;
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
for (const item of objectPayload) {
returnArray.push(VirtualStream.decodePayloadFromNetwork(item, commFunctions));
}
return returnArray;
} else {
return Object.keys(objectPayload).reduce((acc, key) => {
acc[key] = VirtualStream.decodePayloadFromNetwork(objectPayload[key], commFunctions);
return acc;
}, {});
}
} else {
return objectPayload;
}
}
// INSTANCE
public side: 'requesting' | 'responding';
public streamId: string = plugins.isounique.uni();
// integration with typedrequest mechanics
public sendMethod: ICommFunctions['sendMethod'];
public typedrouter: TypedRouter;
// wether to keep the stream alive
private keepAlive = true;
private lastKeepAliveEvent: number;
// backpressured arrays
private sendBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
private receiveBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
constructor() {}
/**
* takes care of sending
*/
private async workOnQueue() {
if(this.side === 'requesting') {
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
// helper functions
const getFeedback = async () => {
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
});
if (streamTr && streamTr.response) {
otherSideIsBackpressured = streamTr.response.backpressure
otherSideHasNext = streamTr.response.next;
}
}
await getFeedback();
// do work loop
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
let dataArg: typeof this.sendBackpressuredArray.data[0];
if (this.sendBackpressuredArray.data.length > 0) {
dataArg = this.sendBackpressuredArray.shift();
}
let streamTr: plugins.typedRequestInterfaces.IStreamRequest;
streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: dataArg ? 'chunk' : 'read',
backpressure: thisSideIsBackpressured,
next: this.sendBackpressuredArray.data.length > 0,
...dataArg ? { chunkData: dataArg } : {},
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
return null;
});
if (streamTr && streamTr.response && streamTr.response.chunkData) {
this.receiveBackpressuredArray.push(streamTr.response.chunkData);
}
thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
// lets care about looping
otherSideHasNext = streamTr && streamTr.response && streamTr.response.next;
}
}
}
/**
* This method handles the stream only on the responding side
* @param streamTrArg
* @returns
*/
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
if (streamTrArg.request.keepAlive === true && this.keepAlive === true) {
this.lastKeepAliveEvent = Date.now();
} else if (streamTrArg.request.keepAlive === false) {
this.keepAlive = false;
}
// keepAlive handling
if (streamTrArg.request.mainPurpose === 'keepAlive') {
// if the main purpose is keepAlive, we answer with a keepAlive
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'keepAlive',
keepAlive: this.keepAlive,
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
};
}
// feedback handling
if (streamTrArg.request.mainPurpose === 'feedback') {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
};
}
// chunk handling
if (streamTrArg.request.mainPurpose === 'chunk') {
this.receiveBackpressuredArray.push(streamTrArg.request.chunkData);
if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) {
const dataArg = this.sendBackpressuredArray.shift();
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'chunk',
next: this.sendBackpressuredArray.data.length > 1,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
chunkData: this.sendBackpressuredArray.shift(),
};
} else {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
};
}
streamTrArg.request = null;
}
return streamTrArg;
}
// lifecycle methods
/**
* closes the virtual stream
*/
public async cleanup() {
if (this.typedrouter) {
this.typedrouter.registeredVirtualStreams.remove(this);
}
}
/**
* a keepAlive loop that works across technologies
*/
private async startKeepAliveLoop() {
// initially wait for a second
await plugins.smartdelay.delayFor(0);
let counter = 0;
keepAliveLoop: while (this.keepAlive) {
const triggerResult = await this.triggerKeepAlive();
await plugins.smartdelay.delayFor(1000);
}
await plugins.smartdelay.delayFor(1000);
await this.cleanup();
console.log(`cleaned up for stream ${this.streamId}`);
}
private async triggerKeepAlive() {
if (this.side === 'requesting') {
console.log(`keepalive sent.`);
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'keepAlive',
keepAlive: true,
},
response: null,
}).catch(() => {
this.keepAlive = false;
});
// lets handle keepAlive
if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
this.keepAlive = false;
} else {
this.lastKeepAliveEvent = Date.now();
}
if (streamTr && streamTr.response && streamTr.response.next) {
this.workOnQueue();
}
}
if (Date.now() - this.lastKeepAliveEvent > 10000) {
console.log(`closing stream for ${this.streamId}`);
this.keepAlive = false;
}
}
// Data sending and receiving
public async sendData(dataArg: T): Promise<void> {
this.sendBackpressuredArray.push(dataArg);
this.workOnQueue();
await this.sendBackpressuredArray.waitForSpace();
}
public async fetchData(): Promise<T> {
if (this.receiveBackpressuredArray.hasSpace) {
// do something maybe?
}
await this.receiveBackpressuredArray.waitForItems();
const dataPackage = this.receiveBackpressuredArray.shift();
return dataPackage;
}
/**
* reads from a Readable and sends it to the other side
* @param readableStreamArg
*/
public async readFromWebstream(readableStreamArg: ReadableStream<T>) {
const reader = readableStreamArg.getReader();
let streamIsDone = false;
while(!streamIsDone) {
const { value, done } = await reader.read();
if(value) {
await this.sendData(value);
}
streamIsDone = done;
}
}
public async writeToWebstream(writableStreamArg: WritableStream<T>) {
const writer = writableStreamArg.getWriter();
while(this.keepAlive) {
await writer.write(await this.fetchData());
}
}
}

View File

@ -3,8 +3,12 @@
"experimentalDecorators": true,
"useDefineForClassFields": false,
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "nodenext",
"esModuleInterop": true
}
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true
},
"exclude": [
"dist_*/**/*.d.ts"
]
}