typedrequest/ts/classes.virtualstream.ts

427 lines
14 KiB
TypeScript
Raw Normal View History

2024-02-21 18:29:35 +01:00
import * as plugins from './plugins.js';
2024-05-30 19:01:39 +02:00
import { TypedRouter } from './classes.typedrouter.js';
2024-02-21 18:29:35 +01:00
const closingBit: any = '#############CLOSING BIT#############';
2024-02-21 18:29:35 +01:00
export interface ICommFunctions {
sendMethod?: (
2024-02-24 12:20:13 +01:00
sendPayload: plugins.typedRequestInterfaces.IStreamRequest
) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
2024-02-21 18:29:35 +01:00
typedrouter?: TypedRouter;
}
2024-02-24 12:20:13 +01:00
/**
* 1. A VirtualStream connects over the network
* 2. It is always paired to one other VirtualStream
* on the other side with the same streamId.
* 3. It has a Readable and Writable side.
* 4. The Writable side is Readable on the other side and vice versa.
*/
2024-05-05 17:15:44 +02:00
export class VirtualStream<T = Uint8Array> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
2024-02-21 18:29:35 +01:00
// STATIC
public static encodePayloadForNetwork(
objectPayload: any,
2024-02-24 12:20:13 +01:00
commFunctions: ICommFunctions,
originalPayload?: any,
path = []
2024-02-21 18:29:35 +01:00
): any {
2024-02-29 22:51:44 +01:00
if (!objectPayload) {
return objectPayload;
}
2024-02-25 01:54:01 +01:00
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
2024-02-21 18:29:35 +01:00
if (objectPayload instanceof VirtualStream) {
if (!objectPayload.side && commFunctions.sendMethod) {
objectPayload.side = 'requesting';
objectPayload.sendMethod = commFunctions.sendMethod;
}
if (!objectPayload.side && commFunctions.typedrouter) {
objectPayload.side = 'responding';
objectPayload.typedrouter = commFunctions.typedrouter;
2024-02-24 12:20:13 +01:00
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
}
if (!originalPayload.response || path.includes('response')) {
2024-02-25 01:54:01 +01:00
objectPayload.startKeepAliveLoop();
2024-02-24 12:20:13 +01:00
return {
_isVirtualStream: true,
streamId: objectPayload.streamId,
};
} else {
return {
_OBMITTED_VIRTUAL_STREAM: true,
reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
};
2024-02-21 18:29:35 +01:00
}
} else if (Array.isArray(objectPayload)) {
2024-02-29 22:59:31 +01:00
// For arrays, we recurse over each item.
return objectPayload.map((item, index) =>
VirtualStream.encodePayloadForNetwork(
item,
commFunctions,
originalPayload || objectPayload,
path.concat(String(index)) // Convert index to string and concatenate to path
)
);
2024-02-21 18:29:35 +01:00
} else if (objectPayload !== null && typeof objectPayload === 'object') {
2024-02-29 22:59:31 +01:00
// For objects, we recurse over each key-value pair.
return Object.entries(objectPayload).reduce((acc, [key, value]) => {
const newPath = path.concat(key); // Concatenate the new key to the path
2024-02-24 12:20:13 +01:00
acc[key] = VirtualStream.encodePayloadForNetwork(
2024-02-29 22:59:31 +01:00
value,
2024-02-24 12:20:13 +01:00
commFunctions,
originalPayload || objectPayload,
2024-02-29 22:59:31 +01:00
newPath
2024-02-24 12:20:13 +01:00
);
2024-02-21 18:29:35 +01:00
return acc;
}, {});
} else {
return objectPayload;
}
}
public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
2024-05-25 02:15:08 +02:00
if (
plugins.smartbuffer.isBufferLike(objectPayload)
|| objectPayload instanceof TypedRouter
) {
2024-02-25 01:54:01 +01:00
return objectPayload;
}
2024-02-21 18:29:35 +01:00
if (objectPayload !== null && typeof objectPayload === 'object') {
if (objectPayload._isVirtualStream) {
const virtualStream = new VirtualStream();
virtualStream.streamId = objectPayload.streamId;
if (!virtualStream.side && commFunctions.sendMethod) {
virtualStream.side = 'requesting';
virtualStream.sendMethod = commFunctions.sendMethod;
}
if (!virtualStream.side && commFunctions.typedrouter) {
virtualStream.side = 'responding';
virtualStream.typedrouter = commFunctions.typedrouter;
2024-02-24 12:20:13 +01:00
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
2024-02-21 18:29:35 +01:00
}
2024-02-25 01:54:01 +01:00
virtualStream.startKeepAliveLoop();
2024-02-21 18:29:35 +01:00
return virtualStream;
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
for (const item of objectPayload) {
returnArray.push(VirtualStream.decodePayloadFromNetwork(item, commFunctions));
}
return returnArray;
} else {
return Object.keys(objectPayload).reduce((acc, key) => {
acc[key] = VirtualStream.decodePayloadFromNetwork(objectPayload[key], commFunctions);
return acc;
}, {});
}
} else {
return objectPayload;
}
}
// INSTANCE
public side: 'requesting' | 'responding';
public streamId: string = plugins.isounique.uni();
2024-02-24 12:20:13 +01:00
// integration with typedrequest mechanics
public sendMethod: ICommFunctions['sendMethod'];
2024-02-21 18:29:35 +01:00
public typedrouter: TypedRouter;
2024-02-24 12:20:13 +01:00
// wether to keep the stream alive
private keepAlive = true;
2024-02-25 01:54:01 +01:00
private lastKeepAliveEvent: number;
2024-02-24 12:20:13 +01:00
// backpressured arrays
private sendBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
private receiveBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
2024-02-25 01:54:01 +01:00
constructor() {}
2024-02-24 12:20:13 +01:00
workingDeferred: plugins.smartpromise.Deferred<void>;
2024-02-24 12:20:13 +01:00
/**
* takes care of sending
*/
private async workOnQueue() {
if (this.workingDeferred) {
return this.workingDeferred.promise;
} else {
this.workingDeferred = plugins.smartpromise.defer();
}
2024-02-24 12:20:13 +01:00
if(this.side === 'requesting') {
2024-02-29 19:50:25 +01:00
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
2024-02-24 12:20:13 +01:00
// helper functions
const getFeedback = async () => {
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
2024-02-24 12:20:13 +01:00
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
});
if (streamTr && streamTr.response) {
otherSideIsBackpressured = streamTr.response.backpressure
otherSideHasNext = streamTr.response.next;
}
}
2024-02-29 19:50:25 +01:00
await getFeedback();
2024-02-24 12:20:13 +01:00
// do work loop
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
if (otherSideIsBackpressured) {
while (otherSideIsBackpressured) {
console.log('waiting for feedback because of backpressure...');
await plugins.smartdelay.delayFor(50);
await getFeedback();
}
}
2024-02-25 01:54:01 +01:00
let dataArg: typeof this.sendBackpressuredArray.data[0];
if (this.sendBackpressuredArray.data.length > 0) {
dataArg = this.sendBackpressuredArray.shift();
}
let streamTr: plugins.typedRequestInterfaces.IStreamRequest;
streamTr = await this.sendMethod({
2024-02-24 12:20:13 +01:00
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
2024-02-25 01:54:01 +01:00
mainPurpose: dataArg ? 'chunk' : 'read',
2024-02-24 12:20:13 +01:00
backpressure: thisSideIsBackpressured,
next: this.sendBackpressuredArray.data.length > 0,
2024-02-25 01:54:01 +01:00
...dataArg ? { chunkData: dataArg } : {},
2024-02-24 12:20:13 +01:00
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
2024-02-25 01:54:01 +01:00
return null;
2024-02-24 12:20:13 +01:00
});
2024-02-21 18:29:35 +01:00
2024-02-24 12:20:13 +01:00
if (streamTr && streamTr.response && streamTr.response.chunkData) {
this.receiveBackpressuredArray.push(streamTr.response.chunkData);
}
otherSideIsBackpressured = streamTr && streamTr.response && streamTr.response.backpressure;
thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
2024-02-24 12:20:13 +01:00
// lets care about looping
otherSideHasNext = streamTr && streamTr.response && streamTr.response.next;
}
}
this.workingDeferred.resolve();
this.workingDeferred = null;
2024-02-24 12:20:13 +01:00
}
/**
* This method handles the stream only on the responding side
* @param streamTrArg
* @returns
*/
2024-02-21 18:29:35 +01:00
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
2024-02-24 12:20:13 +01:00
if (streamTrArg.request.keepAlive === true && this.keepAlive === true) {
this.lastKeepAliveEvent = Date.now();
} else if (streamTrArg.request.keepAlive === false) {
this.keepAlive = false;
}
// keepAlive handling
if (streamTrArg.request.mainPurpose === 'keepAlive') {
// if the main purpose is keepAlive, we answer with a keepAlive
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'keepAlive',
keepAlive: this.keepAlive,
next: this.sendBackpressuredArray.data.length > 0,
backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
2024-02-24 12:20:13 +01:00
};
}
// feedback handling
if (streamTrArg.request.mainPurpose === 'feedback') {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
2024-02-24 12:20:13 +01:00
};
}
// chunk handling
if (streamTrArg.request.mainPurpose === 'chunk') {
this.receiveBackpressuredArray.push(streamTrArg.request.chunkData);
if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) {
const dataArg = this.sendBackpressuredArray.shift();
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'chunk',
next: this.sendBackpressuredArray.data.length > 1, // 1 and not 0 because we call shift a few lines down
backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
2024-02-24 12:20:13 +01:00
chunkData: this.sendBackpressuredArray.shift(),
};
} else {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
2024-02-24 12:20:13 +01:00
};
}
streamTrArg.request = null;
}
2024-02-21 18:29:35 +01:00
return streamTrArg;
}
2024-02-25 01:54:01 +01:00
// lifecycle methods
2024-02-21 18:29:35 +01:00
/**
* closes the virtual stream
*/
2024-02-24 12:20:13 +01:00
public async cleanup() {
2024-02-21 18:29:35 +01:00
if (this.typedrouter) {
this.typedrouter.registeredVirtualStreams.remove(this);
}
}
2024-02-24 12:20:13 +01:00
/**
* a keepAlive loop that works across technologies
*/
private async startKeepAliveLoop() {
// initially wait for a second
if (this.side === 'responding') {
return;
}
2024-02-24 12:20:13 +01:00
await plugins.smartdelay.delayFor(0);
console.log(`starting keepalive loop on side ${this.side}`);
2024-02-24 12:20:13 +01:00
let counter = 0;
keepAliveLoop: while (this.keepAlive) {
2024-05-31 22:41:17 +02:00
await this.triggerKeepAlive();
2024-02-24 12:20:13 +01:00
await plugins.smartdelay.delayFor(1000);
}
await plugins.smartdelay.delayFor(1000);
await this.cleanup();
console.log(`cleaned up for stream ${this.streamId}`);
}
private async triggerKeepAlive() {
if (this.side === 'requesting') {
console.log(`keepalive sent.`);
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'keepAlive',
keepAlive: this.keepAlive,
2024-02-24 12:20:13 +01:00
},
response: null,
}).catch(() => {
this.keepAlive = false;
});
// lets handle keepAlive
if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
this.keepAlive = false;
} else {
this.lastKeepAliveEvent = Date.now();
}
if (streamTr && streamTr.response && streamTr.response.next) {
this.workOnQueue();
}
}
if (Date.now() - this.lastKeepAliveEvent > 10000) {
console.log(`closing stream for ${this.streamId}`);
this.keepAlive = false;
}
}
// Data sending and receiving
public async sendData(dataArg: T): Promise<void> {
this.sendBackpressuredArray.push(dataArg);
this.workOnQueue();
await this.sendBackpressuredArray.waitForSpace();
}
2024-02-25 01:54:01 +01:00
2024-02-24 12:20:13 +01:00
public async fetchData(): Promise<T> {
if (this.receiveBackpressuredArray.hasSpace) {
// do something maybe?
}
await this.receiveBackpressuredArray.waitForItems();
const dataPackage = this.receiveBackpressuredArray.shift();
return dataPackage;
}
2024-02-29 19:50:25 +01:00
/**
* reads from a Readable and sends it to the other side
* @param readableStreamArg
*/
public async readFromWebstream(readableStreamArg: ReadableStream<T>, closeAfterReading = true) {
2024-02-29 19:50:25 +01:00
const reader = readableStreamArg.getReader();
let streamIsDone = false;
while(!streamIsDone) {
const { value, done } = await reader.read();
if(value) {
await this.sendData(value);
}
streamIsDone = done;
}
if (closeAfterReading) {
await this.close(true);
}
2024-02-29 19:50:25 +01:00
}
public async writeToWebstream(writableStreamArg: WritableStream<T>) {
const writer = writableStreamArg.getWriter();
while(this.keepAlive || this.receiveBackpressuredArray.checkHasItems()) {
const value = await this.fetchData();
if (value === closingBit) {
writer.releaseLock();
await writableStreamArg.close();
break;
}
await writer.write(value);
2024-02-29 19:50:25 +01:00
}
2024-02-24 12:20:13 +01:00
}
/**
* closes the stream
* if sendClosingBitArg is true, the stream will send a closing bit
* @param sendClosingBitArg
*/
public async close(sendClosingBitArg = false) {
if (sendClosingBitArg) {
this.sendData(closingBit);
}
this.keepAlive = false;
}
2024-02-21 18:29:35 +01:00
}