fix(core): update
This commit is contained in:
parent
c2052f16a8
commit
1d13bf5bcc
@ -1,7 +1,6 @@
|
|||||||
export interface IMessageCreator {
|
export interface IMessageCreator {
|
||||||
messageText: string;
|
messageText: string;
|
||||||
payload?: string | number | any;
|
payload?: string | number | any;
|
||||||
payloadStringType?: 'Buffer' | 'string' | 'object';
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -21,7 +21,7 @@ export interface IClientOptions {
|
|||||||
export class ClientUniverse {
|
export class ClientUniverse {
|
||||||
public options;
|
public options;
|
||||||
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
|
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
|
||||||
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage>;
|
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage<any>>;
|
||||||
public clientUniverseCache = new ClientUniverseCache();
|
public clientUniverseCache = new ClientUniverseCache();
|
||||||
|
|
||||||
constructor(optionsArg: IClientOptions) {
|
constructor(optionsArg: IClientOptions) {
|
||||||
|
@ -38,7 +38,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
|||||||
public name: string;
|
public name: string;
|
||||||
public passphrase: string;
|
public passphrase: string;
|
||||||
public status: 'subscribed' | 'unsubscribed' = 'unsubscribed';
|
public status: 'subscribed' | 'unsubscribed' = 'unsubscribed';
|
||||||
private subject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage>();
|
private subject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
|
||||||
|
|
||||||
// refs
|
// refs
|
||||||
public clientUniverseRef: ClientUniverse;
|
public clientUniverseRef: ClientUniverse;
|
||||||
@ -53,7 +53,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
|||||||
* subscribes to a channel
|
* subscribes to a channel
|
||||||
* tells the universe about this instances interest into a channel
|
* tells the universe about this instances interest into a channel
|
||||||
*/
|
*/
|
||||||
public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage) => void) {
|
public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage<any>) => void) {
|
||||||
|
|
||||||
return this.subject.subscribe(
|
return this.subject.subscribe(
|
||||||
messageArg => {
|
messageArg => {
|
||||||
@ -76,7 +76,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async emitMessageLocally(messageArg: ClientUniverseMessage) {
|
public async emitMessageLocally(messageArg: ClientUniverseMessage<any>) {
|
||||||
this.subject.next(messageArg);
|
this.subject.next(messageArg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,8 +92,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
|||||||
passphrase: this.passphrase,
|
passphrase: this.passphrase,
|
||||||
targetChannelName: this.name,
|
targetChannelName: this.name,
|
||||||
messageText: messageArg.messageText,
|
messageText: messageArg.messageText,
|
||||||
payload: messageArg.payload,
|
payload: messageArg.payload
|
||||||
payloadStringType: messageArg.payloadStringType
|
|
||||||
};
|
};
|
||||||
await this.clientUniverseRef.smartsocketClient.serverCall(
|
await this.clientUniverseRef.smartsocketClient.serverCall(
|
||||||
'processMessage',
|
'processMessage',
|
||||||
|
@ -2,7 +2,7 @@ import * as plugins from './smartuniverse.plugins';
|
|||||||
|
|
||||||
import * as interfaces from './interfaces';
|
import * as interfaces from './interfaces';
|
||||||
|
|
||||||
export class ClientUniverseMessage implements interfaces.IUniverseMessage {
|
export class ClientUniverseMessage<T> implements interfaces.IUniverseMessage {
|
||||||
// ======
|
// ======
|
||||||
// STATIC
|
// STATIC
|
||||||
// ======
|
// ======
|
||||||
@ -22,8 +22,7 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
|
|||||||
public smartTimestamp: plugins.smarttime.TimeStamp;
|
public smartTimestamp: plugins.smarttime.TimeStamp;
|
||||||
public messageText: string;
|
public messageText: string;
|
||||||
public passphrase: string;
|
public passphrase: string;
|
||||||
public payload: any;
|
public payload: T;
|
||||||
public payloadStringType;
|
|
||||||
public targetChannelName: string;
|
public targetChannelName: string;
|
||||||
|
|
||||||
constructor(messageArg: interfaces.IUniverseMessage) {
|
constructor(messageArg: interfaces.IUniverseMessage) {
|
||||||
|
@ -1,4 +1,9 @@
|
|||||||
import * as plugins from './smartuniverse.plugins';
|
import * as plugins from './smartuniverse.plugins';
|
||||||
|
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||||
|
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
|
||||||
|
import { ReactionResult } from './smartuniverse.classes.reactionresult';
|
||||||
|
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||||
|
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
|
||||||
|
|
||||||
export interface IReactionRequestConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
export interface IReactionRequestConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||||
method: T['method'];
|
method: T['method'];
|
||||||
@ -10,7 +15,6 @@ export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITy
|
|||||||
*/
|
*/
|
||||||
id: string;
|
id: string;
|
||||||
typedRequestPayload: T;
|
typedRequestPayload: T;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||||
@ -20,5 +24,27 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
|
|||||||
this.method = optionsArg.method;
|
this.method = optionsArg.method;
|
||||||
}
|
}
|
||||||
|
|
||||||
public fireRequest(channelArg) {}
|
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, timeoutMillisArg=60000) {
|
||||||
|
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
|
||||||
|
const reactionResult = new ReactionResult<T>();
|
||||||
|
const requestId = plugins.smartunique.shortId();
|
||||||
|
for (const channel of channelsArg) {
|
||||||
|
subscriptionMap.add(channel.subscribe((messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) => {
|
||||||
|
if (messageArg.messageText === 'reactionResponse' && messageArg.payload.typedRequestPayload.method === this.method) {
|
||||||
|
const payload: ICombinatorPayload<T> = messageArg.payload;
|
||||||
|
if (payload.id !== requestId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
|
||||||
|
await subscriptionMap.forEach(subscriptionArg => {
|
||||||
|
subscriptionArg.unsubscribe();
|
||||||
|
});
|
||||||
|
reactionResult.complete();
|
||||||
|
});
|
||||||
|
return reactionResult;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedReq
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private processMessageForReaction(messageArg: UniverseMessage | ClientUniverseMessage) {
|
private processMessageForReaction(messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
37
ts/smartuniverse.classes.reactionresult.ts
Normal file
37
ts/smartuniverse.classes.reactionresult.ts
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
import * as plugins from './smartuniverse.plugins';
|
||||||
|
import { ReactionResponse } from './smartuniverse.classes.reactionresponse';
|
||||||
|
|
||||||
|
export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||||
|
private resultSubject = new plugins.smartrx.rxjs.Subject<T['response']>();
|
||||||
|
private endResult: Array<T['response']> = [];
|
||||||
|
private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>();
|
||||||
|
|
||||||
|
constructor () {
|
||||||
|
this.resultSubscribe(responseArg => {
|
||||||
|
this.endResult.push(responseArg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public resultSubscribe(observerArg: (responseArg: T['response']) => void) {
|
||||||
|
return this.resultSubject.subscribe(observerArg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getEndResult() {
|
||||||
|
const result = await this.completeDeferred.promise;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* push a reactionResponse
|
||||||
|
*/
|
||||||
|
public async pushReactionResponse(responseArg: T['response']) {
|
||||||
|
this.resultSubject.next(responseArg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* completes the ReactionResult
|
||||||
|
*/
|
||||||
|
public async complete() {
|
||||||
|
this.completeDeferred.resolve(this.endResult);
|
||||||
|
}
|
||||||
|
}
|
@ -24,7 +24,7 @@ export class UniverseCache {
|
|||||||
/**
|
/**
|
||||||
* stores messages for this instance
|
* stores messages for this instance
|
||||||
*/
|
*/
|
||||||
public messageMap = new Objectmap<UniverseMessage>();
|
public messageMap = new Objectmap<UniverseMessage<any>>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* stores the channels that are available within the universe
|
* stores the channels that are available within the universe
|
||||||
@ -54,7 +54,7 @@ export class UniverseCache {
|
|||||||
* @param messageArg
|
* @param messageArg
|
||||||
* @param attachedPayloadArg
|
* @param attachedPayloadArg
|
||||||
*/
|
*/
|
||||||
public async addMessage(messageArg: UniverseMessage) {
|
public async addMessage(messageArg: UniverseMessage<any>) {
|
||||||
messageArg.setUniverseCache(this);
|
messageArg.setUniverseCache(this);
|
||||||
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
|
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
|
||||||
this.messageMap.add(messageArg);
|
this.messageMap.add(messageArg);
|
||||||
@ -69,7 +69,7 @@ export class UniverseCache {
|
|||||||
public readMessagesYoungerThan(
|
public readMessagesYoungerThan(
|
||||||
unixTimeArg?: number,
|
unixTimeArg?: number,
|
||||||
channelName?: string
|
channelName?: string
|
||||||
): Observable<UniverseMessage> {
|
): Observable<UniverseMessage<any>> {
|
||||||
const messageObservable = from(this.messageMap.getArray()).pipe(
|
const messageObservable = from(this.messageMap.getArray()).pipe(
|
||||||
filter(messageArg => {
|
filter(messageArg => {
|
||||||
return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime);
|
return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime);
|
||||||
|
@ -52,7 +52,7 @@ export class UniverseChannel {
|
|||||||
*/
|
*/
|
||||||
public static authorizeAMessageForAChannel(
|
public static authorizeAMessageForAChannel(
|
||||||
universeCacheArg: UniverseCache,
|
universeCacheArg: UniverseCache,
|
||||||
universeMessageArg: UniverseMessage
|
universeMessageArg: UniverseMessage<any>
|
||||||
): UniverseChannel {
|
): UniverseChannel {
|
||||||
const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
|
const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
|
||||||
const result = universeChannel.authenticate(universeMessageArg);
|
const result = universeChannel.authenticate(universeMessageArg);
|
||||||
@ -85,7 +85,7 @@ export class UniverseChannel {
|
|||||||
*/
|
*/
|
||||||
public name: string;
|
public name: string;
|
||||||
public universeRef: Universe;
|
public universeRef: Universe;
|
||||||
private subject = new plugins.smartrx.rxjs.Subject<UniverseMessage>();
|
private subject = new plugins.smartrx.rxjs.Subject<UniverseMessage<any>>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the passphrase for the channel
|
* the passphrase for the channel
|
||||||
@ -103,7 +103,7 @@ export class UniverseChannel {
|
|||||||
* # the messages channelName against the unverseChannel's name
|
* # the messages channelName against the unverseChannel's name
|
||||||
* # the messages password against the universeChannel's password
|
* # the messages password against the universeChannel's password
|
||||||
*/
|
*/
|
||||||
public authenticate(universeMessageArg: UniverseMessage): boolean {
|
public authenticate(universeMessageArg: UniverseMessage<any>): boolean {
|
||||||
return (
|
return (
|
||||||
this.name === universeMessageArg.targetChannelName &&
|
this.name === universeMessageArg.targetChannelName &&
|
||||||
this.passphrase === universeMessageArg.passphrase
|
this.passphrase === universeMessageArg.passphrase
|
||||||
@ -114,7 +114,7 @@ export class UniverseChannel {
|
|||||||
* pushes a message to clients
|
* pushes a message to clients
|
||||||
* @param messageArg
|
* @param messageArg
|
||||||
*/
|
*/
|
||||||
public async push(messageArg: UniverseMessage) {
|
public async push(messageArg: UniverseMessage<any>) {
|
||||||
this.subject.next(messageArg);
|
this.subject.next(messageArg);
|
||||||
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
|
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
|
||||||
await this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
|
await this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
|
||||||
@ -131,8 +131,7 @@ export class UniverseChannel {
|
|||||||
passphrase: messageArg.passphrase,
|
passphrase: messageArg.passphrase,
|
||||||
targetChannelName: this.name,
|
targetChannelName: this.name,
|
||||||
messageText: messageArg.messageText,
|
messageText: messageArg.messageText,
|
||||||
payload: messageArg.payload,
|
payload: messageArg.payload
|
||||||
payloadStringType: messageArg.payloadStringType
|
|
||||||
};
|
};
|
||||||
smartsocket.clientCall(
|
smartsocket.clientCall(
|
||||||
'processMessage',
|
'processMessage',
|
||||||
@ -143,7 +142,7 @@ export class UniverseChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// functions to interact with a channel locally
|
// functions to interact with a channel locally
|
||||||
public subscribe(observingFunctionArg: (messageArg: UniverseMessage) => void) {
|
public subscribe(observingFunctionArg: (messageArg: UniverseMessage<any>) => void) {
|
||||||
return this.subject.subscribe(
|
return this.subject.subscribe(
|
||||||
messageArg => {
|
messageArg => {
|
||||||
observingFunctionArg(messageArg);
|
observingFunctionArg(messageArg);
|
||||||
@ -160,7 +159,6 @@ export class UniverseChannel {
|
|||||||
id: plugins.smartunique.shortId(),
|
id: plugins.smartunique.shortId(),
|
||||||
messageText: messageDescriptor.messageText,
|
messageText: messageDescriptor.messageText,
|
||||||
payload: messageDescriptor.payload,
|
payload: messageDescriptor.payload,
|
||||||
payloadStringType: messageDescriptor.payloadStringType,
|
|
||||||
targetChannelName: this.name,
|
targetChannelName: this.name,
|
||||||
passphrase: this.passphrase,
|
passphrase: this.passphrase,
|
||||||
timestamp: Date.now()
|
timestamp: Date.now()
|
||||||
|
@ -13,7 +13,7 @@ import { SocketConnection } from '@pushrocks/smartsocket';
|
|||||||
* represents a message within a universe
|
* represents a message within a universe
|
||||||
* acts as a container to save message states like authentication status
|
* acts as a container to save message states like authentication status
|
||||||
*/
|
*/
|
||||||
export class UniverseMessage implements interfaces.IUniverseMessage {
|
export class UniverseMessage<T> implements interfaces.IUniverseMessage {
|
||||||
public static createMessageFromPayload(
|
public static createMessageFromPayload(
|
||||||
socketConnectionArg: SocketConnection,
|
socketConnectionArg: SocketConnection,
|
||||||
dataArg: interfaces.IUniverseMessage
|
dataArg: interfaces.IUniverseMessage
|
||||||
@ -28,8 +28,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
|||||||
public smartTimestamp: TimeStamp;
|
public smartTimestamp: TimeStamp;
|
||||||
public messageText: string;
|
public messageText: string;
|
||||||
public passphrase: string;
|
public passphrase: string;
|
||||||
public payload: any;
|
public payload: T;
|
||||||
public payloadStringType;
|
|
||||||
public targetChannelName: string;
|
public targetChannelName: string;
|
||||||
public socketConnection: SocketConnection;
|
public socketConnection: SocketConnection;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user