Compare commits

...

6 Commits

Author SHA1 Message Date
bd849d347d 1.0.84 2019-09-17 12:46:35 +02:00
f2a85d4719 fix(core): update 2019-09-17 12:46:35 +02:00
4e7c28ac83 1.0.83 2019-09-11 14:57:36 +02:00
243f1a70e9 fix(core): update 2019-09-11 14:57:36 +02:00
b5a6517756 1.0.82 2019-09-11 10:11:34 +02:00
e12b128619 fix(core): update 2019-09-11 10:11:34 +02:00
5 changed files with 68 additions and 9 deletions

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.81", "version": "1.0.84",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.81", "version": "1.0.84",
"private": false, "private": false,
"description": "messaging service for your micro services", "description": "messaging service for your micro services",
"main": "dist/index.js", "main": "dist/index.js",

View File

@ -14,7 +14,11 @@ export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITy
* needed for tying responses to requests * needed for tying responses to requests
*/ */
id: string; id: string;
typedRequestPayload: T; typedRequestPayload: {
method: T['method'],
request : T['request'],
response: T['response']
};
} }
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> { export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
@ -24,7 +28,7 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
this.method = optionsArg.method; this.method = optionsArg.method;
} }
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, timeoutMillisArg=60000) { public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=60000) {
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>(); const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
const reactionResult = new ReactionResult<T>(); const reactionResult = new ReactionResult<T>();
const requestId = plugins.smartunique.shortId(); const requestId = plugins.smartunique.shortId();
@ -38,6 +42,18 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
reactionResult.pushReactionResponse(payload.typedRequestPayload.response); reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
} }
})); }));
const payload: ICombinatorPayload<T> = {
id: requestId,
typedRequestPayload: {
method: this.method,
request: requestDataArg,
response: null
}
};
channel.sendMessage({
messageText: 'reactionRequest',
payload
});
} }
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => { plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
await subscriptionMap.forEach(subscriptionArg => { await subscriptionMap.forEach(subscriptionArg => {

View File

@ -6,26 +6,54 @@ import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversecha
import { UniverseMessage } from './smartuniverse.classes.universemessage'; import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage'; import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
export interface IReactionResponseConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> { export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (dataArg: T['request']) => Promise<T['response']>;
export interface IReactionResponseConstructorOptions<
T extends plugins.typedrequestInterfaces.ITypedRequest
> {
method: T['method']; method: T['method'];
channels: Array<UniverseChannel | ClientUniverseChannel>; channels: Array<UniverseChannel | ClientUniverseChannel>;
funcDef: TReactionResponseFuncDef<T>;
} }
export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> { export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> {
public method: T['method']; public method: T['method'];
public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>(); public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>();
public funcDef: TReactionResponseFuncDef<T>;
constructor(optionsArg: IReactionResponseConstructorOptions<T>) { constructor(optionsArg: IReactionResponseConstructorOptions<T>) {
this.method = optionsArg.method;
this.channels.addArray(optionsArg.channels); this.channels.addArray(optionsArg.channels);
this.funcDef = optionsArg.funcDef;
for (const channel of this.channels.getArray()) { for (const channel of this.channels.getArray()) {
channel.subscribe(messageArg => { channel.subscribe(messageArg => {
this.processMessageForReaction(messageArg); this.processMessageForReaction(channel, messageArg);
}); });
} }
} }
private processMessageForReaction(messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) { private async processMessageForReaction(
channelArg: UniverseChannel | ClientUniverseChannel,
messageArg:
| UniverseMessage<ICombinatorPayload<T>>
| ClientUniverseMessage<ICombinatorPayload<T>>
) {
if (
messageArg.messageText === 'reactionRequest' &&
messageArg.payload.typedRequestPayload.method === this.method
) {
const response: T['response'] = await this.funcDef(messageArg.payload.typedRequestPayload.request);
const payload: ICombinatorPayload<T> = {
...messageArg.payload,
typedRequestPayload: {
...messageArg.payload.typedRequestPayload,
response
}
};
channelArg.sendMessage({
messageText: 'reactionResponse',
payload
});
}
} }
} }

View File

@ -16,11 +16,26 @@ export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedReque
return this.resultSubject.subscribe(observerArg); return this.resultSubject.subscribe(observerArg);
} }
/**
* gets the end result as an array of all results
*/
public async getEndResult() { public async getEndResult() {
const result = await this.completeDeferred.promise; const result = await this.completeDeferred.promise;
return result; return result;
} }
/**
* if there is a single respondant, or you are only interested in the first result
*/
public async getFirstResult() {
const done = plugins.smartpromise.defer<T['response']>();
const subscription = this.resultSubject.subscribe(result => {
done.resolve(result);
subscription.unsubscribe();
});
return await done.promise;
}
/** /**
* push a reactionResponse * push a reactionResponse
*/ */