Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
bd849d347d | |||
f2a85d4719 | |||
4e7c28ac83 | |||
243f1a70e9 | |||
b5a6517756 | |||
e12b128619 |
2
package-lock.json
generated
2
package-lock.json
generated
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@pushrocks/smartuniverse",
|
"name": "@pushrocks/smartuniverse",
|
||||||
"version": "1.0.81",
|
"version": "1.0.84",
|
||||||
"lockfileVersion": 1,
|
"lockfileVersion": 1,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@pushrocks/smartuniverse",
|
"name": "@pushrocks/smartuniverse",
|
||||||
"version": "1.0.81",
|
"version": "1.0.84",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "messaging service for your micro services",
|
"description": "messaging service for your micro services",
|
||||||
"main": "dist/index.js",
|
"main": "dist/index.js",
|
||||||
|
@ -14,7 +14,11 @@ export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITy
|
|||||||
* needed for tying responses to requests
|
* needed for tying responses to requests
|
||||||
*/
|
*/
|
||||||
id: string;
|
id: string;
|
||||||
typedRequestPayload: T;
|
typedRequestPayload: {
|
||||||
|
method: T['method'],
|
||||||
|
request : T['request'],
|
||||||
|
response: T['response']
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||||
@ -24,7 +28,7 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
|
|||||||
this.method = optionsArg.method;
|
this.method = optionsArg.method;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, timeoutMillisArg=60000) {
|
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=60000) {
|
||||||
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
|
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
|
||||||
const reactionResult = new ReactionResult<T>();
|
const reactionResult = new ReactionResult<T>();
|
||||||
const requestId = plugins.smartunique.shortId();
|
const requestId = plugins.smartunique.shortId();
|
||||||
@ -38,6 +42,18 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
|
|||||||
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
|
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
const payload: ICombinatorPayload<T> = {
|
||||||
|
id: requestId,
|
||||||
|
typedRequestPayload: {
|
||||||
|
method: this.method,
|
||||||
|
request: requestDataArg,
|
||||||
|
response: null
|
||||||
|
}
|
||||||
|
};
|
||||||
|
channel.sendMessage({
|
||||||
|
messageText: 'reactionRequest',
|
||||||
|
payload
|
||||||
|
});
|
||||||
}
|
}
|
||||||
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
|
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
|
||||||
await subscriptionMap.forEach(subscriptionArg => {
|
await subscriptionMap.forEach(subscriptionArg => {
|
||||||
|
@ -6,26 +6,54 @@ import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversecha
|
|||||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||||
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
|
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
|
||||||
|
|
||||||
export interface IReactionResponseConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (dataArg: T['request']) => Promise<T['response']>;
|
||||||
|
|
||||||
|
export interface IReactionResponseConstructorOptions<
|
||||||
|
T extends plugins.typedrequestInterfaces.ITypedRequest
|
||||||
|
> {
|
||||||
method: T['method'];
|
method: T['method'];
|
||||||
channels: Array<UniverseChannel | ClientUniverseChannel>;
|
channels: Array<UniverseChannel | ClientUniverseChannel>;
|
||||||
|
funcDef: TReactionResponseFuncDef<T>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||||
public method: T['method'];
|
public method: T['method'];
|
||||||
public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>();
|
public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>();
|
||||||
|
public funcDef: TReactionResponseFuncDef<T>;
|
||||||
|
|
||||||
constructor(optionsArg: IReactionResponseConstructorOptions<T>) {
|
constructor(optionsArg: IReactionResponseConstructorOptions<T>) {
|
||||||
|
this.method = optionsArg.method;
|
||||||
this.channels.addArray(optionsArg.channels);
|
this.channels.addArray(optionsArg.channels);
|
||||||
|
this.funcDef = optionsArg.funcDef;
|
||||||
for (const channel of this.channels.getArray()) {
|
for (const channel of this.channels.getArray()) {
|
||||||
channel.subscribe(messageArg => {
|
channel.subscribe(messageArg => {
|
||||||
this.processMessageForReaction(messageArg);
|
this.processMessageForReaction(channel, messageArg);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private processMessageForReaction(messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) {
|
private async processMessageForReaction(
|
||||||
|
channelArg: UniverseChannel | ClientUniverseChannel,
|
||||||
|
messageArg:
|
||||||
|
| UniverseMessage<ICombinatorPayload<T>>
|
||||||
|
| ClientUniverseMessage<ICombinatorPayload<T>>
|
||||||
|
) {
|
||||||
|
if (
|
||||||
|
messageArg.messageText === 'reactionRequest' &&
|
||||||
|
messageArg.payload.typedRequestPayload.method === this.method
|
||||||
|
) {
|
||||||
|
const response: T['response'] = await this.funcDef(messageArg.payload.typedRequestPayload.request);
|
||||||
|
const payload: ICombinatorPayload<T> = {
|
||||||
|
...messageArg.payload,
|
||||||
|
typedRequestPayload: {
|
||||||
|
...messageArg.payload.typedRequestPayload,
|
||||||
|
response
|
||||||
|
}
|
||||||
|
};
|
||||||
|
channelArg.sendMessage({
|
||||||
|
messageText: 'reactionResponse',
|
||||||
|
payload
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,11 +16,26 @@ export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedReque
|
|||||||
return this.resultSubject.subscribe(observerArg);
|
return this.resultSubject.subscribe(observerArg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* gets the end result as an array of all results
|
||||||
|
*/
|
||||||
public async getEndResult() {
|
public async getEndResult() {
|
||||||
const result = await this.completeDeferred.promise;
|
const result = await this.completeDeferred.promise;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if there is a single respondant, or you are only interested in the first result
|
||||||
|
*/
|
||||||
|
public async getFirstResult() {
|
||||||
|
const done = plugins.smartpromise.defer<T['response']>();
|
||||||
|
const subscription = this.resultSubject.subscribe(result => {
|
||||||
|
done.resolve(result);
|
||||||
|
subscription.unsubscribe();
|
||||||
|
});
|
||||||
|
return await done.promise;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* push a reactionResponse
|
* push a reactionResponse
|
||||||
*/
|
*/
|
||||||
|
Reference in New Issue
Block a user