Compare commits
41 Commits
Author | SHA1 | Date | |
---|---|---|---|
908d00981b | |||
669ef262d7 | |||
30053fe441 | |||
afb4e3339a | |||
e413a8116d | |||
ffeed0565c | |||
736240b978 | |||
73f4600c2a | |||
40beec1166 | |||
f8690fef50 | |||
972ddbf327 | |||
80aacd17a6 | |||
e67b3e50cc | |||
a4a8959b74 | |||
bab0f062f7 | |||
3bdfe4dcb4 | |||
fca960ad0d | |||
e43ed3951c | |||
23df304535 | |||
9a142175aa | |||
09b593e192 | |||
c27fc147b5 | |||
ddde21925a | |||
bd849d347d | |||
f2a85d4719 | |||
4e7c28ac83 | |||
243f1a70e9 | |||
b5a6517756 | |||
e12b128619 | |||
03fbab5265 | |||
1d13bf5bcc | |||
c2052f16a8 | |||
ff7cdc908c | |||
f3d41b8719 | |||
f9f0fc45e2 | |||
da6b7724b8 | |||
be7ca29e4b | |||
f401d78c4b | |||
6ceec0201f | |||
16ce4e09a9 | |||
2868ab686d |
@ -4,13 +4,13 @@ image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||
cache:
|
||||
paths:
|
||||
- .npmci_cache/
|
||||
key: "$CI_BUILD_STAGE"
|
||||
key: '$CI_BUILD_STAGE'
|
||||
|
||||
stages:
|
||||
- security
|
||||
- test
|
||||
- release
|
||||
- metadata
|
||||
- security
|
||||
- test
|
||||
- release
|
||||
- metadata
|
||||
|
||||
# ====================
|
||||
# security stage
|
||||
@ -20,6 +20,7 @@ mirror:
|
||||
script:
|
||||
- npmci git mirror
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
@ -31,6 +32,7 @@ snyk:
|
||||
- npmci command npm install --ignore-scripts
|
||||
- npmci command snyk test
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
@ -47,6 +49,7 @@ testStable:
|
||||
- npmci npm test
|
||||
coverage: /\d+.?\d+?\%\s*coverage/
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- priv
|
||||
|
||||
@ -54,22 +57,24 @@ testBuild:
|
||||
stage: test
|
||||
script:
|
||||
- npmci npm prepare
|
||||
- npmci node install lts
|
||||
- npmci node install stable
|
||||
- npmci npm install
|
||||
- npmci command npm run build
|
||||
coverage: /\d+.?\d+?\%\s*coverage/
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
release:
|
||||
stage: release
|
||||
script:
|
||||
- npmci node install lts
|
||||
- npmci node install stable
|
||||
- npmci npm publish
|
||||
only:
|
||||
- tags
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
@ -81,9 +86,11 @@ codequality:
|
||||
allow_failure: true
|
||||
script:
|
||||
- npmci command npm install -g tslint typescript
|
||||
- npmci npm prepare
|
||||
- npmci npm install
|
||||
- npmci command "tslint -c tslint.json ./ts/**/*.ts"
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- priv
|
||||
|
||||
@ -94,20 +101,20 @@ trigger:
|
||||
only:
|
||||
- tags
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
pages:
|
||||
image: hosttoday/ht-docker-dbase:npmci
|
||||
services:
|
||||
- docker:stable-dind
|
||||
stage: metadata
|
||||
script:
|
||||
- npmci node install lts
|
||||
- npmci command npm install -g @gitzone/tsdoc
|
||||
- npmci npm prepare
|
||||
- npmci npm install
|
||||
- npmci command tsdoc
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
only:
|
||||
|
722
package-lock.json
generated
722
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
35
package.json
35
package.json
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@pushrocks/smartuniverse",
|
||||
"version": "1.0.75",
|
||||
"version": "1.0.96",
|
||||
"private": false,
|
||||
"description": "messaging service for your micro services",
|
||||
"main": "dist/index.js",
|
||||
@ -15,36 +15,37 @@
|
||||
},
|
||||
"devDependencies": {
|
||||
"@gitzone/tsbuild": "^2.1.17",
|
||||
"@gitzone/tstest": "^1.0.24",
|
||||
"@gitzone/tstest": "^1.0.28",
|
||||
"@pushrocks/tapbundle": "^3.0.13",
|
||||
"@types/node": "^12.7.4",
|
||||
"tslint": "^5.20.0",
|
||||
"@types/node": "^12.12.7",
|
||||
"tslint": "^5.20.1",
|
||||
"tslint-config-prettier": "^1.18.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"rxjs": "*"
|
||||
},
|
||||
"dependencies": {
|
||||
"@apiglobal/typedrequest-interfaces": "^1.0.7",
|
||||
"@pushrocks/lik": "^3.0.11",
|
||||
"@pushrocks/smartdelay": "^2.0.3",
|
||||
"@pushrocks/smartexpress": "^3.0.40",
|
||||
"@pushrocks/smartfile": "^7.0.4",
|
||||
"@pushrocks/smartdelay": "^2.0.6",
|
||||
"@pushrocks/smartexpress": "^3.0.52",
|
||||
"@pushrocks/smartfile": "^7.0.6",
|
||||
"@pushrocks/smarthash": "^2.0.6",
|
||||
"@pushrocks/smartlog": "^2.0.19",
|
||||
"@pushrocks/smartpromise": "^3.0.2",
|
||||
"@pushrocks/smartrequest": "^1.1.27",
|
||||
"@pushrocks/smartlog": "^2.0.21",
|
||||
"@pushrocks/smartpromise": "^3.0.6",
|
||||
"@pushrocks/smartrequest": "^1.1.42",
|
||||
"@pushrocks/smartrx": "^2.0.5",
|
||||
"@pushrocks/smartsocket": "^1.1.49",
|
||||
"@pushrocks/smartsocket": "^1.1.58",
|
||||
"@pushrocks/smarttime": "^3.0.12",
|
||||
"@pushrocks/smartunique": "^3.0.1"
|
||||
},
|
||||
"files": [
|
||||
"ts/*",
|
||||
"ts_web/*",
|
||||
"dist/*",
|
||||
"dist_web/*",
|
||||
"dist_ts_web/*",
|
||||
"assets/*",
|
||||
"ts/**/*",
|
||||
"ts_web/**/*",
|
||||
"dist/**/*",
|
||||
"dist_web/**/*",
|
||||
"dist_ts_web/**/*",
|
||||
"assets/**/*",
|
||||
"cli.js",
|
||||
"npmextra.json",
|
||||
"readme.md"
|
||||
|
@ -47,6 +47,10 @@ myUniverse.start(8765); // start the server and provide the port on which to lis
|
||||
|
||||
All your microservices represents clients in the universe that may talk to each other using the universe server.
|
||||
|
||||
## Contribution
|
||||
|
||||
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
|
||||
|
||||
For further information read the linked docs at the top of this readme.
|
||||
|
||||
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)
|
||||
|
60
test/test.ts
60
test/test.ts
@ -31,7 +31,8 @@ tap.test('add a message to the SmartUniverse', async () => {
|
||||
// testing message handling
|
||||
tap.test('create smartuniverse client', async () => {
|
||||
testClientUniverse = new smartuniverse.ClientUniverse({
|
||||
serverAddress: testServerData.serverAddress
|
||||
serverAddress: testServerData.serverAddress,
|
||||
autoReconnect: true
|
||||
});
|
||||
expect(testClientUniverse).to.be.instanceof(smartuniverse.ClientUniverse);
|
||||
});
|
||||
@ -65,16 +66,67 @@ tap.test('universe should contain the sent message', async () => {
|
||||
|
||||
tap.test('a second client should be able to subscibe', async () => {
|
||||
testClientUniverse2 = new smartuniverse.ClientUniverse({
|
||||
serverAddress: testServerData.serverAddress
|
||||
serverAddress: testServerData.serverAddress,
|
||||
autoReconnect: true
|
||||
});
|
||||
|
||||
testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass);
|
||||
await testClientUniverse2.start();
|
||||
});
|
||||
|
||||
tap.test('should receive a message correctly', async () => {});
|
||||
tap.test('should receive a message correctly', async tools => {
|
||||
const done = tools.defer();
|
||||
const testChannel = testClientUniverse.getChannel(testChannelData.channelName);
|
||||
const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName);
|
||||
const subscription = testChannel2.subscribe(messageArg => {
|
||||
if (messageArg.messageText === 'hellothere') {
|
||||
console.log('Yay##########');
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
await testChannel.sendMessage({
|
||||
messageText: 'hellothere'
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('should disconnect the client correctly', async () => {
|
||||
interface IDemoReqRes {
|
||||
method: 'demo';
|
||||
request: {
|
||||
wowso: string;
|
||||
};
|
||||
response: {
|
||||
hereso: string;
|
||||
};
|
||||
}
|
||||
|
||||
tap.test('ReactionRequest and ReactionResponse should work', async () => {
|
||||
const reactionResponse = new smartuniverse.ReactionResponse<IDemoReqRes>({
|
||||
channels: [testUniverse.getChannel(testChannelData.channelName)],
|
||||
funcDef: async reqData => {
|
||||
console.log(reqData);
|
||||
return {
|
||||
hereso: 'Hello there'
|
||||
};
|
||||
},
|
||||
method: 'demo'
|
||||
});
|
||||
const reactionRequest = new smartuniverse.ReactionRequest<IDemoReqRes>({
|
||||
method: 'demo'
|
||||
});
|
||||
const reactionResult = await reactionRequest.fire(
|
||||
[testClientUniverse2.getChannel(testChannelData.channelName)],
|
||||
{
|
||||
wowso: 'wowza'
|
||||
}
|
||||
);
|
||||
const result = await reactionResult.getFirstResult();
|
||||
console.log(result);
|
||||
});
|
||||
|
||||
tap.test('should disconnect the client correctly', async tools => {
|
||||
await testClientUniverse.stop();
|
||||
await testClientUniverse2.stop();
|
||||
});
|
||||
|
||||
tap.test('should end the server correctly', async tools => {
|
||||
|
@ -9,4 +9,8 @@ export * from './smartuniverse.classes.universecache';
|
||||
export * from './smartuniverse.classes.universechannel';
|
||||
export * from './smartuniverse.classes.universemessage';
|
||||
|
||||
// Reaction Response
|
||||
export * from './smartuniverse.classes.reactionrequest';
|
||||
export * from './smartuniverse.classes.reactionresponse';
|
||||
|
||||
export * from './interfaces';
|
||||
|
@ -7,7 +7,7 @@ export interface ISocketRequest_SubscribeChannel {
|
||||
passphrase: string;
|
||||
};
|
||||
response: {
|
||||
subscriptionStatus: 'subscribed' | 'unsubscribed'
|
||||
subscriptionStatus: 'subscribed' | 'unsubscribed';
|
||||
};
|
||||
}
|
||||
|
||||
@ -15,6 +15,6 @@ export interface ISocketRequest_ProcessMessage {
|
||||
method: 'processMessage';
|
||||
request: interfaces.IUniverseMessage;
|
||||
response: {
|
||||
messageStatus: 'ok' | 'channel not found'
|
||||
messageStatus: 'ok' | 'channel not found';
|
||||
};
|
||||
}
|
@ -1,10 +1,3 @@
|
||||
export type IServerCallActions =
|
||||
| 'channelSubscription'
|
||||
| 'processMessage'
|
||||
| 'channelUnsubscribe'
|
||||
| 'terminateConnection';
|
||||
|
||||
|
||||
export interface IServerUnsubscribeActionPayload {
|
||||
name: string;
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
export interface IMessageCreator {
|
||||
messageText: string;
|
||||
payload?: string | number | any;
|
||||
payloadStringType?: 'Buffer' | 'string' | 'object';
|
||||
}
|
||||
|
||||
/**
|
||||
|
8
ts/smartuniverse.classes.broadcastevent.ts
Normal file
8
ts/smartuniverse.classes.broadcastevent.ts
Normal file
@ -0,0 +1,8 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
/**
|
||||
* broadcasts an event to multiple channels
|
||||
*/
|
||||
export class BroadcastEvent<T> {
|
||||
fire() {}
|
||||
}
|
5
ts/smartuniverse.classes.broadcastsubscription.ts
Normal file
5
ts/smartuniverse.classes.broadcastsubscription.ts
Normal file
@ -0,0 +1,5 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
export class BroadcastSubscription {
|
||||
|
||||
}
|
@ -12,6 +12,7 @@ import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache
|
||||
|
||||
export interface IClientOptions {
|
||||
serverAddress: string;
|
||||
autoReconnect: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -19,9 +20,9 @@ export interface IClientOptions {
|
||||
* allows connecting to a universe server
|
||||
*/
|
||||
export class ClientUniverse {
|
||||
public options;
|
||||
public options: IClientOptions;
|
||||
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
|
||||
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage>;
|
||||
public messageRxjsSubject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
|
||||
public clientUniverseCache = new ClientUniverseCache();
|
||||
|
||||
constructor(optionsArg: IClientOptions) {
|
||||
@ -77,7 +78,7 @@ export class ClientUniverse {
|
||||
}
|
||||
|
||||
public async stop() {
|
||||
await this.smartsocketClient.disconnect();
|
||||
await this.disconnect('triggered');
|
||||
}
|
||||
|
||||
/**
|
||||
@ -85,7 +86,7 @@ export class ClientUniverse {
|
||||
* since password validation is done through other means, a connection should always be possible
|
||||
*/
|
||||
public async checkConnection(): Promise<void> {
|
||||
if (!this.smartsocketClient && !this.observableIntake) {
|
||||
if (!this.smartsocketClient) {
|
||||
const parsedURL = url.parse(this.options.serverAddress);
|
||||
const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = {
|
||||
alias: 'universeclient',
|
||||
@ -95,7 +96,13 @@ export class ClientUniverse {
|
||||
url: parsedURL.protocol + '//' + parsedURL.hostname
|
||||
};
|
||||
this.smartsocketClient = new SmartsocketClient(socketConfig);
|
||||
this.observableIntake = new plugins.smartrx.ObservableIntake();
|
||||
|
||||
this.smartsocketClient.eventSubject.subscribe(async eventArg => {
|
||||
switch (eventArg) {
|
||||
case 'disconnected':
|
||||
this.disconnect('upstreamEvent');
|
||||
}
|
||||
});
|
||||
|
||||
// lets define some basic actions
|
||||
|
||||
@ -105,8 +112,14 @@ export class ClientUniverse {
|
||||
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
|
||||
funcName: 'unsubscribe',
|
||||
allowedRoles: [],
|
||||
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
|
||||
throw new Error('TODO');
|
||||
funcDef: async (dataArg: interfaces.IServerUnsubscribeActionPayload) => {
|
||||
const channel = this.clientUniverseCache.channelMap.find(channelArg => {
|
||||
return channelArg.name === dataArg.name;
|
||||
});
|
||||
if (channel) {
|
||||
channel.unsubscribe();
|
||||
}
|
||||
return {};
|
||||
}
|
||||
});
|
||||
|
||||
@ -123,7 +136,7 @@ export class ClientUniverse {
|
||||
const clientUniverseMessage = ClientUniverseMessage.createMessageFromMessageDescriptor(
|
||||
messageDescriptorArg
|
||||
);
|
||||
this.observableIntake.push(clientUniverseMessage);
|
||||
this.messageRxjsSubject.next(clientUniverseMessage);
|
||||
|
||||
// lets find the corresponding channel
|
||||
const targetChannel = this.getChannel(clientUniverseMessage.targetChannelName);
|
||||
@ -147,8 +160,24 @@ export class ClientUniverse {
|
||||
await this.smartsocketClient.connect();
|
||||
plugins.smartlog.defaultLogger.log('info', 'universe client connected successfully');
|
||||
await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => {
|
||||
await clientUniverseChannelArg.subscribe();
|
||||
await clientUniverseChannelArg.populateSubscriptionToServer();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async disconnect(
|
||||
reason: 'upstreamEvent' | 'triggered' = 'triggered',
|
||||
tryReconnect = false
|
||||
) {
|
||||
if (reason === 'triggered') {
|
||||
const smartsocketToDisconnect = this.smartsocketClient;
|
||||
this.smartsocketClient = null; // making sure the upstreamEvent does not interfere
|
||||
await smartsocketToDisconnect.disconnect();
|
||||
}
|
||||
if (this.options.autoReconnect && reason === 'upstreamEvent' && this.smartsocketClient) {
|
||||
await plugins.smartdelay.delayForRandom(5000, 20000);
|
||||
this.smartsocketClient = null;
|
||||
this.checkConnection();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
||||
public name: string;
|
||||
public passphrase: string;
|
||||
public status: 'subscribed' | 'unsubscribed' = 'unsubscribed';
|
||||
private subject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage>();
|
||||
private subject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
|
||||
|
||||
// refs
|
||||
public clientUniverseRef: ClientUniverse;
|
||||
@ -53,34 +53,36 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
||||
* subscribes to a channel
|
||||
* tells the universe about this instances interest into a channel
|
||||
*/
|
||||
public async subscribe(observerArg?: plugins.smartrx.rxjs.Observer<any>) {
|
||||
public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage<any>) => void) {
|
||||
return this.subject.subscribe(
|
||||
messageArg => {
|
||||
observingFunctionArg(messageArg);
|
||||
},
|
||||
error => console.log(error)
|
||||
);
|
||||
}
|
||||
|
||||
public unsubscribe() {
|
||||
// TODO: unsubscribe all users
|
||||
}
|
||||
|
||||
public async populateSubscriptionToServer() {
|
||||
// lets make sure the channel is connected
|
||||
if (this.status === 'unsubscribed') {
|
||||
const response = await this.clientUniverseRef.smartsocketClient.serverCall<interfaces.ISocketRequest_SubscribeChannel>(
|
||||
'subscribeChannel',
|
||||
{
|
||||
const response = await this.clientUniverseRef.smartsocketClient.serverCall<
|
||||
interfaces.ISocketRequest_SubscribeChannel
|
||||
>('subscribeChannel', {
|
||||
name: this.name,
|
||||
passphrase: this.passphrase
|
||||
}
|
||||
);
|
||||
});
|
||||
this.status = response.subscriptionStatus;
|
||||
}
|
||||
|
||||
if (observerArg) {
|
||||
return this.subject.subscribe(observerArg);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public async emitMessageLocally(messageArg: ClientUniverseMessage) {
|
||||
public async emitMessageLocally(messageArg: ClientUniverseMessage<any>) {
|
||||
this.subject.next(messageArg);
|
||||
}
|
||||
|
||||
public askForReaction(reactionRequest: ReactionRequest): ReactionResponse {
|
||||
const reactionResponse = new ReactionResponse();
|
||||
return reactionResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* sends a message towards the server
|
||||
* @param messageArg
|
||||
@ -93,8 +95,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
||||
passphrase: this.passphrase,
|
||||
targetChannelName: this.name,
|
||||
messageText: messageArg.messageText,
|
||||
payload: messageArg.payload,
|
||||
payloadStringType: messageArg.payloadStringType
|
||||
payload: messageArg.payload
|
||||
};
|
||||
await this.clientUniverseRef.smartsocketClient.serverCall(
|
||||
'processMessage',
|
||||
|
@ -2,7 +2,7 @@ import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import * as interfaces from './interfaces';
|
||||
|
||||
export class ClientUniverseMessage implements interfaces.IUniverseMessage {
|
||||
export class ClientUniverseMessage<T> implements interfaces.IUniverseMessage {
|
||||
// ======
|
||||
// STATIC
|
||||
// ======
|
||||
@ -22,8 +22,7 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
|
||||
public smartTimestamp: plugins.smarttime.TimeStamp;
|
||||
public messageText: string;
|
||||
public passphrase: string;
|
||||
public payload: any;
|
||||
public payloadStringType;
|
||||
public payload: T;
|
||||
public targetChannelName: string;
|
||||
|
||||
constructor(messageArg: interfaces.IUniverseMessage) {
|
||||
|
@ -1,5 +1,83 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
|
||||
import { ReactionResult } from './smartuniverse.classes.reactionresult';
|
||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
|
||||
|
||||
export class ReactionRequest {
|
||||
|
||||
export interface IReactionRequestConstructorOptions<
|
||||
T extends plugins.typedrequestInterfaces.ITypedRequest
|
||||
> {
|
||||
method: T['method'];
|
||||
}
|
||||
|
||||
export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||
/**
|
||||
* needed for tying responses to requests
|
||||
*/
|
||||
id: string;
|
||||
typedRequestPayload: {
|
||||
method: T['method'];
|
||||
request: T['request'];
|
||||
response: T['response'];
|
||||
};
|
||||
}
|
||||
|
||||
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||
public method: T['method'];
|
||||
|
||||
constructor(optionsArg: IReactionRequestConstructorOptions<T>) {
|
||||
this.method = optionsArg.method;
|
||||
}
|
||||
|
||||
public async fire(
|
||||
channelsArg: Array<UniverseChannel | ClientUniverseChannel>,
|
||||
requestDataArg: T['request'],
|
||||
timeoutMillisArg = 5000
|
||||
) {
|
||||
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
|
||||
const reactionResult = new ReactionResult<T>();
|
||||
const requestId = plugins.smartunique.shortId();
|
||||
for (const channel of channelsArg) {
|
||||
subscriptionMap.add(
|
||||
channel.subscribe(
|
||||
(
|
||||
messageArg:
|
||||
| UniverseMessage<ICombinatorPayload<T>>
|
||||
| ClientUniverseMessage<ICombinatorPayload<T>>
|
||||
) => {
|
||||
if (
|
||||
messageArg.messageText === 'reactionResponse' &&
|
||||
messageArg.payload.typedRequestPayload.method === this.method
|
||||
) {
|
||||
const payload: ICombinatorPayload<T> = messageArg.payload;
|
||||
if (payload.id !== requestId) {
|
||||
return;
|
||||
}
|
||||
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
const payload: ICombinatorPayload<T> = {
|
||||
id: requestId,
|
||||
typedRequestPayload: {
|
||||
method: this.method,
|
||||
request: requestDataArg,
|
||||
response: null
|
||||
}
|
||||
};
|
||||
channel.sendMessage({
|
||||
messageText: 'reactionRequest',
|
||||
payload
|
||||
});
|
||||
}
|
||||
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
|
||||
await subscriptionMap.forEach(subscriptionArg => {
|
||||
subscriptionArg.unsubscribe();
|
||||
});
|
||||
reactionResult.complete();
|
||||
});
|
||||
return reactionResult;
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,63 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
export class ReactionResponse {}
|
||||
import { ICombinatorPayload } from './smartuniverse.classes.reactionrequest';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
|
||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
|
||||
|
||||
export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (
|
||||
dataArg: T['request']
|
||||
) => Promise<T['response']>;
|
||||
|
||||
export interface IReactionResponseConstructorOptions<
|
||||
T extends plugins.typedrequestInterfaces.ITypedRequest
|
||||
> {
|
||||
method: T['method'];
|
||||
channels: Array<UniverseChannel | ClientUniverseChannel>;
|
||||
funcDef: TReactionResponseFuncDef<T>;
|
||||
}
|
||||
|
||||
export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||
public method: T['method'];
|
||||
public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>();
|
||||
public funcDef: TReactionResponseFuncDef<T>;
|
||||
|
||||
constructor(optionsArg: IReactionResponseConstructorOptions<T>) {
|
||||
this.method = optionsArg.method;
|
||||
this.channels.addArray(optionsArg.channels);
|
||||
this.funcDef = optionsArg.funcDef;
|
||||
for (const channel of this.channels.getArray()) {
|
||||
channel.subscribe(messageArg => {
|
||||
this.processMessageForReaction(channel, messageArg);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async processMessageForReaction(
|
||||
channelArg: UniverseChannel | ClientUniverseChannel,
|
||||
messageArg:
|
||||
| UniverseMessage<ICombinatorPayload<T>>
|
||||
| ClientUniverseMessage<ICombinatorPayload<T>>
|
||||
) {
|
||||
if (
|
||||
messageArg.messageText === 'reactionRequest' &&
|
||||
messageArg.payload.typedRequestPayload.method === this.method
|
||||
) {
|
||||
const response: T['response'] = await this.funcDef(
|
||||
messageArg.payload.typedRequestPayload.request
|
||||
);
|
||||
const payload: ICombinatorPayload<T> = {
|
||||
...messageArg.payload,
|
||||
typedRequestPayload: {
|
||||
...messageArg.payload.typedRequestPayload,
|
||||
response
|
||||
}
|
||||
};
|
||||
channelArg.sendMessage({
|
||||
messageText: 'reactionResponse',
|
||||
payload
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
52
ts/smartuniverse.classes.reactionresult.ts
Normal file
52
ts/smartuniverse.classes.reactionresult.ts
Normal file
@ -0,0 +1,52 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
import { ReactionResponse } from './smartuniverse.classes.reactionresponse';
|
||||
|
||||
export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedRequest> {
|
||||
private resultReplaySubject = new plugins.smartrx.rxjs.ReplaySubject<T['response']>();
|
||||
private endResult: Array<T['response']> = [];
|
||||
private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>();
|
||||
|
||||
constructor() {
|
||||
this.resultSubscribe(responseArg => {
|
||||
this.endResult.push(responseArg);
|
||||
});
|
||||
}
|
||||
|
||||
public resultSubscribe(observerArg: (responseArg: T['response']) => void) {
|
||||
return this.resultReplaySubject.subscribe(observerArg);
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the end result as an array of all results
|
||||
*/
|
||||
public async getEndResult() {
|
||||
const result = await this.completeDeferred.promise;
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* if there is a single respondant, or you are only interested in the first result
|
||||
*/
|
||||
public async getFirstResult() {
|
||||
const done = plugins.smartpromise.defer<T['response']>();
|
||||
const subscription = this.resultReplaySubject.subscribe(result => {
|
||||
done.resolve(result);
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
return await done.promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* push a reactionResponse
|
||||
*/
|
||||
public async pushReactionResponse(responseArg: T['response']) {
|
||||
this.resultReplaySubject.next(responseArg);
|
||||
}
|
||||
|
||||
/**
|
||||
* completes the ReactionResult
|
||||
*/
|
||||
public async complete() {
|
||||
this.completeDeferred.resolve(this.endResult);
|
||||
}
|
||||
}
|
@ -62,6 +62,16 @@ export class Universe {
|
||||
*/
|
||||
public addChannel(nameArg: string, passphraseArg: string) {
|
||||
const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg);
|
||||
return newChannel;
|
||||
}
|
||||
|
||||
/**
|
||||
* returns a channel
|
||||
*/
|
||||
public getChannel(channelNameArg: string) {
|
||||
return this.universeCache.channelMap.find(channelArg => {
|
||||
return channelArg.name === channelNameArg;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -95,14 +105,14 @@ export class Universe {
|
||||
// add the role to smartsocket
|
||||
this.smartsocket.addSocketRoles([ClientRole]);
|
||||
|
||||
const socketFunctionSubscription = new plugins.smartsocket.SocketFunction<interfaces.ISocketRequest_SubscribeChannel>({
|
||||
const socketFunctionSubscription = new plugins.smartsocket.SocketFunction<
|
||||
interfaces.ISocketRequest_SubscribeChannel
|
||||
>({
|
||||
allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level
|
||||
funcName: 'subscribeChannel',
|
||||
funcDef: async (
|
||||
dataArg,
|
||||
socketConnectionArg
|
||||
) => {
|
||||
funcDef: async (dataArg, socketConnectionArg) => {
|
||||
const universeConnection = new UniverseConnection({
|
||||
universe: this,
|
||||
socketConnection: socketConnectionArg,
|
||||
authenticationRequests: [dataArg]
|
||||
});
|
||||
|
@ -19,12 +19,12 @@ export class UniverseCache {
|
||||
// INSTANCE
|
||||
// ========
|
||||
public standardMessageExpiry: number;
|
||||
public destructionTime: number = 60000;
|
||||
public destructionTime: number = 10000;
|
||||
|
||||
/**
|
||||
* stores messages for this instance
|
||||
*/
|
||||
public messageMap = new Objectmap<UniverseMessage>();
|
||||
public messageMap = new Objectmap<UniverseMessage<any>>();
|
||||
|
||||
/**
|
||||
* stores the channels that are available within the universe
|
||||
@ -54,7 +54,7 @@ export class UniverseCache {
|
||||
* @param messageArg
|
||||
* @param attachedPayloadArg
|
||||
*/
|
||||
public async addMessage(messageArg: UniverseMessage) {
|
||||
public async addMessage(messageArg: UniverseMessage<any>) {
|
||||
messageArg.setUniverseCache(this);
|
||||
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
|
||||
this.messageMap.add(messageArg);
|
||||
@ -69,7 +69,7 @@ export class UniverseCache {
|
||||
public readMessagesYoungerThan(
|
||||
unixTimeArg?: number,
|
||||
channelName?: string
|
||||
): Observable<UniverseMessage> {
|
||||
): Observable<UniverseMessage<any>> {
|
||||
const messageObservable = from(this.messageMap.getArray()).pipe(
|
||||
filter(messageArg => {
|
||||
return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime);
|
||||
|
@ -52,7 +52,7 @@ export class UniverseChannel {
|
||||
*/
|
||||
public static authorizeAMessageForAChannel(
|
||||
universeCacheArg: UniverseCache,
|
||||
universeMessageArg: UniverseMessage
|
||||
universeMessageArg: UniverseMessage<any>
|
||||
): UniverseChannel {
|
||||
const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
|
||||
const result = universeChannel.authenticate(universeMessageArg);
|
||||
@ -85,7 +85,7 @@ export class UniverseChannel {
|
||||
*/
|
||||
public name: string;
|
||||
public universeRef: Universe;
|
||||
private subject = new plugins.smartrx.rxjs.Subject<UniverseMessage>();
|
||||
private subject = new plugins.smartrx.rxjs.Subject<UniverseMessage<any>>();
|
||||
|
||||
/**
|
||||
* the passphrase for the channel
|
||||
@ -103,7 +103,7 @@ export class UniverseChannel {
|
||||
* # the messages channelName against the unverseChannel's name
|
||||
* # the messages password against the universeChannel's password
|
||||
*/
|
||||
public authenticate(universeMessageArg: UniverseMessage): boolean {
|
||||
public authenticate(universeMessageArg: UniverseMessage<any>): boolean {
|
||||
return (
|
||||
this.name === universeMessageArg.targetChannelName &&
|
||||
this.passphrase === universeMessageArg.passphrase
|
||||
@ -114,10 +114,10 @@ export class UniverseChannel {
|
||||
* pushes a message to clients
|
||||
* @param messageArg
|
||||
*/
|
||||
public async push(messageArg: UniverseMessage) {
|
||||
public async push(messageArg: UniverseMessage<any>) {
|
||||
this.subject.next(messageArg);
|
||||
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
|
||||
this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
|
||||
await this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
|
||||
if (socketConnection.authenticatedChannels.includes(this)) {
|
||||
universeConnectionsWithChannelAccess.push(socketConnection);
|
||||
}
|
||||
@ -131,8 +131,7 @@ export class UniverseChannel {
|
||||
passphrase: messageArg.passphrase,
|
||||
targetChannelName: this.name,
|
||||
messageText: messageArg.messageText,
|
||||
payload: messageArg.payload,
|
||||
payloadStringType: messageArg.payloadStringType
|
||||
payload: messageArg.payload
|
||||
};
|
||||
smartsocket.clientCall(
|
||||
'processMessage',
|
||||
@ -143,8 +142,13 @@ export class UniverseChannel {
|
||||
}
|
||||
|
||||
// functions to interact with a channel locally
|
||||
public async subscribe(observer: plugins.smartrx.rxjs.Observer<any>) {
|
||||
return this.subject.subscribe(observer);
|
||||
public subscribe(observingFunctionArg: (messageArg: UniverseMessage<any>) => void) {
|
||||
return this.subject.subscribe(
|
||||
messageArg => {
|
||||
observingFunctionArg(messageArg);
|
||||
},
|
||||
error => console.log(error)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -155,11 +159,10 @@ export class UniverseChannel {
|
||||
id: plugins.smartunique.shortId(),
|
||||
messageText: messageDescriptor.messageText,
|
||||
payload: messageDescriptor.payload,
|
||||
payloadStringType: messageDescriptor.payloadStringType,
|
||||
targetChannelName: this.name,
|
||||
passphrase: this.passphrase,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
this.push(messageToSend);
|
||||
this.universeRef.universeCache.addMessage(messageToSend);
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ export class UniverseConnection {
|
||||
universeConnection
|
||||
);
|
||||
universeRef.universeCache.connectionMap.add(universeConnection);
|
||||
console.log('hi');
|
||||
}
|
||||
|
||||
/**
|
||||
@ -92,6 +93,8 @@ export class UniverseConnection {
|
||||
return universeConnection;
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
public universeRef: Universe;
|
||||
public terminatedDeferred = plugins.smartpromise.defer();
|
||||
|
||||
/**
|
||||
@ -99,23 +102,34 @@ export class UniverseConnection {
|
||||
*/
|
||||
public socketConnection: plugins.smartsocket.SocketConnection;
|
||||
public authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']> = [];
|
||||
public subscribedChannels: UniverseChannel[] = [];
|
||||
public authenticatedChannels: UniverseChannel[] = [];
|
||||
public failedToJoinChannels: UniverseChannel[] = [];
|
||||
|
||||
/**
|
||||
* terminates the connection
|
||||
* disconnect the connection
|
||||
*/
|
||||
public terminateConnection() {
|
||||
this.socketConnection.socket.disconnect();
|
||||
public async disconnect(reason: 'upstreamevent' | 'triggered' = 'triggered') {
|
||||
if (reason === 'triggered') {
|
||||
await this.socketConnection.disconnect();
|
||||
}
|
||||
this.universeRef.universeCache.connectionMap.remove(this);
|
||||
this.terminatedDeferred.resolve();
|
||||
}
|
||||
|
||||
constructor(optionsArg: {
|
||||
universe: Universe;
|
||||
socketConnection: plugins.smartsocket.SocketConnection;
|
||||
authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']>;
|
||||
}) {
|
||||
this.universeRef = optionsArg.universe;
|
||||
this.authenticationRequests = optionsArg.authenticationRequests;
|
||||
this.socketConnection = optionsArg.socketConnection;
|
||||
this.socketConnection.eventSubject.subscribe(async eventArg => {
|
||||
switch (eventArg) {
|
||||
case 'disconnected':
|
||||
await this.disconnect('upstreamevent');
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import { SocketConnection } from '@pushrocks/smartsocket';
|
||||
* represents a message within a universe
|
||||
* acts as a container to save message states like authentication status
|
||||
*/
|
||||
export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
export class UniverseMessage<T> implements interfaces.IUniverseMessage {
|
||||
public static createMessageFromPayload(
|
||||
socketConnectionArg: SocketConnection,
|
||||
dataArg: interfaces.IUniverseMessage
|
||||
@ -28,8 +28,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
public smartTimestamp: TimeStamp;
|
||||
public messageText: string;
|
||||
public passphrase: string;
|
||||
public payload: any;
|
||||
public payloadStringType;
|
||||
public payload: T;
|
||||
public targetChannelName: string;
|
||||
public socketConnection: SocketConnection;
|
||||
|
||||
@ -65,7 +64,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
this.passphrase = messageDescriptor.passphrase;
|
||||
this.payload = messageDescriptor.payload;
|
||||
// prevent memory issues
|
||||
this.fallBackDestruction();
|
||||
this.setDestructionTimer();
|
||||
}
|
||||
|
||||
public setUniverseCache(universeCacheArg: UniverseCache) {
|
||||
@ -74,17 +73,25 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
|
||||
public setTargetChannel() {}
|
||||
|
||||
public setDestructionTimer(selfdestructAfterArg: number) {
|
||||
public setDestructionTimer(selfdestructAfterArg?: number) {
|
||||
if (selfdestructAfterArg) {
|
||||
this.destructionTimer = new Timer(selfdestructAfterArg);
|
||||
this.destructionTimer.start();
|
||||
|
||||
// set up self destruction by removing this from the parent messageCache
|
||||
this.destructionTimer.completed.then(async () => {
|
||||
this.destructionTimer.completed
|
||||
.then(async () => {
|
||||
this.universeCache.messageMap.remove(this);
|
||||
})
|
||||
.catch(err => {
|
||||
console.log(err);
|
||||
console.log(this);
|
||||
});
|
||||
} else {
|
||||
this.fallBackDestruction();
|
||||
plugins.smartdelay.delayFor(1000).then(() => {
|
||||
if (!this.destructionTimer) {
|
||||
this.setDestructionTimer(6000);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,15 +101,4 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
public handleAsBadMessage() {
|
||||
plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
|
||||
}
|
||||
|
||||
/**
|
||||
* prevents memory leaks if channels have no default
|
||||
*/
|
||||
private fallBackDestruction() {
|
||||
plugins.smartdelay.delayFor(1000).then(() => {
|
||||
if (!this.destructionTimer) {
|
||||
this.setDestructionTimer(6000);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,11 @@ import * as path from 'path';
|
||||
|
||||
export { path };
|
||||
|
||||
// apiglobal scope
|
||||
import * as typedrequestInterfaces from '@apiglobal/typedrequest-interfaces';
|
||||
|
||||
export { typedrequestInterfaces };
|
||||
|
||||
// pushrocks scope
|
||||
import * as lik from '@pushrocks/lik';
|
||||
import * as smarthash from '@pushrocks/smarthash';
|
||||
|
Reference in New Issue
Block a user