385 lines
12 KiB
TypeScript
385 lines
12 KiB
TypeScript
import * as plugins from './plugins.js';
|
|
import { TypedRouter } from './classes.typedrouter.js';
|
|
|
|
/**
 * Transport hooks handed to the VirtualStream payload encoder/decoder.
 *
 * Which hook is present decides the role a VirtualStream takes on when it
 * is bound: `sendMethod` is checked first, `typedrouter` second.
 */
export interface ICommFunctions {
  /**
   * Sends a stream request to the other side and resolves with the answered request.
   * When present, a not yet bound VirtualStream becomes the 'requesting' side.
   */
  sendMethod?: (
    sendPayload: plugins.typedRequestInterfaces.IStreamRequest
  ) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;

  /**
   * Router a VirtualStream is registered with (via `registeredVirtualStreams`).
   * Only consulted when no `sendMethod` is given; the stream then becomes the 'responding' side.
   */
  typedrouter?: TypedRouter;
}
|
|
|
|
/**
 * One end of a bidirectional stream that is tunnelled through typedrequest.
 *
 * 1. A VirtualStream connects over the network.
 * 2. It is always paired to one other VirtualStream
 *    on the other side with the same streamId.
 * 3. It has a Readable and Writable side.
 * 4. The Writable side is Readable on the other side and vice versa.
 *
 * The 'requesting' side drives all traffic through `sendMethod`; the
 * 'responding' side only answers in `handleStreamTr`. In every message the
 * `backpressure` flag means "the sender's receive buffer has NO space left".
 */
export class VirtualStream<T = Uint8Array> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
  // STATIC

  /**
   * Recursively replaces every VirtualStream inside a payload with a plain,
   * serializable marker object and binds the stream to the given transport.
   *
   * @param objectPayload the (sub)payload to encode
   * @param commFunctions transport hooks used to bind streams that have no side yet
   * @param originalPayload the top-level payload; undefined on the outermost call
   * @param path keys/indices leading from the top-level payload to objectPayload
   * @returns the encoded payload; buffer-like and falsy values are returned as they are
   */
  public static encodePayloadForNetwork(
    objectPayload: any,
    commFunctions: ICommFunctions,
    originalPayload?: any,
    path: string[] = []
  ): any {
    if (!objectPayload) {
      return objectPayload;
    }
    if (plugins.smartbuffer.isBufferLike(objectPayload)) {
      return objectPayload;
    }

    if (objectPayload instanceof VirtualStream) {
      VirtualStream.attachCommFunctions(objectPayload, commFunctions);

      // originalPayload is undefined when the top-level payload itself is a
      // VirtualStream, so it must be guarded before looking at `.response`.
      const isResponseCycle = Boolean(originalPayload && originalPayload.response);
      if (!isResponseCycle || path.includes('response')) {
        objectPayload.startKeepAliveLoop();
        return {
          _isVirtualStream: true,
          streamId: objectPayload.streamId,
        };
      }
      return {
        _OBMITTED_VIRTUAL_STREAM: true,
        reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
      };
    }

    if (Array.isArray(objectPayload)) {
      // For arrays, recurse over each item; the index becomes part of the path.
      return objectPayload.map((item, index) =>
        VirtualStream.encodePayloadForNetwork(
          item,
          commFunctions,
          originalPayload || objectPayload,
          path.concat(String(index))
        )
      );
    }

    if (typeof objectPayload === 'object') {
      // For objects, recurse over each key-value pair.
      const encoded: Record<string, unknown> = {};
      for (const [key, value] of Object.entries(objectPayload)) {
        encoded[key] = VirtualStream.encodePayloadForNetwork(
          value,
          commFunctions,
          originalPayload || objectPayload,
          path.concat(key)
        );
      }
      return encoded;
    }

    return objectPayload;
  }

  /**
   * Recursively turns `_isVirtualStream` markers inside a received payload
   * back into VirtualStream instances bound to the given transport.
   *
   * @param objectPayload the (sub)payload received from the network
   * @param commFunctions transport hooks used to bind the recreated streams
   * @returns the decoded payload; buffer-like values, TypedRouter instances and
   *          non-objects are returned as they are
   */
  public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
    if (
      plugins.smartbuffer.isBufferLike(objectPayload)
      || objectPayload instanceof TypedRouter
    ) {
      return objectPayload;
    }
    if (objectPayload === null || typeof objectPayload !== 'object') {
      return objectPayload;
    }

    if (objectPayload._isVirtualStream) {
      const virtualStream = new VirtualStream();
      virtualStream.streamId = objectPayload.streamId;
      VirtualStream.attachCommFunctions(virtualStream, commFunctions);
      virtualStream.startKeepAliveLoop();
      return virtualStream;
    }

    if (Array.isArray(objectPayload)) {
      return objectPayload.map((item) =>
        VirtualStream.decodePayloadFromNetwork(item, commFunctions)
      );
    }

    const decoded: Record<string, unknown> = {};
    for (const key of Object.keys(objectPayload)) {
      decoded[key] = VirtualStream.decodePayloadFromNetwork(objectPayload[key], commFunctions);
    }
    return decoded;
  }

  /**
   * Binds a stream that has no side yet to the transport: `sendMethod` makes
   * it the requesting side, otherwise `typedrouter` makes it the responding
   * side and registers it with the router. Already bound streams are left alone.
   */
  private static attachCommFunctions(
    virtualStream: VirtualStream<any>,
    commFunctions: ICommFunctions
  ): void {
    if (virtualStream.side) {
      return;
    }
    if (commFunctions.sendMethod) {
      virtualStream.side = 'requesting';
      virtualStream.sendMethod = commFunctions.sendMethod;
    } else if (commFunctions.typedrouter) {
      virtualStream.side = 'responding';
      virtualStream.typedrouter = commFunctions.typedrouter;
      commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
    }
  }

  // INSTANCE

  /** role of this end; unset until the stream is bound to a transport */
  public side: 'requesting' | 'responding';
  /** id shared by both ends of the stream pair */
  public streamId: string = plugins.isounique.uni();

  // integration with typedrequest mechanics
  public sendMethod: ICommFunctions['sendMethod'];
  public typedrouter: TypedRouter;

  // whether to keep the stream alive
  private keepAlive = true;
  // timestamp (ms) of the last keepAlive seen or sent; seeded when the keepAlive loop starts
  private lastKeepAliveEvent: number;

  // backpressured arrays
  private sendBackpressuredArray =
    new plugins.lik.BackpressuredArray<T>(
      16
    );
  private receiveBackpressuredArray =
    new plugins.lik.BackpressuredArray<T>(
      16
    );

  constructor() {}

  /**
   * Takes care of sending. Only the requesting side does any work here:
   * it first exchanges a 'feedback' message and then keeps exchanging
   * 'chunk'/'read' messages while either side still has data queued.
   */
  private async workOnQueue() {
    if (this.side !== 'requesting') {
      return;
    }

    let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
    let otherSideHasNext = false;

    // initial feedback round trip: learn whether the other side has data waiting
    const feedbackTr = await this.sendMethod({
      method: '##VirtualStream##',
      request: {
        streamId: this.streamId,
        cycleId: plugins.isounique.uni(),
        cycle: 'request',
        mainPurpose: 'feedback',
        next: this.sendBackpressuredArray.data.length > 0,
        backpressure: thisSideIsBackpressured,
      },
      response: null,
    }).catch(() => {
      console.log('stream ended immaturely');
      this.keepAlive = false;
      return null;
    });
    if (feedbackTr && feedbackTr.response) {
      otherSideHasNext = feedbackTr.response.next;
    }

    // do work loop
    while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
      const hasChunk = this.sendBackpressuredArray.data.length > 0;
      if (!hasChunk && thisSideIsBackpressured) {
        // Nothing to send and no room to receive: polling with 'read' would
        // only spin. A later keepAlive answer with `next` set calls
        // workOnQueue() again.
        break;
      }
      const dataArg = hasChunk ? this.sendBackpressuredArray.shift() : undefined;

      const streamTr = await this.sendMethod({
        method: '##VirtualStream##',
        request: {
          streamId: this.streamId,
          cycleId: plugins.isounique.uni(),
          cycle: 'request',
          mainPurpose: hasChunk ? 'chunk' : 'read',
          backpressure: thisSideIsBackpressured,
          next: this.sendBackpressuredArray.data.length > 0,
          ...(hasChunk ? { chunkData: dataArg } : {}),
        },
        response: null,
      }).catch(() => {
        console.log('stream ended immaturely');
        this.keepAlive = false;
        return null;
      });

      if (streamTr && streamTr.response && streamTr.response.chunkData) {
        this.receiveBackpressuredArray.push(streamTr.response.chunkData);
      }
      thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();

      // lets care about looping
      otherSideHasNext = Boolean(streamTr && streamTr.response && streamTr.response.next);
    }
  }

  /**
   * This method handles the stream only on the responding side.
   *
   * @param streamTrArg the stream request received from the requesting side
   * @returns the same object with `response` filled in for the purposes
   *          'keepAlive', 'feedback', 'chunk' and 'read'; for 'chunk' and
   *          'read' the `request` is set to null before returning
   */
  public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
    // keep a reference: streamTrArg.request is nulled further down
    const request = streamTrArg.request;

    if (request.keepAlive === true && this.keepAlive === true) {
      this.lastKeepAliveEvent = Date.now();
    } else if (request.keepAlive === false) {
      this.keepAlive = false;
    }

    switch (request.mainPurpose) {
      case 'keepAlive':
        // if the main purpose is keepAlive, we answer with a keepAlive
        streamTrArg.response = {
          streamId: this.streamId,
          cycleId: request.cycleId,
          cycle: 'response',
          mainPurpose: 'keepAlive',
          keepAlive: this.keepAlive,
          next: this.sendBackpressuredArray.data.length > 0,
          backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
        };
        break;

      case 'feedback':
        streamTrArg.response = {
          streamId: this.streamId,
          cycleId: request.cycleId,
          cycle: 'response',
          mainPurpose: 'feedback',
          next: this.sendBackpressuredArray.data.length > 0,
          backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
        };
        break;

      case 'chunk':
      case 'read': {
        // 'chunk' delivers data to us; 'read' only asks for data from us
        if (request.mainPurpose === 'chunk') {
          this.receiveBackpressuredArray.push(request.chunkData);
        }

        // The requester announces its own buffer state in the request; the
        // response does not exist yet at this point and must not be read.
        const requesterCanReceive = request.backpressure === false;
        if (this.sendBackpressuredArray.data.length > 0 && requesterCanReceive) {
          // shift exactly once, so no queued chunk is skipped
          const dataArg = this.sendBackpressuredArray.shift();
          streamTrArg.response = {
            streamId: this.streamId,
            cycleId: request.cycleId,
            cycle: 'response',
            mainPurpose: 'chunk',
            next: this.sendBackpressuredArray.data.length > 0,
            backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
            chunkData: dataArg,
          };
        } else {
          streamTrArg.response = {
            streamId: this.streamId,
            cycleId: request.cycleId,
            cycle: 'response',
            mainPurpose: 'feedback',
            next: this.sendBackpressuredArray.data.length > 0,
            backpressure: !this.receiveBackpressuredArray.checkSpaceAvailable(),
          };
        }
        streamTrArg.request = null;
        break;
      }

      default:
        // unknown purposes are passed back unanswered
        break;
    }

    return streamTrArg;
  }

  // lifecycle methods
  /**
   * closes the virtual stream by removing it from the router it is registered with (if any)
   */
  public async cleanup() {
    if (this.typedrouter) {
      this.typedrouter.registeredVirtualStreams.remove(this);
    }
  }

  /**
   * a keepAlive loop that works across technologies.
   * Runs until `keepAlive` turns false, then waits once more and cleans up.
   */
  private async startKeepAliveLoop() {
    // zero-length delay before the first keepAlive
    // NOTE(review): presumably this just defers to a later tick - confirm against smartdelay
    await plugins.smartdelay.delayFor(0);
    // Seed the timestamp so the timeout check in triggerKeepAlive() also
    // works when no keepAlive is ever received (otherwise it compares against NaN).
    this.lastKeepAliveEvent = Date.now();
    while (this.keepAlive) {
      await this.triggerKeepAlive();
      await plugins.smartdelay.delayFor(1000);
    }
    await plugins.smartdelay.delayFor(1000);
    await this.cleanup();
    console.log(`cleaned up for stream ${this.streamId}`);
  }

  /**
   * One keepAlive tick: the requesting side pings the other side, and both
   * sides close the stream when the last keepAlive is older than 10 seconds.
   */
  private async triggerKeepAlive() {
    if (this.side === 'requesting') {
      console.log(`keepalive sent.`);
      const streamTr = await this.sendMethod({
        method: '##VirtualStream##',
        request: {
          streamId: this.streamId,
          cycleId: plugins.isounique.uni(),
          cycle: 'request',
          mainPurpose: 'keepAlive',
          keepAlive: true,
        },
        response: null,
      }).catch(() => {
        this.keepAlive = false;
      });

      // lets handle keepAlive
      if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
        this.keepAlive = false;
      } else {
        this.lastKeepAliveEvent = Date.now();
      }
      // the other side has data waiting: start pulling it
      if (streamTr && streamTr.response && streamTr.response.next) {
        this.workOnQueue();
      }
    }
    if (Date.now() - this.lastKeepAliveEvent > 10000) {
      console.log(`closing stream for ${this.streamId}`);
      this.keepAlive = false;
    }
  }

  // Data sending and receiving
  /**
   * Queues a chunk for the other side, kicks off the send loop and resolves
   * once the send buffer reports space again.
   * @param dataArg the chunk to send
   */
  public async sendData(dataArg: T): Promise<void> {
    this.sendBackpressuredArray.push(dataArg);
    this.workOnQueue();
    await this.sendBackpressuredArray.waitForSpace();
  }

  /**
   * Waits until a chunk has been received and returns it.
   * @returns the oldest buffered chunk
   */
  public async fetchData(): Promise<T> {
    await this.receiveBackpressuredArray.waitForItems();
    return this.receiveBackpressuredArray.shift();
  }

  /**
   * reads from a Readable and sends it to the other side
   * @param readableStreamArg the web stream to read from until it reports done
   */
  public async readFromWebstream(readableStreamArg: ReadableStream<T>) {
    const reader = readableStreamArg.getReader();
    try {
      let streamIsDone = false;
      while (!streamIsDone) {
        const { value, done } = await reader.read();
        if (value) {
          await this.sendData(value);
        }
        streamIsDone = done;
      }
    } finally {
      // hand the stream back to the caller, also when reading or sending failed
      reader.releaseLock();
    }
  }

  /**
   * writes everything received from the other side into a Writable
   * @param writableStreamArg the web stream to write received chunks to
   */
  public async writeToWebstream(writableStreamArg: WritableStream<T>) {
    const writer = writableStreamArg.getWriter();
    try {
      // keep going after close until the already buffered chunks are written
      while (this.keepAlive || this.receiveBackpressuredArray.data.length > 0) {
        await writer.write(await this.fetchData());
      }
    } finally {
      // hand the stream back to the caller, also when writing failed
      writer.releaseLock();
    }
  }

  /**
   * Marks the stream as closed; the keepAlive loop then stops and cleans up.
   */
  public async close() {
    this.keepAlive = false;
  }
}
|