fix(core): update

This commit is contained in:
2024-02-29 19:50:25 +01:00
parent ad106909e2
commit 407e1383f8
6 changed files with 190 additions and 106 deletions

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@api.global/typedrequest',
version: '3.0.8',
version: '3.0.9',
description: 'make typed requests towards apis'
}

View File

@ -10,5 +10,6 @@ import * as smartbuffer from '@push.rocks/smartbuffer';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartpromise from '@push.rocks/smartpromise';
import * as webrequest from '@push.rocks/webrequest';
import * as webstream from '@push.rocks/webstream';
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest };
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest, webstream };

View File

@ -143,6 +143,10 @@ export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestI
*/
private async workOnQueue() {
if(this.side === 'requesting') {
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
// helper functions
const getFeedback = async () => {
const streamTr = await this.sendMethod({
@ -165,12 +169,9 @@ export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestI
otherSideHasNext = streamTr.response.next;
}
}
await getFeedback();
// do work loop
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
let dataArg: typeof this.sendBackpressuredArray.data[0];
if (this.sendBackpressuredArray.data.length > 0) {
@ -243,7 +244,6 @@ export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestI
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
};
streamTrArg.request = null;
}
// chunk handling
@ -350,10 +350,26 @@ export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestI
return dataPackage;
}
public pipeWebStream(webStream: any) {
// lets do the piping
webStream.on('data', (data: any) => {});
webStream.on('end', () => {});
webStream.on('error', (error: any) => {});
/**
* reads from a Readable and sends it to the other side
* @param readableStreamArg
*/
public async readFromWebstream(readableStreamArg: ReadableStream<T>) {
const reader = readableStreamArg.getReader();
let streamIsDone = false;
while(!streamIsDone) {
const { value, done } = await reader.read();
if(value) {
await this.sendData(value);
}
streamIsDone = done;
}
}
public async writeToWebstream(writableStreamArg: WritableStream<T>) {
const writer = writableStreamArg.getWriter();
while(this.keepAlive) {
await writer.write(await this.fetchData());
}
}
}