// typedrequest/ts/typedrequest.classes.virtualstream.ts
import * as plugins from './plugins.js';
import type { TypedRouter } from './typedrequest.classes.typedrouter.js';
/**
 * Transport hooks handed to the VirtualStream encode/decode helpers.
 *
 * Which member is present decides the role a stream takes on:
 * - with `sendMethod` a stream becomes the 'requesting' side and uses that
 *   function to push stream messages to its counterpart,
 * - otherwise, with `typedrouter`, a stream becomes the 'responding' side and
 *   is added to `typedrouter.registeredVirtualStreams`.
 */
export interface ICommFunctions {
  /**
   * Sends one stream message to the other side and resolves with the
   * message as answered by the other side.
   */
  sendMethod?: (
    sendPayload: plugins.typedRequestInterfaces.IStreamRequest
  ) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
  /**
   * Router that keeps track of the responding-side streams.
   */
  typedrouter?: TypedRouter;
}
2024-02-24 11:20:13 +00:00
/**
 * A stream that is tunnelled through typedrequest messages.
 *
 * 1. A VirtualStream connects over the network.
 * 2. It is always paired with one other VirtualStream on the other side
 *    that carries the same streamId.
 * 3. It has a readable and a writable side.
 * 4. What is written on one side is readable on the other side and vice versa.
 *
 * Message flags as used by this class:
 * - `next: true`         -> the sender of the message still has chunks queued.
 * - `backpressure: true` -> the sender of the message has no space left in its
 *                           receive buffer and should not be sent more chunks.
 */
export class VirtualStream<T = ArrayBufferLike> {
  // STATIC

  /**
   * Recursively copies a payload and replaces every VirtualStream instance in
   * it with a plain, serializable marker object.
   *
   * As a side effect, a stream that has no side yet is bound to the given
   * comm functions (requesting side when `sendMethod` is present, otherwise
   * responding side when `typedrouter` is present, in which case it is also
   * registered on the router).
   *
   * A stream is announced as `{ _isVirtualStream, streamId }` unless the root
   * payload already carries a `response` and the stream is not located under
   * a `response` key; in that case a `_OBMITTED_VIRTUAL_STREAM` marker is
   * emitted instead.
   *
   * @param objectPayload the value to encode
   * @param commFunctions transport hooks used to bind unbound streams
   * @param originalPayload root payload of the traversal (set by the recursion)
   * @param path object keys leading to `objectPayload` (set by the recursion);
   *             the array passed in is never mutated
   * @returns the encoded copy; primitives are returned as they are
   */
  public static encodePayloadForNetwork(
    objectPayload: any,
    commFunctions: ICommFunctions,
    originalPayload?: any,
    path: string[] = []
  ): any {
    if (objectPayload instanceof VirtualStream) {
      if (!objectPayload.side && commFunctions.sendMethod) {
        objectPayload.side = 'requesting';
        objectPayload.sendMethod = commFunctions.sendMethod;
      }
      if (!objectPayload.side && commFunctions.typedrouter) {
        objectPayload.side = 'responding';
        objectPayload.typedrouter = commFunctions.typedrouter;
        commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
      }
      // originalPayload is undefined when the stream itself is the root payload
      const rootHasResponse = Boolean(originalPayload && originalPayload.response);
      if (!rootHasResponse || path.includes('response')) {
        return {
          _isVirtualStream: true,
          streamId: objectPayload.streamId,
        };
      }
      return {
        _OBMITTED_VIRTUAL_STREAM: true,
        reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
      };
    }

    if (Array.isArray(objectPayload)) {
      // array indexes are not part of the path
      return objectPayload.map((item) =>
        VirtualStream.encodePayloadForNetwork(
          item,
          commFunctions,
          originalPayload || objectPayload,
          path
        )
      );
    }

    if (objectPayload !== null && typeof objectPayload === 'object') {
      const encodedObject: Record<string, any> = {};
      for (const key of Object.keys(objectPayload)) {
        // every branch gets its own path copy, so keys of sibling branches
        // (e.g. an earlier `response`) never leak into this one
        encodedObject[key] = VirtualStream.encodePayloadForNetwork(
          objectPayload[key],
          commFunctions,
          originalPayload || objectPayload,
          [...path, key]
        );
      }
      return encodedObject;
    }

    return objectPayload;
  }

  /**
   * Recursively copies a payload received from the network and replaces
   * every `{ _isVirtualStream: true, streamId }` marker with a VirtualStream
   * instance that uses the transmitted streamId and is bound to the given
   * comm functions.
   *
   * @param objectPayload the value to decode
   * @param commFunctions transport hooks the created streams are bound to
   * @returns the decoded copy; primitives are returned as they are
   */
  public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
    if (objectPayload === null || typeof objectPayload !== 'object') {
      return objectPayload;
    }

    if (objectPayload._isVirtualStream) {
      const virtualStream = new VirtualStream();
      virtualStream.streamId = objectPayload.streamId;
      // a fresh instance has no side yet: sendMethod wins over typedrouter
      if (commFunctions.sendMethod) {
        virtualStream.side = 'requesting';
        virtualStream.sendMethod = commFunctions.sendMethod;
      } else if (commFunctions.typedrouter) {
        virtualStream.side = 'responding';
        virtualStream.typedrouter = commFunctions.typedrouter;
        commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
      }
      return virtualStream;
    }

    if (Array.isArray(objectPayload)) {
      return objectPayload.map((item) =>
        VirtualStream.decodePayloadFromNetwork(item, commFunctions)
      );
    }

    const decodedObject: Record<string, any> = {};
    for (const key of Object.keys(objectPayload)) {
      decodedObject[key] = VirtualStream.decodePayloadFromNetwork(
        objectPayload[key],
        commFunctions
      );
    }
    return decodedObject;
  }

  // INSTANCE

  /** role of this instance; unset until the stream is bound to comm functions */
  public side: 'requesting' | 'responding';
  /** id shared by both paired streams */
  public streamId: string = plugins.isounique.uni();

  // integration with typedrequest mechanics
  /** transport used by the requesting side */
  public sendMethod: ICommFunctions['sendMethod'];
  /** router the responding side is registered on */
  public typedrouter: TypedRouter;

  // whether to keep the stream alive
  private keepAlive = true;
  // timestamp (ms) of the last keepAlive signal that was seen
  private lastKeepAliveEvent = Date.now();

  // backpressured arrays
  // chunks queued by sendData() until they are transferred to the other side
  private sendBackpressuredArray = new plugins.lik.BackpressuredArray<T>(16);
  // chunks received from the other side until they are taken by fetchData()
  private receiveBackpressuredArray = new plugins.lik.BackpressuredArray<T>(16);

  constructor() {
    // the loop is intentionally not awaited; it runs for the stream's lifetime
    this.startKeepAliveLoop();
  }

  /**
   * Tells whether the local receive buffer is full.
   * NOTE(review): assumes checkSpaceAvailable() returns true while the buffer
   * can still take items, as its name suggests - confirm against
   * lik.BackpressuredArray.
   */
  private isReceiveSideBackpressured(): boolean {
    return !this.receiveBackpressuredArray.checkSpaceAvailable();
  }

  /**
   * takes care of sending
   *
   * Only acts on the requesting side. Sends one 'chunk' message per loop
   * turn for as long as there are local chunks queued or the other side
   * reported that it has more chunks. A message without chunkData is used to
   * pull chunks from the other side while the local send queue is empty.
   */
  private async workOnQueue() {
    if (this.side !== 'requesting') {
      return;
    }

    let otherSideHasNext = false;
    while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
      const thisSideIsBackpressured = this.isReceiveSideBackpressured();
      if (this.sendBackpressuredArray.data.length === 0 && thisSideIsBackpressured) {
        // nothing to send and no room to receive: stop polling for now,
        // triggerKeepAlive() calls workOnQueue() again while the other side reports `next`
        break;
      }

      const dataArg = this.sendBackpressuredArray.shift();
      const streamTr = await this.sendMethod({
        method: '##VirtualStream##',
        request: {
          streamId: this.streamId,
          cycleId: plugins.isounique.uni(),
          cycle: 'request',
          mainPurpose: 'chunk',
          backpressure: thisSideIsBackpressured,
          next: this.sendBackpressuredArray.data.length > 0,
          chunkData: dataArg,
        },
        response: null,
      }).catch(() => {
        console.log('stream ended immaturely');
        this.keepAlive = false;
      });

      if (!streamTr) {
        // sending failed: stop here instead of draining (and losing) the rest of the queue
        break;
      }

      if (streamTr.response && streamTr.response.chunkData != null) {
        this.receiveBackpressuredArray.push(streamTr.response.chunkData);
      }
      // lets care about looping
      otherSideHasNext = Boolean(streamTr.response && streamTr.response.next);
    }
  }

  /**
   * This method handles the stream only on the responding side.
   *
   * It updates the keepAlive state from the request and fills
   * `streamTrArg.response` according to `request.mainPurpose`:
   * - 'keepAlive': answers with the local keepAlive state,
   * - 'feedback':  answers with the local `next`/`backpressure` flags,
   * - 'chunk':     stores the received chunk (if any) and answers with the
   *                next local chunk when one is queued and the requester is
   *                not backpressured, otherwise with feedback.
   * For 'feedback' and 'chunk' the request is set to null afterwards.
   *
   * @param streamTrArg the incoming stream message
   * @returns the same object, with the response filled in
   */
  public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
    // keep a reference: streamTrArg.request is nulled for some purposes below
    const request = streamTrArg.request;

    if (request.keepAlive === true && this.keepAlive === true) {
      this.lastKeepAliveEvent = Date.now();
    } else if (request.keepAlive === false) {
      this.keepAlive = false;
    }

    if (request.mainPurpose === 'keepAlive') {
      // if the main purpose is keepAlive, we answer with a keepAlive
      streamTrArg.response = {
        streamId: this.streamId,
        cycleId: request.cycleId,
        cycle: 'response',
        mainPurpose: 'keepAlive',
        keepAlive: this.keepAlive,
        next: this.sendBackpressuredArray.data.length > 0,
        backpressure: this.isReceiveSideBackpressured(),
      };
    } else if (request.mainPurpose === 'feedback') {
      streamTrArg.response = {
        streamId: this.streamId,
        cycleId: request.cycleId,
        cycle: 'response',
        mainPurpose: 'feedback',
        next: this.sendBackpressuredArray.data.length > 0,
        backpressure: this.isReceiveSideBackpressured(),
      };
      streamTrArg.request = null;
    } else if (request.mainPurpose === 'chunk') {
      // a chunk message without data is a pure pull from the requesting side
      if (request.chunkData != null) {
        this.receiveBackpressuredArray.push(request.chunkData);
      }
      if (this.sendBackpressuredArray.data.length > 0 && request.backpressure !== true) {
        // shift exactly once, `next` then describes what is left afterwards
        const dataArg = this.sendBackpressuredArray.shift();
        streamTrArg.response = {
          streamId: this.streamId,
          cycleId: request.cycleId,
          cycle: 'response',
          mainPurpose: 'chunk',
          next: this.sendBackpressuredArray.data.length > 0,
          backpressure: this.isReceiveSideBackpressured(),
          chunkData: dataArg,
        };
      } else {
        streamTrArg.response = {
          streamId: this.streamId,
          cycleId: request.cycleId,
          cycle: 'response',
          mainPurpose: 'feedback',
          next: this.sendBackpressuredArray.data.length > 0,
          backpressure: this.isReceiveSideBackpressured(),
        };
      }
      streamTrArg.request = null;
    }

    return streamTrArg;
  }

  /**
   * Removes this stream from the typedrouter it is registered on (if any).
   * Called by the keepAlive loop once the stream is no longer kept alive.
   */
  public async cleanup() {
    if (this.typedrouter) {
      this.typedrouter.registeredVirtualStreams.remove(this);
    }
  }

  /**
   * a keepAlive loop that works across technologies
   *
   * Triggers a keepAlive check roughly once per second for as long as
   * `keepAlive` is true, then waits another second and cleans up.
   */
  private async startKeepAliveLoop() {
    // The constructor starts this loop before side/sendMethod/typedrouter are
    // assigned; presumably the zero delay defers the first check until the
    // creating code has finished binding the instance.
    await plugins.smartdelay.delayFor(0);
    while (this.keepAlive) {
      await this.triggerKeepAlive();
      await plugins.smartdelay.delayFor(1000);
    }
    await plugins.smartdelay.delayFor(1000);
    await this.cleanup();
    console.log(`cleaned up for stream ${this.streamId}`);
  }

  /**
   * One keepAlive check.
   *
   * The requesting side sends a keepAlive message, stops the stream when
   * sending fails or the other side answers `keepAlive: false`, and starts
   * working on the queue when the other side reports pending chunks.
   * On every side the stream is stopped when no keepAlive signal has been
   * seen for more than 10 seconds.
   */
  private async triggerKeepAlive() {
    if (this.side === 'requesting') {
      console.log(`keepalive sent.`);
      const streamTr = await this.sendMethod({
        method: '##VirtualStream##',
        request: {
          streamId: this.streamId,
          cycleId: plugins.isounique.uni(),
          cycle: 'request',
          mainPurpose: 'keepAlive',
          keepAlive: true,
        },
        response: null,
      }).catch(() => {
        this.keepAlive = false;
      });

      // lets handle keepAlive
      if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
        this.keepAlive = false;
      } else if (streamTr) {
        // only an answered keepAlive counts as a sign of life
        this.lastKeepAliveEvent = Date.now();
      }

      if (streamTr && streamTr.response && streamTr.response.next) {
        // not awaited: the transfer runs alongside the keepAlive loop
        this.workOnQueue();
      }
    }

    if (Date.now() - this.lastKeepAliveEvent > 10000) {
      console.log(`closing stream for ${this.streamId}`);
      this.keepAlive = false;
    }
  }

  // Data sending and receiving

  /**
   * Queues a chunk for the other side and kicks off the transfer.
   * Resolves once the send queue reports space again.
   *
   * @param dataArg the chunk to send
   */
  public async sendData(dataArg: T): Promise<void> {
    this.sendBackpressuredArray.push(dataArg);
    // not awaited: only the requesting side actively transfers, see workOnQueue()
    this.workOnQueue();
    await this.sendBackpressuredArray.waitForSpace();
  }

  /**
   * Waits until a received chunk is available and returns it.
   *
   * @returns the oldest chunk of the receive buffer
   */
  public async fetchData(): Promise<T> {
    await this.receiveBackpressuredArray.waitForItems();
    return this.receiveBackpressuredArray.shift();
  }

  /**
   * Placeholder for piping an event-emitter style stream into this stream.
   *
   * NOTE(review): not implemented yet - the registered 'data', 'end' and
   * 'error' handlers are empty, so nothing is forwarded to this VirtualStream.
   *
   * @param webStream object exposing `on(eventName, handler)`
   */
  public pipeWebStream(webStream: any) {
    // lets do the piping
    webStream.on('data', (data: any) => {});
    webStream.on('end', () => {});
    webStream.on('error', (error: any) => {});
  }
}