fix(core): update

This commit is contained in:
2024-02-25 01:54:01 +01:00
parent 51fedb270b
commit b346da01f1
6 changed files with 78 additions and 39 deletions

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@api.global/typedrequest',
version: '3.0.7',
version: '3.0.8',
description: 'make typed requests towards apis'
}

View File

@ -6,8 +6,9 @@ export { typedRequestInterfaces };
// pushrocks scope
import * as isounique from '@push.rocks/isounique';
import * as lik from '@push.rocks/lik';
import * as smartbuffer from '@push.rocks/smartbuffer';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartpromise from '@push.rocks/smartpromise';
import * as webrequest from '@push.rocks/webrequest';
export { isounique, lik, smartdelay, smartpromise, webrequest };
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest };

View File

@ -15,7 +15,7 @@ export interface ICommFunctions {
* 3. It has a Readable and Writable side.
* 4. The Writable side is Readable on the other side and vice versa.
*/
export class VirtualStream<T = ArrayBufferLike> {
export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
// STATIC
public static encodePayloadForNetwork(
objectPayload: any,
@ -23,6 +23,9 @@ export class VirtualStream<T = ArrayBufferLike> {
originalPayload?: any,
path = []
): any {
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload instanceof VirtualStream) {
if (!objectPayload.side && commFunctions.sendMethod) {
objectPayload.side = 'requesting';
@ -34,6 +37,7 @@ export class VirtualStream<T = ArrayBufferLike> {
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
}
if (!originalPayload.response || path.includes('response')) {
objectPayload.startKeepAliveLoop();
return {
_isVirtualStream: true,
streamId: objectPayload.streamId,
@ -74,6 +78,9 @@ export class VirtualStream<T = ArrayBufferLike> {
}
public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload !== null && typeof objectPayload === 'object') {
if (objectPayload._isVirtualStream) {
const virtualStream = new VirtualStream();
@ -87,6 +94,7 @@ export class VirtualStream<T = ArrayBufferLike> {
virtualStream.typedrouter = commFunctions.typedrouter;
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
}
virtualStream.startKeepAliveLoop();
return virtualStream;
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
@ -116,7 +124,7 @@ export class VirtualStream<T = ArrayBufferLike> {
// wether to keep the stream alive
private keepAlive = true;
private lastKeepAliveEvent = Date.now();
private lastKeepAliveEvent: number;
// backpressured arrays
private sendBackpressuredArray =
@ -128,9 +136,7 @@ export class VirtualStream<T = ArrayBufferLike> {
16
);
constructor() {
this.startKeepAliveLoop();
}
constructor() {}
/**
* takes care of sending
@ -162,26 +168,31 @@ export class VirtualStream<T = ArrayBufferLike> {
// do work loop
let thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
const dataArg = this.sendBackpressuredArray.shift();
const streamTr = await this.sendMethod({
let dataArg: typeof this.sendBackpressuredArray.data[0];
if (this.sendBackpressuredArray.data.length > 0) {
dataArg = this.sendBackpressuredArray.shift();
}
let streamTr: plugins.typedRequestInterfaces.IStreamRequest;
streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'chunk',
mainPurpose: dataArg ? 'chunk' : 'read',
backpressure: thisSideIsBackpressured,
next: this.sendBackpressuredArray.data.length > 0,
chunkData: dataArg,
...dataArg ? { chunkData: dataArg } : {},
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
return null;
});
if (streamTr && streamTr.response && streamTr.response.chunkData) {
@ -264,6 +275,7 @@ export class VirtualStream<T = ArrayBufferLike> {
return streamTrArg;
}
// lifecycle methods
/**
* closes the virtual stream
*/
@ -328,6 +340,7 @@ export class VirtualStream<T = ArrayBufferLike> {
this.workOnQueue();
await this.sendBackpressuredArray.waitForSpace();
}
public async fetchData(): Promise<T> {
if (this.receiveBackpressuredArray.hasSpace) {
// do something maybe?