Compare commits

...

22 Commits

Author SHA1 Message Date
b003da7f59 3.0.17 2024-03-01 00:12:55 +01:00
735890bc3d fix(core): update 2024-03-01 00:12:55 +01:00
69035f49c8 3.0.16 2024-03-01 00:12:44 +01:00
5ec20ee526 fix(core): update 2024-03-01 00:12:43 +01:00
8b811ffd6b 3.0.15 2024-02-29 23:55:34 +01:00
35df3697c7 fix(core): update 2024-02-29 23:55:34 +01:00
8b4befc828 3.0.14 2024-02-29 23:12:52 +01:00
77dddd9157 fix(core): update 2024-02-29 23:12:51 +01:00
737f413324 3.0.13 2024-02-29 22:59:43 +01:00
e613937c43 fix(core): update 2024-02-29 22:59:42 +01:00
9c66752f8b 3.0.12 2024-02-29 22:59:32 +01:00
5c6922c710 fix(core): update 2024-02-29 22:59:31 +01:00
c8e4343ac7 3.0.11 2024-02-29 22:51:44 +01:00
924bc2c5a7 fix(core): update 2024-02-29 22:51:44 +01:00
2274afcd38 3.0.10 2024-02-29 22:47:54 +01:00
23aab2adf8 fix(core): update 2024-02-29 22:47:53 +01:00
90311ad65e 3.0.9 2024-02-29 19:50:26 +01:00
407e1383f8 fix(core): update 2024-02-29 19:50:25 +01:00
ad106909e2 3.0.8 2024-02-25 01:54:01 +01:00
b346da01f1 fix(core): update 2024-02-25 01:54:01 +01:00
51fedb270b 3.0.7 2024-02-24 12:20:13 +01:00
fd26b48ff6 fix(core): update 2024-02-24 12:20:13 +01:00
9 changed files with 821 additions and 838 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@api.global/typedrequest",
"version": "3.0.6",
"version": "3.0.17",
"private": false,
"description": "make typed requests towards apis",
"main": "dist_ts/index.js",
@ -14,22 +14,24 @@
"buildDocs": "tsdoc"
},
"devDependencies": {
"@api.global/typedserver": "^3.0.23",
"@api.global/typedserver": "^3.0.25",
"@git.zone/tsbuild": "^2.1.72",
"@git.zone/tsbundle": "^2.0.15",
"@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.86",
"@push.rocks/smartenv": "^5.0.12",
"@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.11.19"
"@types/node": "^20.11.24"
},
"dependencies": {
"@api.global/typedrequest-interfaces": "^3.0.6",
"@api.global/typedrequest-interfaces": "^3.0.18",
"@push.rocks/isounique": "^1.0.5",
"@push.rocks/lik": "^6.0.12",
"@push.rocks/lik": "^6.0.14",
"@push.rocks/smartbuffer": "^1.0.7",
"@push.rocks/smartdelay": "^3.0.5",
"@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/webrequest": "^3.0.34"
"@push.rocks/webrequest": "^3.0.34",
"@push.rocks/webstream": "^1.0.8"
},
"files": [
"ts/**/*",

1264
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -31,9 +31,17 @@ tap.test('should define a testHandler', async () => {
tap.test('should fire a request', async () => {
const typedRequest = new typedrequest.TypedRequest<ITestReqRes>(
'http://localhost:3000/testroute',
'http://localhost:3000/typedrequest',
'hi'
);
const result = await typedRequest.fire({
name: 'yes',
})
console.log(result);
});
tap.test('test', async (tools) => {
await tools.delayFor(5000);
})
tap.start();

View File

@ -2,6 +2,7 @@ import { expect, tap } from '@push.rocks/tapbundle';
import * as typedserver from '@api.global/typedserver';
import * as typedrequest from '../ts/index.js';
import * as typedrequestInterfaces from '@api.global/typedrequest-interfaces';
let testServer: typedserver.servertools.Server;
let testTypedRouter: typedrequest.TypedRouter;
@ -21,10 +22,10 @@ interface ITestReqRes {
interface ITestStream {
method: 'handleStream';
request: {
requestStream: any;
requestStream: typedrequestInterfaces.IVirtualStream;
};
response: {
responseStream: any;
responseStream: typedrequestInterfaces.IVirtualStream;
};
}
@ -72,11 +73,16 @@ tap.test('should fire a request', async () => {
});
tap.test('should allow VirtualStreams', async () => {
const newRequestingVS = new typedrequest.VirtualStream();
const newRespondingVS = new typedrequest.VirtualStream();
let generatedRequestingVS: typedrequestInterfaces.IVirtualStream;
let generatedRespondingVS: typedrequestInterfaces.IVirtualStream;
testTypedRouter.addTypedHandler(new typedrequest.TypedHandler<ITestStream>('handleStream', async (reqArg) => {
console.log('hey there');
console.log(reqArg.requestStream);
generatedRequestingVS = reqArg.requestStream;
return {
responseStream: new typedrequest.VirtualStream(),
responseStream: newRespondingVS,
};
}));
const typedRequest = new typedrequest.TypedRequest<ITestStream>(
@ -84,13 +90,20 @@ tap.test('should allow VirtualStreams', async () => {
'handleStream'
);
const response = await typedRequest.fire({
requestStream: new typedrequest.VirtualStream(),
requestStream: newRequestingVS,
});
console.log(response.responseStream);
})
tap.test('should end the server', async () => {
newRequestingVS.sendData(Buffer.from('hello'));
const data = await generatedRequestingVS.fetchData();
const decodedData = data.toString();
expect(data.toString()).toEqual('hello');
});
tap.test('should end the server', async (toolsArg) => {
await toolsArg.delayFor(1000);
await testServer.stop();
setTimeout(() => process.exit(0), 100);
});
tap.start();

View File

@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@api.global/typedrequest',
version: '3.0.6',
version: '3.0.17',
description: 'make typed requests towards apis'
}

View File

@ -6,8 +6,10 @@ export { typedRequestInterfaces };
// pushrocks scope
import * as isounique from '@push.rocks/isounique';
import * as lik from '@push.rocks/lik';
import * as smartbuffer from '@push.rocks/smartbuffer';
import * as smartdelay from '@push.rocks/smartdelay';
import * as smartpromise from '@push.rocks/smartpromise';
import * as webrequest from '@push.rocks/webrequest';
import * as webstream from '@push.rocks/webstream';
export { isounique, lik, smartdelay, smartpromise, webrequest };
export { isounique, lik, smartbuffer, smartdelay, smartpromise, webrequest, webstream };

View File

@ -20,7 +20,6 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
public method: string;
/**
* note the overloading is thought to deal with promises
* @param postEndPointArg
* @param methodArg
*/
@ -49,8 +48,8 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
// lets preprocess the payload
payloadSending = VirtualStream.encodePayloadForNetwork(payloadSending, {
sendMethod: (payloadArg: plugins.typedRequestInterfaces.ITypedRequest) => {
return this.postTrObject(payloadArg);
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
}
});
@ -59,8 +58,8 @@ export class TypedRequest<T extends plugins.typedRequestInterfaces.ITypedRequest
// lets preprocess the response
payloadReceiving = VirtualStream.decodePayloadFromNetwork(payloadReceiving, {
sendMethod: (payloadArg: plugins.typedRequestInterfaces.ITypedRequest) => {
return this.postTrObject(payloadArg);
sendMethod: (payloadArg: plugins.typedRequestInterfaces.IStreamRequest) => {
return this.postTrObject(payloadArg) as Promise<plugins.typedRequestInterfaces.IStreamRequest>;
}
});
return payloadReceiving.response;

View File

@ -14,7 +14,7 @@ export class TypedRouter {
public handlerMap = new plugins.lik.ObjectMap<
TypedHandler<any & plugins.typedRequestInterfaces.ITypedRequest>
>();
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream>();
public registeredVirtualStreams = new plugins.lik.ObjectMap<VirtualStream<any>>();
public fireEventInterestMap = new plugins.lik.InterestMap<
string,
@ -102,7 +102,8 @@ export class TypedRouter {
// lets do stream processing
if (typedRequestArg.method === '##VirtualStream##') {
const result = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
const result: any = await this.handleStreamTypedRequest(typedRequestArg as plugins.typedRequestInterfaces.IStreamRequest);
result.localData = null;
return result as T;
}
@ -119,6 +120,7 @@ export class TypedRouter {
typedRequestArg.correlation.phase = 'response';
// encode again before handing back
typedRequestArg.localData = null;
typedRequestArg = VirtualStream.encodePayloadForNetwork(typedRequestArg, {
typedrouter: this,
});
@ -152,6 +154,13 @@ export class TypedRouter {
const relevantVirtualStream = await this.registeredVirtualStreams.find(async virtualStreamArg => {
return virtualStreamArg.streamId === streamTrArg.request.streamId;
});
if (!relevantVirtualStream) {
console.log(`no relevant virtual stream found for stream with id ${streamTrArg.request.streamId}`);
console.log(this.registeredVirtualStreams.getArray());
return streamTrArg;
} else {
console.log(`success: found relevant virtual stream with id ${streamTrArg.request.streamId}`);
}
const result = await relevantVirtualStream.handleStreamTr(streamTrArg);
return result;
}

View File

@ -3,17 +3,32 @@ import type { TypedRouter } from './typedrequest.classes.typedrouter.js';
export interface ICommFunctions {
sendMethod?: (
sendPayload: plugins.typedRequestInterfaces.ITypedRequest
) => Promise<plugins.typedRequestInterfaces.ITypedRequest>;
sendPayload: plugins.typedRequestInterfaces.IStreamRequest
) => Promise<plugins.typedRequestInterfaces.IStreamRequest>;
typedrouter?: TypedRouter;
}
export class VirtualStream {
/**
* 1. A VirtualStream connects over the network
* 2. It is always paired to one other VirtualStream
* on the other side with the same streamId.
* 3. It has a Readable and Writable side.
* 4. The Writable side is Readable on the other side and vice versa.
*/
export class VirtualStream<T = ArrayBufferLike> implements plugins.typedRequestInterfaces.IVirtualStream<T> {
// STATIC
public static encodePayloadForNetwork(
objectPayload: any,
commFunctions: ICommFunctions
commFunctions: ICommFunctions,
originalPayload?: any,
path = []
): any {
if (!objectPayload) {
return objectPayload;
}
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload instanceof VirtualStream) {
if (!objectPayload.side && commFunctions.sendMethod) {
objectPayload.side = 'requesting';
@ -22,20 +37,40 @@ export class VirtualStream {
if (!objectPayload.side && commFunctions.typedrouter) {
objectPayload.side = 'responding';
objectPayload.typedrouter = commFunctions.typedrouter;
commFunctions.typedrouter.registeredVirtualStreams.add(objectPayload);
}
if (!originalPayload.response || path.includes('response')) {
objectPayload.startKeepAliveLoop();
return {
_isVirtualStream: true,
streamId: objectPayload.streamId,
};
} else {
return {
_OBMITTED_VIRTUAL_STREAM: true,
reason: 'path is under .request: obmitted for deduplication reasons in response cycle.',
};
}
return {
_isVirtualStream: true,
streamId: objectPayload.streamId,
};
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
for (const item of objectPayload) {
returnArray.push(VirtualStream.encodePayloadForNetwork(item, commFunctions));
}
return returnArray;
// For arrays, we recurse over each item.
return objectPayload.map((item, index) =>
VirtualStream.encodePayloadForNetwork(
item,
commFunctions,
originalPayload || objectPayload,
path.concat(String(index)) // Convert index to string and concatenate to path
)
);
} else if (objectPayload !== null && typeof objectPayload === 'object') {
return Object.keys(objectPayload).reduce((acc, key) => {
acc[key] = VirtualStream.encodePayloadForNetwork(objectPayload[key], commFunctions);
// For objects, we recurse over each key-value pair.
return Object.entries(objectPayload).reduce((acc, [key, value]) => {
const newPath = path.concat(key); // Concatenate the new key to the path
acc[key] = VirtualStream.encodePayloadForNetwork(
value,
commFunctions,
originalPayload || objectPayload,
newPath
);
return acc;
}, {});
} else {
@ -44,6 +79,9 @@ export class VirtualStream {
}
public static decodePayloadFromNetwork(objectPayload: any, commFunctions: ICommFunctions): any {
if (plugins.smartbuffer.isBufferLike(objectPayload)) {
return objectPayload;
}
if (objectPayload !== null && typeof objectPayload === 'object') {
if (objectPayload._isVirtualStream) {
const virtualStream = new VirtualStream();
@ -55,7 +93,9 @@ export class VirtualStream {
if (!virtualStream.side && commFunctions.typedrouter) {
virtualStream.side = 'responding';
virtualStream.typedrouter = commFunctions.typedrouter;
commFunctions.typedrouter.registeredVirtualStreams.add(virtualStream);
}
virtualStream.startKeepAliveLoop();
return virtualStream;
} else if (Array.isArray(objectPayload)) {
const returnArray = [];
@ -78,23 +118,259 @@ export class VirtualStream {
public side: 'requesting' | 'responding';
public streamId: string = plugins.isounique.uni();
// integration with typedrequest mechanics
public sendMethod: ICommFunctions['sendMethod'];
public typedrouter: TypedRouter;
public sendMethod: (
sendPayloadArg: plugins.typedRequestInterfaces.ITypedRequest
) => Promise<plugins.typedRequestInterfaces.ITypedRequest>;
// wether to keep the stream alive
private keepAlive = true;
private lastKeepAliveEvent: number;
// backpressured arrays
private sendBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
private receiveBackpressuredArray =
new plugins.lik.BackpressuredArray<T>(
16
);
constructor() {}
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
return streamTrArg;
/**
* takes care of sending
*/
private async workOnQueue() {
if(this.side === 'requesting') {
let thisSideIsBackpressured = !this.receiveBackpressuredArray.checkSpaceAvailable();
let otherSideHasNext = false;
let otherSideIsBackpressured = false;
// helper functions
const getFeedback = async () => {
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
});
if (streamTr && streamTr.response) {
otherSideIsBackpressured = streamTr.response.backpressure
otherSideHasNext = streamTr.response.next;
}
}
await getFeedback();
// do work loop
while (this.sendBackpressuredArray.data.length > 0 || otherSideHasNext) {
let dataArg: typeof this.sendBackpressuredArray.data[0];
if (this.sendBackpressuredArray.data.length > 0) {
dataArg = this.sendBackpressuredArray.shift();
}
let streamTr: plugins.typedRequestInterfaces.IStreamRequest;
streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: dataArg ? 'chunk' : 'read',
backpressure: thisSideIsBackpressured,
next: this.sendBackpressuredArray.data.length > 0,
...dataArg ? { chunkData: dataArg } : {},
},
response: null,
}).catch(() => {
console.log('stream ended immaturely');
this.keepAlive = false;
return null;
});
if (streamTr && streamTr.response && streamTr.response.chunkData) {
this.receiveBackpressuredArray.push(streamTr.response.chunkData);
}
thisSideIsBackpressured = this.receiveBackpressuredArray.checkSpaceAvailable();
// lets care about looping
otherSideHasNext = streamTr && streamTr.response && streamTr.response.next;
}
}
}
/**
* This method handles the stream only on the responding side
* @param streamTrArg
* @returns
*/
public async handleStreamTr(streamTrArg: plugins.typedRequestInterfaces.IStreamRequest) {
if (streamTrArg.request.keepAlive === true && this.keepAlive === true) {
this.lastKeepAliveEvent = Date.now();
} else if (streamTrArg.request.keepAlive === false) {
this.keepAlive = false;
}
// keepAlive handling
if (streamTrArg.request.mainPurpose === 'keepAlive') {
// if the main purpose is keepAlive, we answer with a keepAlive
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'keepAlive',
keepAlive: this.keepAlive,
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
};
}
// feedback handling
if (streamTrArg.request.mainPurpose === 'feedback') {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
};
}
// chunk handling
if (streamTrArg.request.mainPurpose === 'chunk') {
this.receiveBackpressuredArray.push(streamTrArg.request.chunkData);
if (this.sendBackpressuredArray.data.length > 0 && streamTrArg.response.backpressure === false) {
const dataArg = this.sendBackpressuredArray.shift();
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'chunk',
next: this.sendBackpressuredArray.data.length > 1,
backpressure: this.receiveBackpressuredArray.checkSpaceAvailable(),
chunkData: this.sendBackpressuredArray.shift(),
};
} else {
streamTrArg.response = {
streamId: this.streamId,
cycleId: streamTrArg.request.cycleId,
cycle: 'response',
mainPurpose: 'feedback',
next: this.sendBackpressuredArray.data.length > 0,
};
}
streamTrArg.request = null;
}
return streamTrArg;
}
// lifecycle methods
/**
* closes the virtual stream
*/
close() {
public async cleanup() {
if (this.typedrouter) {
this.typedrouter.registeredVirtualStreams.remove(this);
}
}
/**
* a keepAlive loop that works across technologies
*/
private async startKeepAliveLoop() {
// initially wait for a second
await plugins.smartdelay.delayFor(0);
let counter = 0;
keepAliveLoop: while (this.keepAlive) {
const triggerResult = await this.triggerKeepAlive();
await plugins.smartdelay.delayFor(1000);
}
await plugins.smartdelay.delayFor(1000);
await this.cleanup();
console.log(`cleaned up for stream ${this.streamId}`);
}
private async triggerKeepAlive() {
if (this.side === 'requesting') {
console.log(`keepalive sent.`);
const streamTr = await this.sendMethod({
method: '##VirtualStream##',
request: {
streamId: this.streamId,
cycleId: plugins.isounique.uni(),
cycle: 'request',
mainPurpose: 'keepAlive',
keepAlive: true,
},
response: null,
}).catch(() => {
this.keepAlive = false;
});
// lets handle keepAlive
if (streamTr && streamTr.response && streamTr.response.keepAlive === false) {
this.keepAlive = false;
} else {
this.lastKeepAliveEvent = Date.now();
}
if (streamTr && streamTr.response && streamTr.response.next) {
this.workOnQueue();
}
}
if (Date.now() - this.lastKeepAliveEvent > 10000) {
console.log(`closing stream for ${this.streamId}`);
this.keepAlive = false;
}
}
// Data sending and receiving
public async sendData(dataArg: T): Promise<void> {
this.sendBackpressuredArray.push(dataArg);
this.workOnQueue();
await this.sendBackpressuredArray.waitForSpace();
}
public async fetchData(): Promise<T> {
if (this.receiveBackpressuredArray.hasSpace) {
// do something maybe?
}
await this.receiveBackpressuredArray.waitForItems();
const dataPackage = this.receiveBackpressuredArray.shift();
return dataPackage;
}
/**
* reads from a Readable and sends it to the other side
* @param readableStreamArg
*/
public async readFromWebstream(readableStreamArg: ReadableStream<T>) {
const reader = readableStreamArg.getReader();
let streamIsDone = false;
while(!streamIsDone) {
const { value, done } = await reader.read();
if(value) {
await this.sendData(value);
}
streamIsDone = done;
}
}
public async writeToWebstream(writableStreamArg: WritableStream<T>) {
const writer = writableStreamArg.getWriter();
while(this.keepAlive) {
await writer.write(await this.fetchData());
}
}
}