diff --git a/ts/interfaces/universemessage.interfaces.ts b/ts/interfaces/universemessage.interfaces.ts index 775b42d..a3d5b55 100644 --- a/ts/interfaces/universemessage.interfaces.ts +++ b/ts/interfaces/universemessage.interfaces.ts @@ -1,7 +1,6 @@ export interface IMessageCreator { messageText: string; payload?: string | number | any; - payloadStringType?: 'Buffer' | 'string' | 'object'; } /** diff --git a/ts/smartuniverse.classes.clientuniverse.ts b/ts/smartuniverse.classes.clientuniverse.ts index d60d9a9..92344ac 100644 --- a/ts/smartuniverse.classes.clientuniverse.ts +++ b/ts/smartuniverse.classes.clientuniverse.ts @@ -21,7 +21,7 @@ export interface IClientOptions { export class ClientUniverse { public options; public smartsocketClient: plugins.smartsocket.SmartsocketClient; - public observableIntake: plugins.smartrx.ObservableIntake; + public observableIntake: plugins.smartrx.ObservableIntake>; public clientUniverseCache = new ClientUniverseCache(); constructor(optionsArg: IClientOptions) { diff --git a/ts/smartuniverse.classes.clientuniversechannel.ts b/ts/smartuniverse.classes.clientuniversechannel.ts index 7add763..9ada8c5 100644 --- a/ts/smartuniverse.classes.clientuniversechannel.ts +++ b/ts/smartuniverse.classes.clientuniversechannel.ts @@ -38,7 +38,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel { public name: string; public passphrase: string; public status: 'subscribed' | 'unsubscribed' = 'unsubscribed'; - private subject = new plugins.smartrx.rxjs.Subject(); + private subject = new plugins.smartrx.rxjs.Subject>(); // refs public clientUniverseRef: ClientUniverse; @@ -53,7 +53,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel { * subscribes to a channel * tells the universe about this instances interest into a channel */ - public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage) => void) { + public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage) => void) { return this.subject.subscribe( messageArg => { @@ -76,7 +76,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel { } } - public async emitMessageLocally(messageArg: ClientUniverseMessage) { + public async emitMessageLocally(messageArg: ClientUniverseMessage) { this.subject.next(messageArg); } @@ -92,8 +92,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel { passphrase: this.passphrase, targetChannelName: this.name, messageText: messageArg.messageText, - payload: messageArg.payload, - payloadStringType: messageArg.payloadStringType + payload: messageArg.payload }; await this.clientUniverseRef.smartsocketClient.serverCall( 'processMessage', diff --git a/ts/smartuniverse.classes.clientuniversemessage.ts b/ts/smartuniverse.classes.clientuniversemessage.ts index e0b604d..77807de 100644 --- a/ts/smartuniverse.classes.clientuniversemessage.ts +++ b/ts/smartuniverse.classes.clientuniversemessage.ts @@ -2,7 +2,7 @@ import * as plugins from './smartuniverse.plugins'; import * as interfaces from './interfaces'; -export class ClientUniverseMessage implements interfaces.IUniverseMessage { +export class ClientUniverseMessage implements interfaces.IUniverseMessage { // ====== // STATIC // ====== @@ -22,8 +22,7 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage { public smartTimestamp: plugins.smarttime.TimeStamp; public messageText: string; public passphrase: string; - public payload: any; - public payloadStringType; + public payload: T; public targetChannelName: string; constructor(messageArg: interfaces.IUniverseMessage) { diff --git a/ts/smartuniverse.classes.reactionrequest.ts b/ts/smartuniverse.classes.reactionrequest.ts index 28487c4..e027985 100644 --- a/ts/smartuniverse.classes.reactionrequest.ts +++ b/ts/smartuniverse.classes.reactionrequest.ts @@ -1,4 +1,9 @@ import * as plugins from './smartuniverse.plugins'; +import { UniverseChannel } from './smartuniverse.classes.universechannel'; +import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel'; +import { ReactionResult } from './smartuniverse.classes.reactionresult'; +import { UniverseMessage } from './smartuniverse.classes.universemessage'; +import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage'; export interface IReactionRequestConstructorOptions { method: T['method']; @@ -10,7 +15,6 @@ export interface ICombinatorPayload { @@ -20,5 +24,27 @@ export class ReactionRequest, timeoutMillisArg=60000) { + const subscriptionMap = new plugins.lik.Objectmap(); + const reactionResult = new ReactionResult(); + const requestId = plugins.smartunique.shortId(); + for (const channel of channelsArg) { + subscriptionMap.add(channel.subscribe((messageArg: UniverseMessage> | ClientUniverseMessage>) => { + if (messageArg.messageText === 'reactionResponse' && messageArg.payload.typedRequestPayload.method === this.method) { + const payload: ICombinatorPayload = messageArg.payload; + if (payload.id !== requestId) { + return; + } + reactionResult.pushReactionResponse(payload.typedRequestPayload.response); + } + })); + } + plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => { + await subscriptionMap.forEach(subscriptionArg => { + subscriptionArg.unsubscribe(); + }); + reactionResult.complete(); + }); + return reactionResult; + } } diff --git a/ts/smartuniverse.classes.reactionresponse.ts b/ts/smartuniverse.classes.reactionresponse.ts index d82e985..9343d59 100644 --- a/ts/smartuniverse.classes.reactionresponse.ts +++ b/ts/smartuniverse.classes.reactionresponse.ts @@ -25,7 +25,7 @@ export class ReactionResponse> | ClientUniverseMessage>) { } } diff --git a/ts/smartuniverse.classes.reactionresult.ts b/ts/smartuniverse.classes.reactionresult.ts new file mode 100644 index 0000000..14b96e4 --- /dev/null +++ b/ts/smartuniverse.classes.reactionresult.ts @@ -0,0 +1,37 @@ +import * as plugins from './smartuniverse.plugins'; +import { ReactionResponse } from './smartuniverse.classes.reactionresponse'; + +export class ReactionResult { + private resultSubject = new plugins.smartrx.rxjs.Subject(); + private endResult: Array = []; + private completeDeferred = plugins.smartpromise.defer>(); + + constructor () { + this.resultSubscribe(responseArg => { + this.endResult.push(responseArg); + }); + } + + public resultSubscribe(observerArg: (responseArg: T['response']) => void) { + return this.resultSubject.subscribe(observerArg); + } + + public async getEndResult() { + const result = await this.completeDeferred.promise; + return result; + } + + /** + * push a reactionResponse + */ + public async pushReactionResponse(responseArg: T['response']) { + this.resultSubject.next(responseArg); + } + + /** + * completes the ReactionResult + */ + public async complete() { + this.completeDeferred.resolve(this.endResult); + } +} diff --git a/ts/smartuniverse.classes.universecache.ts b/ts/smartuniverse.classes.universecache.ts index bad72d9..e9efc71 100644 --- a/ts/smartuniverse.classes.universecache.ts +++ b/ts/smartuniverse.classes.universecache.ts @@ -24,7 +24,7 @@ export class UniverseCache { /** * stores messages for this instance */ - public messageMap = new Objectmap(); + public messageMap = new Objectmap>(); /** * stores the channels that are available within the universe @@ -54,7 +54,7 @@ export class UniverseCache { * @param messageArg * @param attachedPayloadArg */ - public async addMessage(messageArg: UniverseMessage) { + public async addMessage(messageArg: UniverseMessage) { messageArg.setUniverseCache(this); UniverseChannel.authorizeAMessageForAChannel(this, messageArg); this.messageMap.add(messageArg); @@ -69,7 +69,7 @@ export class UniverseCache { public readMessagesYoungerThan( unixTimeArg?: number, channelName?: string - ): Observable { + ): Observable> { const messageObservable = from(this.messageMap.getArray()).pipe( filter(messageArg => { return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime); diff --git a/ts/smartuniverse.classes.universechannel.ts b/ts/smartuniverse.classes.universechannel.ts index 6e43caf..08311b1 100644 --- a/ts/smartuniverse.classes.universechannel.ts +++ b/ts/smartuniverse.classes.universechannel.ts @@ -52,7 +52,7 @@ export class UniverseChannel { */ public static authorizeAMessageForAChannel( universeCacheArg: UniverseCache, - universeMessageArg: UniverseMessage + universeMessageArg: UniverseMessage ): UniverseChannel { const foundChannel = universeCacheArg.channelMap.find(universeChannel => { const result = universeChannel.authenticate(universeMessageArg); @@ -85,7 +85,7 @@ export class UniverseChannel { */ public name: string; public universeRef: Universe; - private subject = new plugins.smartrx.rxjs.Subject(); + private subject = new plugins.smartrx.rxjs.Subject>(); /** * the passphrase for the channel @@ -103,7 +103,7 @@ export class UniverseChannel { * # the messages channelName against the unverseChannel's name * # the messages password against the universeChannel's password */ - public authenticate(universeMessageArg: UniverseMessage): boolean { + public authenticate(universeMessageArg: UniverseMessage): boolean { return ( this.name === universeMessageArg.targetChannelName && this.passphrase === universeMessageArg.passphrase @@ -114,7 +114,7 @@ export class UniverseChannel { * pushes a message to clients * @param messageArg */ - public async push(messageArg: UniverseMessage) { + public async push(messageArg: UniverseMessage) { this.subject.next(messageArg); const universeConnectionsWithChannelAccess: UniverseConnection[] = []; await this.universeRef.universeCache.connectionMap.forEach(async socketConnection => { @@ -131,8 +131,7 @@ export class UniverseChannel { passphrase: messageArg.passphrase, targetChannelName: this.name, messageText: messageArg.messageText, - payload: messageArg.payload, - payloadStringType: messageArg.payloadStringType + payload: messageArg.payload }; smartsocket.clientCall( 'processMessage', @@ -143,7 +142,7 @@ export class UniverseChannel { } // functions to interact with a channel locally - public subscribe(observingFunctionArg: (messageArg: UniverseMessage) => void) { + public subscribe(observingFunctionArg: (messageArg: UniverseMessage) => void) { return this.subject.subscribe( messageArg => { observingFunctionArg(messageArg); @@ -160,7 +159,6 @@ export class UniverseChannel { id: plugins.smartunique.shortId(), messageText: messageDescriptor.messageText, payload: messageDescriptor.payload, - payloadStringType: messageDescriptor.payloadStringType, targetChannelName: this.name, passphrase: this.passphrase, timestamp: Date.now() diff --git a/ts/smartuniverse.classes.universemessage.ts b/ts/smartuniverse.classes.universemessage.ts index 475cb67..707a75c 100644 --- a/ts/smartuniverse.classes.universemessage.ts +++ b/ts/smartuniverse.classes.universemessage.ts @@ -13,7 +13,7 @@ import { SocketConnection } from '@pushrocks/smartsocket'; * represents a message within a universe * acts as a container to save message states like authentication status */ -export class UniverseMessage implements interfaces.IUniverseMessage { +export class UniverseMessage implements interfaces.IUniverseMessage { public static createMessageFromPayload( socketConnectionArg: SocketConnection, dataArg: interfaces.IUniverseMessage @@ -28,8 +28,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage { public smartTimestamp: TimeStamp; public messageText: string; public passphrase: string; - public payload: any; - public payloadStringType; + public payload: T; public targetChannelName: string; public socketConnection: SocketConnection;