import * as plugins from './plugins.js'; import { TypedRouter } from './classes.typedrouter.js'; const closingBit: any = '#############CLOSING BIT#############'; export interface ICommFunctions { sendMethod?: ( sendPayload: plugins.typedRequestInterfaces.IStreamRequest ) => Promise; typedrouter?: TypedRouter; } /** * 1. A VirtualStream connects over the network * 2. It is always paired to one other VirtualStream * on the other side with the same streamId. * 3. It has a Readable and Writable side. * 4. The Writable side is Readable on the other side and vice versa. */ export class VirtualStream implements plugins.typedRequestInterfaces.IVirtualStream { // STATIC public static encodePayloadForNetwork( objectPayload: any, commFunctions: ICommFunctions, originalPayload?: any, path = [] ): any { if (!objectPayload) { return objectPayload; } if (plugins.smartbuffer.isBufferLike(objectPayload)) { return objectPayload; } if (objectPayload instanceof VirtualStream) { if (!objectPayload.side && commFunctions.sendMethod) { objectPayload.side = 'requesting'; objectPayload.sendMethod = commFunctions.sendMethod; } if (!objectPayload.side && commFunctions.typedrouter) { objectPayload.side = 'responding'; objectPayload.typedrouter = commFunctions.typedrouter; commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload); } if (!originalPayload.response || path.includes('response')) { objectPayload.startKeepAliveLoop(); return { _isVirtualStream: true, streamId: objectPayload.streamId, }; } else { return { _OBMITTED_VIRTUAL_STREAM: true, reason: 'path is under .request: obmitted for deduplication reasons in response cycle.', }; } } else if (Array.isArray(objectPayload)) { // For arrays, we recurse over each item. return objectPayload.map((item, index) => VirtualStream.encodePayloadForNetwork( item, commFunctions, originalPayload || objectPayload, path.concat(String(index)) // Convert index to string and concatenate to path ) ); } else if (objectPayload !== null && typeof objectPayload === 'object') { // For objects, we recurse over each key-value pair. return Object.entries(objectPayload).reduce((acc, [key, value]) => { const newPath = path.concat(key); // Concatenate the new key to the path acc[key] = VirtualStream.encodePayloadForNetwork( value, commFunctions, originalPayload || objectPayload, newPath ); return acc; }, {}); } else { return objectPayload; } } public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any { if ( plugins.smartbuffer.isBufferLike(objectPayload) || objectPayload instanceof TypedRouter ) { return objectPayload; } if (objectPayload !== null && typeof objectPayload === 'object') { if (objectPayload._isVirtualStream) { const virtualStream = new VirtualStream(); virtualStream.streamId = objectPayload.streamId; if (!virtualStream.side && commFunctions.sendMethod) { virtualStream.side = 'requesting'; virtualStream.sendMethod = commFunctions.sendMethod; } if (!virtualStream.side && commFunctions.typedrouter) { virtualStream.side = 'responding'; virtualStream.typedrouter = commFunctions.typedrouter; commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream); } virtualStream.startKeepAliveLoop(); return virtualStream; } else if (Array.isArray(objectPayload)) { const returnArray = []; for (const item of objectPayload) { returnArray.push(VirtualStream.decodePayloadFromNetwork(item, commFunctions)); } return returnArray; } else { return Object.keys(objectPayload).reduce((acc, key) => { acc[key] = VirtualStream.decodePayloadFromNetwork(objectPayload[key], commFunctions); return acc; }, {}); } } else { return objectPayload; } } // INSTANCE public side: 'requesting' | 'responding'; public streamId: string = plugins.isounique.uni(); // integration with typedrequest mechanics public sendMethod: ICommFunctions['sendMethod']; public typedrouter: TypedRouter; // wether to keep the stream alive private keepAlive = true; private lastKeepAliveEvent: number; // backpressured arrays private sendBackpressuredArray = new plugins.lik.BackpressuredArray( 16 ); private receiveBackpressuredArray = new plugins.lik.BackpressuredArray( 16 ); constructor() {} workingDeferred: plugins.smartpromise.Deferred; /** * takes care of sending */ private async workOnQueue() { if (this.workingDeferred) { return this.workingDeferred.promise; } else { this.workingDeferred = plugins.smartpromise.defer(); } if(this.side === 'requesting') { let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable(); let otherSideHasNext = false; let otherSideIsBackpressured = false; // helper functions const getFeedback = async () => { const streamTr = await this.sendMethod({ method: '##VirtualStream##', request: { streamId: this.streamId, cycleId: plugins.isounique.uni(), cycle: 'request', mainPurpose: 'feedback', next: this.sendBackpressuredArray.data.length > 0, backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }, response: null, }).catch(() => { console.log('stream ended immaturely'); this.keepAlive = false; }); if (streamTr && streamTr.response) { otherSideIsBackpressured = streamTr.response.backpressure otherSideHasNext = streamTr.response.next; } } await getFeedback(); // do work loop while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) { if (otherSideIsBackpressured) { while (otherSideIsBackpressured) { console.log('waiting for feedback because of backpressure...'); await plugins.smartdelay.delayFor(50); await getFeedback(); } } let dataArg: typeof this.sendBackpressuredArray.data[0]; if (this.sendBackpressuredArray.data.length > 0) { dataArg = this.sendBackpressuredArray.shift(); } let streamTr: plugins.typedRequestInterfaces.IStreamRequest; streamTr = await this.sendMethod({ method: '##VirtualStream##', request: { streamId: this.streamId, cycleId: plugins.isounique.uni(), cycle: 'request', mainPurpose: dataArg ? 'chunk' : 'read', backpressure: thisSideIsBackpressured, next: this.sendBackpressuredArray.data.length > 0, ...dataArg ? { chunkData: dataArg } : {}, }, response: null, }).catch(() => { console.log('stream ended immaturely'); this.keepAlive = false; return null; }); if (streamTr && streamTr.response && streamTr.response.chunkData) { this.receiveBackpressuredArray.push(streamTr.response.chunkData); } otherSideIsBackpressured = streamTr && streamTr.response && streamTr.response.backpressure; thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable(); // lets care about looping otherSideHasNext = streamTr && streamTr.response && streamTr.response.next; } } this.workingDeferred.resolve(); this.workingDeferred = null; } /** * This method handles the stream only on the responding side * @param streamTrArg * @returns */ public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) { if (streamTrArg.request.keepAlive === true && this.keepAlive === true) { this.lastKeepAliveEvent = Date.now(); } else if (streamTrArg.request.keepAlive === false) { this.keepAlive = false; } // keepAlive handling if (streamTrArg.request.mainPurpose === 'keepAlive') { // if the main purpose is keepAlive, we answer with a keepAlive streamTrArg.response = { streamId: this.streamId, cycleId: streamTrArg.request.cycleId, cycle: 'response', mainPurpose: 'keepAlive', keepAlive: this.keepAlive, next: this.sendBackpressuredArray.data.length > 0, backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }; } // feedback handling if (streamTrArg.request.mainPurpose === 'feedback') { streamTrArg.response = { streamId: this.streamId, cycleId: streamTrArg.request.cycleId, cycle: 'response', mainPurpose: 'feedback', next: this.sendBackpressuredArray.data.length > 0, backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }; } // chunk handling if (streamTrArg.request.mainPurpose === 'chunk') { this.receiveBackpressuredArray.push(streamTrArg.request.chunkData); if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) { const dataArg = this.sendBackpressuredArray.shift(); streamTrArg.response = { streamId: this.streamId, cycleId: streamTrArg.request.cycleId, cycle: 'response', mainPurpose: 'chunk', next: this.sendBackpressuredArray.data.length > 1, // 1 and not 0 because we call shift a few lines down backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), chunkData: this.sendBackpressuredArray.shift(), }; } else { streamTrArg.response = { streamId: this.streamId, cycleId: streamTrArg.request.cycleId, cycle: 'response', mainPurpose: 'feedback', next: this.sendBackpressuredArray.data.length > 0, backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(), }; } streamTrArg.request = null; } return streamTrArg; } // lifecycle methods /** * closes the virtual stream */ public async cleanup() { if (this.typedrouter) { this.typedrouter.registeredVirtualStreams.remove(this); } } /** * a keepAlive loop that works across technologies */ private async startKeepAliveLoop() { // initially wait for a second if (this.side === 'responding') { return; } await plugins.smartdelay.delayFor(0); console.log(`starting keepalive loop on side ${this.side}`); let counter = 0; keepAliveLoop: while (this.keepAlive) { await this.triggerKeepAlive(); await plugins.smartdelay.delayFor(1000); } await plugins.smartdelay.delayFor(1000); await this.cleanup(); console.log(`cleaned up for stream ${this.streamId}`); } private async triggerKeepAlive() { if (this.side === 'requesting') { console.log(`keepalive sent.`); const streamTr = await this.sendMethod({ method: '##VirtualStream##', request: { streamId: this.streamId, cycleId: plugins.isounique.uni(), cycle: 'request', mainPurpose: 'keepAlive', keepAlive: this.keepAlive, }, response: null, }).catch(() => { this.keepAlive = false; }); // lets handle keepAlive if (streamTr && streamTr.response && streamTr.response.keepAlive === false) { this.keepAlive = false; } else { this.lastKeepAliveEvent = Date.now(); } if (streamTr && streamTr.response && streamTr.response.next) { this.workOnQueue(); } } if (Date.now() - this.lastKeepAliveEvent > 10000) { console.log(`closing stream for ${this.streamId}`); this.keepAlive = false; } } // Data sending and receiving public async sendData(dataArg: T): Promise { this.sendBackpressuredArray.push(dataArg); this.workOnQueue(); await this.sendBackpressuredArray.waitForSpace(); } public async fetchData(): Promise { if (this.receiveBackpressuredArray.hasSpace) { // do something maybe? } await this.receiveBackpressuredArray.waitForItems(); const dataPackage = this.receiveBackpressuredArray.shift(); return dataPackage; } /** * reads from a Readable and sends it to the other side * @param readableStreamArg */ public async readFromWebstream(readableStreamArg: ReadableStream, closeAfterReading = true) { const reader = readableStreamArg.getReader(); let streamIsDone = false; while(!streamIsDone) { const { value, done } = await reader.read(); if(value) { await this.sendData(value); } streamIsDone = done; } if (closeAfterReading) { await this.close(true); } } public async writeToWebstream(writableStreamArg: WritableStream) { const writer = writableStreamArg.getWriter(); while(this.keepAlive || this.receiveBackpressuredArray.checkHasItems()) { const value = await this.fetchData(); if (value === closingBit) { await writableStreamArg.close(); break; } await writer.write(value); } } /** * closes the stream * if sendClosingBitArg is true, the stream will send a closing bit * @param sendClosingBitArg */ public async close(sendClosingBitArg = false) { if (sendClosingBitArg) { this.sendData(closingBit); } this.keepAlive = false; } }