Compare commits

..

67 Commits

Author SHA1 Message Date
736240b978 1.0.93 2019-11-09 12:23:34 +01:00
73f4600c2a fix(core): update 2019-11-09 12:23:33 +01:00
40beec1166 1.0.92 2019-11-07 01:02:03 +01:00
f8690fef50 1.0.91 2019-11-07 00:59:46 +01:00
972ddbf327 fix(core): update 2019-11-07 00:59:45 +01:00
80aacd17a6 1.0.90 2019-11-03 20:23:23 +01:00
e67b3e50cc fix(core): update 2019-11-03 20:23:22 +01:00
a4a8959b74 1.0.89 2019-09-25 18:46:18 +02:00
bab0f062f7 fix(core): update 2019-09-25 18:46:18 +02:00
3bdfe4dcb4 1.0.88 2019-09-25 18:26:40 +02:00
fca960ad0d fix(core): update 2019-09-25 18:26:39 +02:00
e43ed3951c 1.0.87 2019-09-17 15:40:55 +02:00
23df304535 fix(core): update 2019-09-17 15:40:54 +02:00
9a142175aa 1.0.86 2019-09-17 14:01:24 +02:00
09b593e192 fix(core): update 2019-09-17 14:01:24 +02:00
c27fc147b5 1.0.85 2019-09-17 13:57:35 +02:00
ddde21925a fix(core): update 2019-09-17 13:57:34 +02:00
bd849d347d 1.0.84 2019-09-17 12:46:35 +02:00
f2a85d4719 fix(core): update 2019-09-17 12:46:35 +02:00
4e7c28ac83 1.0.83 2019-09-11 14:57:36 +02:00
243f1a70e9 fix(core): update 2019-09-11 14:57:36 +02:00
b5a6517756 1.0.82 2019-09-11 10:11:34 +02:00
e12b128619 fix(core): update 2019-09-11 10:11:34 +02:00
03fbab5265 1.0.81 2019-09-10 23:55:21 +02:00
1d13bf5bcc fix(core): update 2019-09-10 23:55:20 +02:00
c2052f16a8 1.0.80 2019-09-10 19:36:11 +02:00
ff7cdc908c fix(core): update 2019-09-10 19:36:10 +02:00
f3d41b8719 1.0.79 2019-09-10 18:03:47 +02:00
f9f0fc45e2 fix(core): update 2019-09-10 18:03:46 +02:00
da6b7724b8 1.0.78 2019-09-10 10:55:11 +02:00
be7ca29e4b fix(core): update 2019-09-10 10:55:10 +02:00
f401d78c4b 1.0.77 2019-09-10 10:51:18 +02:00
6ceec0201f fix(core): update 2019-09-10 10:51:18 +02:00
16ce4e09a9 1.0.76 2019-09-10 10:50:56 +02:00
2868ab686d fix(core): update 2019-09-10 10:50:55 +02:00
5dab36382f 1.0.75 2019-09-10 09:56:33 +02:00
02a32eb8c7 fix(core): update 2019-09-10 09:56:32 +02:00
b258979b5a 1.0.74 2019-09-10 01:39:39 +02:00
166e29bbf6 fix(core): update 2019-09-10 01:39:38 +02:00
870f37d403 1.0.73 2019-09-10 01:19:10 +02:00
64c4b91678 fix(core): update 2019-09-10 01:19:10 +02:00
f3e13292d8 1.0.72 2019-09-10 00:39:18 +02:00
7e1c405cb1 fix(core): update 2019-09-10 00:39:18 +02:00
d1b4672eff 1.0.71 2019-09-10 00:29:08 +02:00
0dd9fee52b fix(core): update 2019-09-10 00:29:08 +02:00
37e1ee7970 1.0.70 2019-09-01 21:34:01 +02:00
bd0bb3acf5 fix(core): update 2019-09-01 21:34:01 +02:00
f60497474e 1.0.69 2019-09-01 21:27:45 +02:00
1d84cefa84 fix(core): update 2019-09-01 21:27:45 +02:00
6792acd533 1.0.68 2019-09-01 18:22:44 +02:00
9397d89cf5 fix(core): update 2019-09-01 18:22:44 +02:00
37cf4a91f4 1.0.67 2019-09-01 17:04:25 +02:00
52db86c929 fix(core): update 2019-09-01 17:04:25 +02:00
e8f09c1b7a 1.0.66 2019-09-01 17:01:26 +02:00
79edea873f fix(core): update 2019-09-01 17:01:26 +02:00
97666a623d 1.0.65 2019-09-01 16:54:36 +02:00
ef61ea9ad7 fix(core): update 2019-09-01 16:54:36 +02:00
9c1504ef02 1.0.64 2019-08-13 18:43:33 +02:00
e8f2e04d1c fix(core): update 2019-08-13 18:43:33 +02:00
e12aa7e961 1.0.63 2019-08-13 18:41:28 +02:00
857b7cd010 fix(core): update 2019-08-13 18:41:27 +02:00
e100dea160 1.0.62 2019-08-13 18:16:17 +02:00
e8e87fcdba fix(core): update 2019-08-13 18:16:16 +02:00
0d18b11721 1.0.61 2019-08-13 18:06:14 +02:00
eaaefddbe3 fix(core): update 2019-08-13 18:06:13 +02:00
8c6946ddb6 1.0.60 2019-08-13 15:55:01 +02:00
3a7ebcdd80 fix(core): update 2019-08-13 15:55:01 +02:00
24 changed files with 997 additions and 709 deletions

View File

@ -38,17 +38,17 @@ snyk:
# test stage # test stage
# ==================== # ====================
testLTS: testStable:
stage: test stage: test
script: script:
- npmci npm prepare - npmci npm prepare
- npmci node install lts - npmci node install stable
- npmci npm install - npmci npm install
- npmci npm test - npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/ coverage: /\d+.?\d+?\%\s*coverage/
tags: tags:
- docker - docker
- notpriv - priv
testBuild: testBuild:
stage: test stage: test
@ -100,7 +100,7 @@ trigger:
pages: pages:
image: hosttoday/ht-docker-dbase:npmci image: hosttoday/ht-docker-dbase:npmci
services: services:
- docker:18-dind - docker:stable-dind
stage: metadata stage: metadata
script: script:
- npmci command npm install -g @gitzone/tsdoc - npmci command npm install -g @gitzone/tsdoc

928
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.59", "version": "1.0.93",
"private": false, "private": false,
"description": "messaging service for your micro services", "description": "messaging service for your micro services",
"main": "dist/index.js", "main": "dist/index.js",
@ -14,26 +14,28 @@
"format": "(gitzone format)" "format": "(gitzone format)"
}, },
"devDependencies": { "devDependencies": {
"@gitzone/tsbuild": "^2.1.11", "@gitzone/tsbuild": "^2.1.17",
"@gitzone/tstest": "^1.0.24", "@gitzone/tstest": "^1.0.28",
"@pushrocks/tapbundle": "^3.0.11", "@pushrocks/tapbundle": "^3.0.13",
"@types/node": "^12.7.1", "@types/node": "^12.12.6",
"tslint": "^5.18.0", "tslint": "^5.20.1",
"tslint-config-prettier": "^1.18.0" "tslint-config-prettier": "^1.18.0"
}, },
"peerDependencies": { "peerDependencies": {
"rxjs": "*" "rxjs": "*"
}, },
"dependencies": { "dependencies": {
"@pushrocks/lik": "^3.0.10", "@apiglobal/typedrequest-interfaces": "^1.0.7",
"@pushrocks/smartdelay": "^2.0.3", "@pushrocks/lik": "^3.0.11",
"@pushrocks/smartexpress": "^3.0.38", "@pushrocks/smartdelay": "^2.0.6",
"@pushrocks/smartfile": "^7.0.4", "@pushrocks/smartexpress": "^3.0.52",
"@pushrocks/smartfile": "^7.0.6",
"@pushrocks/smarthash": "^2.0.6", "@pushrocks/smarthash": "^2.0.6",
"@pushrocks/smartpromise": "^3.0.2", "@pushrocks/smartlog": "^2.0.21",
"@pushrocks/smartrequest": "^1.1.16", "@pushrocks/smartpromise": "^3.0.6",
"@pushrocks/smartrx": "^2.0.3", "@pushrocks/smartrequest": "^1.1.42",
"@pushrocks/smartsocket": "^1.1.44", "@pushrocks/smartrx": "^2.0.5",
"@pushrocks/smartsocket": "^1.1.58",
"@pushrocks/smarttime": "^3.0.12", "@pushrocks/smarttime": "^3.0.12",
"@pushrocks/smartunique": "^3.0.1" "@pushrocks/smartunique": "^3.0.1"
}, },

View File

@ -50,6 +50,6 @@ All your microservices represents clients in the universe that may talk to each
For further information read the linked docs at the top of this readme. For further information read the linked docs at the top of this readme.
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh) > MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)
| By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy.html) | By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy)
[![repo-footer](https://pushrocks.gitlab.io/assets/repo-footer.svg)](https://maintainedby.lossless.com) [![repo-footer](https://lossless.gitlab.io/publicrelations/repofooter.svg)](https://maintainedby.lossless.com)

View File

@ -37,24 +37,24 @@ tap.test('create smartuniverse client', async () => {
}); });
tap.test('should add a channel to the universe', async () => { tap.test('should add a channel to the universe', async () => {
await testUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass); testUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
}); });
tap.test('should add the same channel to the client universe in the same way', async () => { tap.test('should add the same channel to the client universe in the same way', async () => {
await testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass); testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
}); });
tap.test('should start the ClientUniverse', async () => { tap.test('should start the ClientUniverse', async () => {
await testClientUniverse.start(); await testClientUniverse.start();
}) });
tap.test('should get a observable correctly', async () => { tap.test('should get a observable correctly', async () => {
testClientChannel = await testClientUniverse.getChannel(testChannelData.channelName); testClientChannel = testClientUniverse.getChannel(testChannelData.channelName);
expect(testClientChannel).to.be.instanceof(smartuniverse.ClientUniverseChannel); expect(testClientChannel).to.be.instanceof(smartuniverse.ClientUniverseChannel);
}); });
tap.test('should send a message correctly', async () => { tap.test('should send a message correctly', async () => {
await (await testClientUniverse.getChannel(testChannelData.channelName)).sendMessage({ await testClientUniverse.getChannel(testChannelData.channelName).sendMessage({
messageText: 'hello' messageText: 'hello'
}); });
}); });
@ -69,15 +69,62 @@ tap.test('a second client should be able to subscibe', async () => {
}); });
testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass); testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass);
await testClientUniverse2.start();
}); });
tap.test('should receive a message correctly', async () => {}); tap.test('should receive a message correctly', async (tools) => {
const done = tools.defer();
tap.test('should disconnect the client correctly', async () => { const testChannel = testClientUniverse.getChannel(testChannelData.channelName);
testClientUniverse.stop(); const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName);
const subscription = testChannel2.subscribe(messageArg => {
if (messageArg.messageText === 'hellothere') {
console.log('Yay##########');
done.resolve();
}
});
await testChannel.sendMessage({
messageText: 'hellothere'
});
await done.promise;
}); });
tap.test('should end the server correctly', async tools => { interface IDemoReqRes {
method: 'demo',
request: {
wowso: string;
};
response: {
hereso: string;
};
}
tap.test('ReactionRequest and ReactionResponse should work', async () => {
const reactionResponse = new smartuniverse.ReactionResponse<IDemoReqRes>({
channels: [testUniverse.getChannel(testChannelData.channelName)],
funcDef: async reqData => {
console.log(reqData);
return {
hereso: 'Hello there'
};
},
method: 'demo'
});
const reactionRequest = new smartuniverse.ReactionRequest<IDemoReqRes>({
method: 'demo'
});
const reactionResult = await reactionRequest.fire([testClientUniverse2.getChannel(testChannelData.channelName)], {
wowso: 'wowza'
});
const result = await reactionResult.getFirstResult();
console.log(result);
});
tap.test('should disconnect the client correctly', async (tools) => {
await testClientUniverse.stop();
await testClientUniverse2.stop();
});
tap.test('should end the server correctly', async (tools) => {
await testUniverse.stopServer(); await testUniverse.stopServer();
}); });

View File

@ -1,6 +1,7 @@
// Client classes // Client classes
export * from './smartuniverse.classes.clientuniverse'; export * from './smartuniverse.classes.clientuniverse';
export * from './smartuniverse.classes.clientuniversechannel'; export * from './smartuniverse.classes.clientuniversechannel';
export * from './smartuniverse.classes.clientuniversemessage';
// Server classes // Server classes
export * from './smartuniverse.classes.universe'; export * from './smartuniverse.classes.universe';
@ -8,4 +9,8 @@ export * from './smartuniverse.classes.universecache';
export * from './smartuniverse.classes.universechannel'; export * from './smartuniverse.classes.universechannel';
export * from './smartuniverse.classes.universemessage'; export * from './smartuniverse.classes.universemessage';
// Reaction Response
export * from './smartuniverse.classes.reactionrequest';
export * from './smartuniverse.classes.reactionresponse';
export * from './interfaces'; export * from './interfaces';

View File

@ -1,4 +1,5 @@
export * from './http.interfaces'; export * from './http.interfaces';
export * from './socketfunctionrequests';
export * from './universechannel.interfaces'; export * from './universechannel.interfaces';
export * from './universemessage.interfaces'; export * from './universemessage.interfaces';
export * from './universeactions.interfaces'; export * from './universeactions.interfaces';

View File

@ -0,0 +1,20 @@
import * as interfaces from './index';
export interface ISocketRequest_SubscribeChannel {
method: 'subscribeChannel';
request: {
name: string;
passphrase: string;
};
response: {
subscriptionStatus: 'subscribed' | 'unsubscribed';
};
}
export interface ISocketRequest_ProcessMessage {
method: 'processMessage';
request: interfaces.IUniverseMessage;
response: {
messageStatus: 'ok' | 'channel not found';
};
}

View File

@ -1,17 +1,3 @@
export type IServerCallActions =
| 'channelSubscription'
| 'processMessage'
| 'channelUnsubscribe'
| 'terminateConnection';
/**
* the interface for a subscription
*/
export interface IServerCallSubscribeActionPayload {
name: string;
passphrase: string;
}
export interface IServerUnsubscribeActionPayload { export interface IServerUnsubscribeActionPayload {
name: string; name: string;
} }

View File

@ -1,11 +1,10 @@
export interface IMessageCreator { export interface IMessageCreator {
messageText: string; messageText: string;
payload?: string | number | any; payload?: string | number | any;
payloadStringType?: 'Buffer' | 'string' | 'object';
} }
/** /**
* * A universe
*/ */
export interface IUniverseMessage extends IMessageCreator { export interface IUniverseMessage extends IMessageCreator {
id: string; id: string;

View File

@ -0,0 +1,10 @@
import * as plugins from './smartuniverse.plugins';
/**
* broadcasts an event to multiple channels
*/
export class BroadcastEvent<T> {
fire() {
}
};

View File

@ -0,0 +1,5 @@
import * as plugins from './smartuniverse.plugins';
export class BroadcastSUbscription {
}

View File

@ -7,7 +7,7 @@ import * as url from 'url';
import * as interfaces from './interfaces'; import * as interfaces from './interfaces';
import { ClientUniverseChannel, UniverseMessage } from './'; import { ClientUniverseChannel, ClientUniverseMessage } from './';
import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache'; import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache';
export interface IClientOptions { export interface IClientOptions {
@ -19,9 +19,9 @@ export interface IClientOptions {
* allows connecting to a universe server * allows connecting to a universe server
*/ */
export class ClientUniverse { export class ClientUniverse {
public options; public options: IClientOptions;
public smartsocketClient: plugins.smartsocket.SmartsocketClient; public smartsocketClient: plugins.smartsocket.SmartsocketClient;
public observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>; public messageRxjsSubject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
public clientUniverseCache = new ClientUniverseCache(); public clientUniverseCache = new ClientUniverseCache();
constructor(optionsArg: IClientOptions) { constructor(optionsArg: IClientOptions) {
@ -32,15 +32,20 @@ export class ClientUniverse {
* adds a channel to the channelcache * adds a channel to the channelcache
* TODO: verify channel before adding it to the channel cache * TODO: verify channel before adding it to the channel cache
*/ */
public async addChannel(channelNameArg: string, passphraseArg: string) { public addChannel(channelNameArg: string, passphraseArg: string) {
const existingChannel = await this.getChannel(channelNameArg); const existingChannel = this.getChannel(channelNameArg);
if (existingChannel) { if (existingChannel) {
throw new Error('channel exists'); throw new Error('channel exists');
} }
// lets create the channel // lets create the channel
await ClientUniverseChannel.createClientUniverseChannel(this, channelNameArg, passphraseArg); const clientUniverseChannel = ClientUniverseChannel.createClientUniverseChannel(
this,
channelNameArg,
passphraseArg
);
return clientUniverseChannel;
} }
/** /**
@ -48,7 +53,7 @@ export class ClientUniverse {
* @param channelName * @param channelName
* @param passphraseArg * @param passphraseArg
*/ */
public async getChannel(channelName: string): Promise<ClientUniverseChannel> { public getChannel(channelName: string): ClientUniverseChannel {
const clientUniverseChannel = this.clientUniverseCache.channelMap.find(channel => { const clientUniverseChannel = this.clientUniverseCache.channelMap.find(channel => {
return channel.name === channelName; return channel.name === channelName;
}); });
@ -60,17 +65,19 @@ export class ClientUniverse {
* @param messageArg * @param messageArg
*/ */
public removeChannel(channelNameArg, notifyServer = true) { public removeChannel(channelNameArg, notifyServer = true) {
const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemove(channelItemArg => { const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemove(
return channelItemArg.name === channelNameArg; channelItemArg => {
}); return channelItemArg.name === channelNameArg;
}
);
} }
public async start() { public async start() {
await this.checkConnection(); await this.checkConnection();
} }
public stop() { public async stop() {
this.smartsocketClient.disconnect(); await this.disconnect('triggered');
} }
/** /**
@ -78,18 +85,23 @@ export class ClientUniverse {
* since password validation is done through other means, a connection should always be possible * since password validation is done through other means, a connection should always be possible
*/ */
public async checkConnection(): Promise<void> { public async checkConnection(): Promise<void> {
if (!this.smartsocketClient && !this.observableIntake) { if (!this.smartsocketClient) {
const parsedURL = url.parse(this.options.serverAddress); const parsedURL = url.parse(this.options.serverAddress);
const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = { const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = {
alias: process.env.SOCKET_ALIAS || 'someclient', alias: 'universeclient',
password: 'UniverseClient', password: 'UniverseClient',
port: parseInt(parsedURL.port, 10), port: parseInt(parsedURL.port, 10),
role: 'UniverseClient', role: 'UniverseClient',
url: parsedURL.protocol + '//' + parsedURL.hostname url: parsedURL.protocol + '//' + parsedURL.hostname
}; };
console.log(socketConfig);
this.smartsocketClient = new SmartsocketClient(socketConfig); this.smartsocketClient = new SmartsocketClient(socketConfig);
this.observableIntake = new plugins.smartrx.ObservableIntake();
this.smartsocketClient.eventSubject.subscribe(async eventArg => {
switch(eventArg) {
case 'disconnected':
this.disconnect('upstreamEvent');
}
});
// lets define some basic actions // lets define some basic actions
@ -99,19 +111,44 @@ export class ClientUniverse {
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({ const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
funcName: 'unsubscribe', funcName: 'unsubscribe',
allowedRoles: [], allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => { funcDef: async (dataArg: interfaces.IServerUnsubscribeActionPayload) => {
throw new Error('TODO'); const channel = this.clientUniverseCache.channelMap.find(channelArg => {
return channelArg.name === dataArg.name;
});
if (channel) {
channel.unsubscribe();
}
return {};
} }
}); });
/** /**
* handles message reception * handles message reception
*/ */
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction({ const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction<
interfaces.ISocketRequest_ProcessMessage
>({
funcName: 'processMessage', funcName: 'processMessage',
allowedRoles: [], allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => { funcDef: async messageDescriptorArg => {
console.log('Got message from server'); plugins.smartlog.defaultLogger.log('info', 'Got message from server');
const clientUniverseMessage = ClientUniverseMessage.createMessageFromMessageDescriptor(
messageDescriptorArg
);
this.messageRxjsSubject.next(clientUniverseMessage);
// lets find the corresponding channel
const targetChannel = this.getChannel(clientUniverseMessage.targetChannelName);
if (targetChannel) {
await targetChannel.emitMessageLocally(clientUniverseMessage);
return {
messageStatus: 'ok'
};
} else {
return {
messageStatus: 'channel not found'
};
}
} }
}); });
@ -120,10 +157,21 @@ export class ClientUniverse {
this.smartsocketClient.addSocketFunction(socketFunctionProcessMessage); this.smartsocketClient.addSocketFunction(socketFunctionProcessMessage);
await this.smartsocketClient.connect(); await this.smartsocketClient.connect();
console.log('universe client connected successfully'); plugins.smartlog.defaultLogger.log('info', 'universe client connected successfully');
await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => { await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => {
await clientUniverseChannelArg.subscribe(); await clientUniverseChannelArg.populateSubscriptionToServer();
}); });
} }
} }
public async disconnect(reason: 'upstreamEvent' | 'triggered' = 'triggered', tryReconnect = false) {
if ('triggered') {
this.smartsocketClient.disconnect();
}
this.smartsocketClient = null;
if (tryReconnect) {
await plugins.smartdelay.delayForRandom(5000, 20000);
this.checkConnection();
}
}
} }

View File

@ -2,6 +2,9 @@ import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces'; import * as interfaces from './interfaces';
import { ClientUniverse } from './'; import { ClientUniverse } from './';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
import { ReactionRequest } from './smartuniverse.classes.reactionrequest';
import { ReactionResponse } from './smartuniverse.classes.reactionresponse';
export class ClientUniverseChannel implements interfaces.IUniverseChannel { export class ClientUniverseChannel implements interfaces.IUniverseChannel {
// ====== // ======
@ -13,11 +16,11 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
* @param channelNameArg * @param channelNameArg
* @param passphraseArg * @param passphraseArg
*/ */
public static async createClientUniverseChannel( public static createClientUniverseChannel(
clientUniverseArg: ClientUniverse, clientUniverseArg: ClientUniverse,
channelNameArg: string, channelNameArg: string,
passphraseArg: string passphraseArg: string
): Promise<ClientUniverseChannel> { ): ClientUniverseChannel {
const clientChannel = new ClientUniverseChannel( const clientChannel = new ClientUniverseChannel(
clientUniverseArg, clientUniverseArg,
channelNameArg, channelNameArg,
@ -34,6 +37,8 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
// properties // properties
public name: string; public name: string;
public passphrase: string; public passphrase: string;
public status: 'subscribed' | 'unsubscribed' = 'unsubscribed';
private subject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
// refs // refs
public clientUniverseRef: ClientUniverse; public clientUniverseRef: ClientUniverse;
@ -48,13 +53,35 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
* subscribes to a channel * subscribes to a channel
* tells the universe about this instances interest into a channel * tells the universe about this instances interest into a channel
*/ */
public async subscribe() { public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage<any>) => void) {
const serverCallActionName: interfaces.IServerCallActions = 'channelSubscription';
const serverCallActionPayload: interfaces.IServerCallSubscribeActionPayload = { return this.subject.subscribe(
name: this.name, messageArg => {
passphrase: this.passphrase observingFunctionArg(messageArg);
}; },
await this.clientUniverseRef.smartsocketClient.serverCall(serverCallActionName, serverCallActionPayload); error => console.log(error)
);
}
public unsubscribe() {
// TODO: unsubscribe all users
}
public async populateSubscriptionToServer() {
// lets make sure the channel is connected
if (this.status === 'unsubscribed') {
const response = await this.clientUniverseRef.smartsocketClient.serverCall<
interfaces.ISocketRequest_SubscribeChannel
>('subscribeChannel', {
name: this.name,
passphrase: this.passphrase
});
this.status = response.subscriptionStatus;
}
}
public async emitMessageLocally(messageArg: ClientUniverseMessage<any>) {
this.subject.next(messageArg);
} }
/** /**
@ -69,9 +96,11 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
passphrase: this.passphrase, passphrase: this.passphrase,
targetChannelName: this.name, targetChannelName: this.name,
messageText: messageArg.messageText, messageText: messageArg.messageText,
payload: messageArg.payload, payload: messageArg.payload
payloadStringType: messageArg.payloadStringType
}; };
await this.clientUniverseRef.smartsocketClient.serverCall('processMessage', universeMessageToSend); await this.clientUniverseRef.smartsocketClient.serverCall(
'processMessage',
universeMessageToSend
);
} }
} }

View File

@ -2,11 +2,14 @@ import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces'; import * as interfaces from './interfaces';
export class ClientUniverseMessage implements interfaces.IUniverseMessage { export class ClientUniverseMessage<T> implements interfaces.IUniverseMessage {
// ====== // ======
// STATIC // STATIC
// ====== // ======
public static createMessageFromPayload(messageDescriptor: interfaces.IUniverseMessage) {} public static createMessageFromMessageDescriptor(messageDescriptor: interfaces.IUniverseMessage) {
const clientuniverseMessage = new ClientUniverseMessage(messageDescriptor);
return clientuniverseMessage;
}
// ======== // ========
// INSTANCE // INSTANCE
@ -19,15 +22,17 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
public smartTimestamp: plugins.smarttime.TimeStamp; public smartTimestamp: plugins.smarttime.TimeStamp;
public messageText: string; public messageText: string;
public passphrase: string; public passphrase: string;
public payload: any; public payload: T;
public payloadStringType;
public targetChannelName: string; public targetChannelName: string;
constructor(messageArg: interfaces.IUniverseMessage, payloadArg) { constructor(messageArg: interfaces.IUniverseMessage) {
for (const key of Object.keys(messageArg)) { for (const key of Object.keys(messageArg)) {
this[key] = messageArg[key]; this[key] = messageArg[key];
} }
} }
/**
* gets json for payload
*/
getAsJsonForPayload() {} getAsJsonForPayload() {}
} }

View File

@ -0,0 +1,66 @@
import * as plugins from './smartuniverse.plugins';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
import { ReactionResult } from './smartuniverse.classes.reactionresult';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
export interface IReactionRequestConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> {
method: T['method'];
}
export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITypedRequest> {
/**
* needed for tying responses to requests
*/
id: string;
typedRequestPayload: {
method: T['method'],
request : T['request'],
response: T['response']
};
}
export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequest> {
public method: T['method'];
constructor(optionsArg: IReactionRequestConstructorOptions<T>) {
this.method = optionsArg.method;
}
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=5000) {
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
const reactionResult = new ReactionResult<T>();
const requestId = plugins.smartunique.shortId();
for (const channel of channelsArg) {
subscriptionMap.add(channel.subscribe((messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) => {
if (messageArg.messageText === 'reactionResponse' && messageArg.payload.typedRequestPayload.method === this.method) {
const payload: ICombinatorPayload<T> = messageArg.payload;
if (payload.id !== requestId) {
return;
}
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
}
}));
const payload: ICombinatorPayload<T> = {
id: requestId,
typedRequestPayload: {
method: this.method,
request: requestDataArg,
response: null
}
};
channel.sendMessage({
messageText: 'reactionRequest',
payload
});
}
plugins.smartdelay.delayFor(timeoutMillisArg).then(async () => {
await subscriptionMap.forEach(subscriptionArg => {
subscriptionArg.unsubscribe();
});
reactionResult.complete();
});
return reactionResult;
}
}

View File

@ -0,0 +1,59 @@
import * as plugins from './smartuniverse.plugins';
import { ICombinatorPayload } from './smartuniverse.classes.reactionrequest';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (dataArg: T['request']) => Promise<T['response']>;
export interface IReactionResponseConstructorOptions<
T extends plugins.typedrequestInterfaces.ITypedRequest
> {
method: T['method'];
channels: Array<UniverseChannel | ClientUniverseChannel>;
funcDef: TReactionResponseFuncDef<T>;
}
export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedRequest> {
public method: T['method'];
public channels = new plugins.lik.Objectmap<UniverseChannel | ClientUniverseChannel>();
public funcDef: TReactionResponseFuncDef<T>;
constructor(optionsArg: IReactionResponseConstructorOptions<T>) {
this.method = optionsArg.method;
this.channels.addArray(optionsArg.channels);
this.funcDef = optionsArg.funcDef;
for (const channel of this.channels.getArray()) {
channel.subscribe(messageArg => {
this.processMessageForReaction(channel, messageArg);
});
}
}
private async processMessageForReaction(
channelArg: UniverseChannel | ClientUniverseChannel,
messageArg:
| UniverseMessage<ICombinatorPayload<T>>
| ClientUniverseMessage<ICombinatorPayload<T>>
) {
if (
messageArg.messageText === 'reactionRequest' &&
messageArg.payload.typedRequestPayload.method === this.method
) {
const response: T['response'] = await this.funcDef(messageArg.payload.typedRequestPayload.request);
const payload: ICombinatorPayload<T> = {
...messageArg.payload,
typedRequestPayload: {
...messageArg.payload.typedRequestPayload,
response
}
};
channelArg.sendMessage({
messageText: 'reactionResponse',
payload
});
}
}
}

View File

@ -0,0 +1,52 @@
import * as plugins from './smartuniverse.plugins';
import { ReactionResponse } from './smartuniverse.classes.reactionresponse';
export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedRequest> {
private resultReplaySubject = new plugins.smartrx.rxjs.ReplaySubject<T['response']>();
private endResult: Array<T['response']> = [];
private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>();
constructor () {
this.resultSubscribe(responseArg => {
this.endResult.push(responseArg);
});
}
public resultSubscribe(observerArg: (responseArg: T['response']) => void) {
return this.resultReplaySubject.subscribe(observerArg);
}
/**
* gets the end result as an array of all results
*/
public async getEndResult() {
const result = await this.completeDeferred.promise;
return result;
}
/**
* if there is a single respondant, or you are only interested in the first result
*/
public async getFirstResult() {
const done = plugins.smartpromise.defer<T['response']>();
const subscription = this.resultReplaySubject.subscribe(result => {
done.resolve(result);
subscription.unsubscribe();
});
return await done.promise;
}
/**
* push a reactionResponse
*/
public async pushReactionResponse(responseArg: T['response']) {
this.resultReplaySubject.next(responseArg);
}
/**
* completes the ReactionResult
*/
public async complete() {
this.completeDeferred.resolve(this.endResult);
}
}

View File

@ -10,6 +10,7 @@ import { UniverseConnection } from './smartuniverse.classes.universeconnection';
export interface ISmartUniverseConstructorOptions { export interface ISmartUniverseConstructorOptions {
messageExpiryInMilliseconds: number; messageExpiryInMilliseconds: number;
externalServer?: plugins.smartexpress.Server;
} }
/** /**
@ -59,8 +60,18 @@ export class Universe {
/** /**
* adds a channel to the Universe * adds a channel to the Universe
*/ */
public async addChannel(nameArg: string, passphraseArg: string) { public addChannel(nameArg: string, passphraseArg: string) {
const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg); const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg);
return newChannel;
}
/**
* returns a channel
*/
public getChannel(channelNameArg: string) {
return this.universeCache.channelMap.find(channelArg => {
return channelArg.name === channelNameArg;
});
} }
/** /**
@ -68,14 +79,19 @@ export class Universe {
*/ */
public async start(portArg: number) { public async start(portArg: number) {
// lets create the base smartexpress server // lets create the base smartexpress server
this.smartexpressServer = new plugins.smartexpress.Server({ if (!this.options.externalServer) {
cors: true, this.smartexpressServer = new plugins.smartexpress.Server({
defaultAnswer: async () => { cors: true,
return `smartuniverse server ${this.getUniverseVersion()}`; defaultAnswer: async () => {
}, return `smartuniverse server ${this.getUniverseVersion()}`;
forceSsl: false, },
port: portArg forceSsl: false,
}); port: portArg
});
} else {
console.log('Universe is using externally supplied server');
this.smartexpressServer = this.options.externalServer;
}
// add websocket upgrade // add websocket upgrade
this.smartsocket = new plugins.smartsocket.Smartsocket({}); this.smartsocket = new plugins.smartsocket.Smartsocket({});
@ -89,24 +105,21 @@ export class Universe {
// add the role to smartsocket // add the role to smartsocket
this.smartsocket.addSocketRoles([ClientRole]); this.smartsocket.addSocketRoles([ClientRole]);
const socketFunctionSubscription = new plugins.smartsocket.SocketFunction({ const socketFunctionSubscription = new plugins.smartsocket.SocketFunction<
interfaces.ISocketRequest_SubscribeChannel
>({
allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level
funcName: 'channelSubscription', funcName: 'subscribeChannel',
funcDef: async ( funcDef: async (dataArg, socketConnectionArg) => {
dataArg: interfaces.IServerCallSubscribeActionPayload, const universeConnection = new UniverseConnection({
socketConnectionArg universe: this,
) => { socketConnection: socketConnectionArg,
// run in "this context" of this class authenticationRequests: [dataArg]
await (async () => { });
const universeConnection = new UniverseConnection({ await UniverseConnection.addConnectionToCache(this, universeConnection);
socketConnection: socketConnectionArg, return {
authenticationRequests: [dataArg] subscriptionStatus: 'subscribed'
}); };
await UniverseConnection.addConnectionToCache(this, universeConnection);
return {
'subscription status': 'success'
};
})();
} }
}); });
@ -114,29 +127,36 @@ export class Universe {
allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level
funcName: 'processMessage', funcName: 'processMessage',
funcDef: async (dataArg: interfaces.IUniverseMessage, socketConnectionArg) => { funcDef: async (dataArg: interfaces.IUniverseMessage, socketConnectionArg) => {
// run in "this" context of this class const universeConnection = UniverseConnection.findUniverseConnectionBySocketConnection(
await (async () => { this.universeCache,
const universeConnection = UniverseConnection.findUniverseConnectionBySocketConnection( socketConnectionArg
this.universeCache, );
socketConnectionArg if (universeConnection) {
plugins.smartlog.defaultLogger.log(
'ok',
'found UniverseConnection for socket for incoming message'
); );
if (universeConnection) { } else {
console.log('found UniverseConnection for socket'); plugins.smartlog.defaultLogger.log(
} else { 'warn',
return { 'found no Authorized channel for incoming message'
error: 'You need to authenticate for a channel'
};
}
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(socketConnectionArg, dataArg);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
); );
if (foundChannel && unauthenticatedMessage.authenticated) { return {
const authenticatedMessage = unauthenticatedMessage; error: 'You need to authenticate for a channel'
await this.universeCache.addMessage(authenticatedMessage); };
} }
})(); const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(
socketConnectionArg,
dataArg
);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
);
if (foundChannel && unauthenticatedMessage.authenticated) {
const authenticatedMessage = unauthenticatedMessage;
await this.universeCache.addMessage(authenticatedMessage);
}
} }
}); });
@ -144,12 +164,15 @@ export class Universe {
this.smartsocket.addSocketFunction(socketFunctionSubscription); this.smartsocket.addSocketFunction(socketFunctionSubscription);
this.smartsocket.addSocketFunction(socketFunctionProcessMessage); this.smartsocket.addSocketFunction(socketFunctionProcessMessage);
// start the server
if (!this.options.externalServer) {
await this.smartexpressServer.start();
}
// add smartsocket to the running smartexpress app // add smartsocket to the running smartexpress app
this.smartsocket.setExternalServer('smartexpress', this.smartexpressServer as any); await this.smartsocket.setExternalServer('smartexpress', this.smartexpressServer);
// start everything
await this.smartexpressServer.start();
await this.smartsocket.start(); await this.smartsocket.start();
console.log('started universe'); plugins.smartlog.defaultLogger.log('success', 'started universe');
} }
/** /**
@ -157,6 +180,8 @@ export class Universe {
*/ */
public async stopServer() { public async stopServer() {
await this.smartsocket.stop(); await this.smartsocket.stop();
await this.smartexpressServer.stop(); if (!this.options.externalServer) {
await this.smartexpressServer.stop();
}
} }
} }

View File

@ -19,12 +19,12 @@ export class UniverseCache {
// INSTANCE // INSTANCE
// ======== // ========
public standardMessageExpiry: number; public standardMessageExpiry: number;
public destructionTime: number = 60000; public destructionTime: number = 10000;
/** /**
* stores messages for this instance * stores messages for this instance
*/ */
public messageMap = new Objectmap<UniverseMessage>(); public messageMap = new Objectmap<UniverseMessage<any>>();
/** /**
* stores the channels that are available within the universe * stores the channels that are available within the universe
@ -54,12 +54,12 @@ export class UniverseCache {
* @param messageArg * @param messageArg
* @param attachedPayloadArg * @param attachedPayloadArg
*/ */
public async addMessage(messageArg: UniverseMessage) { public async addMessage(messageArg: UniverseMessage<any>) {
messageArg.setUniverseCache(this); messageArg.setUniverseCache(this);
UniverseChannel.authorizeAMessageForAChannel(this, messageArg); UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
this.messageMap.add(messageArg); this.messageMap.add(messageArg);
messageArg.universeChannelList.forEach(universeChannel => { messageArg.universeChannelList.forEach(universeChannel => {
universeChannel.pushToClients(messageArg); universeChannel.push(messageArg);
}); });
} }
@ -69,7 +69,7 @@ export class UniverseCache {
public readMessagesYoungerThan( public readMessagesYoungerThan(
unixTimeArg?: number, unixTimeArg?: number,
channelName?: string channelName?: string
): Observable<UniverseMessage> { ): Observable<UniverseMessage<any>> {
const messageObservable = from(this.messageMap.getArray()).pipe( const messageObservable = from(this.messageMap.getArray()).pipe(
filter(messageArg => { filter(messageArg => {
return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime); return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime);

View File

@ -52,7 +52,7 @@ export class UniverseChannel {
*/ */
public static authorizeAMessageForAChannel( public static authorizeAMessageForAChannel(
universeCacheArg: UniverseCache, universeCacheArg: UniverseCache,
universeMessageArg: UniverseMessage universeMessageArg: UniverseMessage<any>
): UniverseChannel { ): UniverseChannel {
const foundChannel = universeCacheArg.channelMap.find(universeChannel => { const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
const result = universeChannel.authenticate(universeMessageArg); const result = universeChannel.authenticate(universeMessageArg);
@ -61,12 +61,12 @@ export class UniverseChannel {
if (foundChannel) { if (foundChannel) {
universeMessageArg.authenticated = true; universeMessageArg.authenticated = true;
universeMessageArg.universeChannelList.add(foundChannel); universeMessageArg.universeChannelList.add(foundChannel);
console.log('message authorized'); plugins.smartlog.defaultLogger.log('ok', 'message authorized');
return foundChannel; return foundChannel;
} else { } else {
universeMessageArg.authenticated = false; universeMessageArg.authenticated = false;
universeMessageArg.universeChannelList.add(universeCacheArg.blackListChannel); universeMessageArg.universeChannelList.add(universeCacheArg.blackListChannel);
console.log('message not valid'); plugins.smartlog.defaultLogger.log('warn', 'message not valid');
return null; return null;
} }
} }
@ -85,6 +85,7 @@ export class UniverseChannel {
*/ */
public name: string; public name: string;
public universeRef: Universe; public universeRef: Universe;
private subject = new plugins.smartrx.rxjs.Subject<UniverseMessage<any>>();
/** /**
* the passphrase for the channel * the passphrase for the channel
@ -102,7 +103,7 @@ export class UniverseChannel {
* # the messages channelName against the unverseChannel's name * # the messages channelName against the unverseChannel's name
* # the messages password against the universeChannel's password * # the messages password against the universeChannel's password
*/ */
public authenticate(universeMessageArg: UniverseMessage): boolean { public authenticate(universeMessageArg: UniverseMessage<any>): boolean {
return ( return (
this.name === universeMessageArg.targetChannelName && this.name === universeMessageArg.targetChannelName &&
this.passphrase === universeMessageArg.passphrase this.passphrase === universeMessageArg.passphrase
@ -113,25 +114,55 @@ export class UniverseChannel {
* pushes a message to clients * pushes a message to clients
* @param messageArg * @param messageArg
*/ */
public async pushToClients(messageArg: UniverseMessage) { public async push(messageArg: UniverseMessage<any>) {
this.subject.next(messageArg);
const universeConnectionsWithChannelAccess: UniverseConnection[] = []; const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
this.universeRef.universeCache.connectionMap.forEach(async socketConnection => { await this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
if (socketConnection.authenticatedChannels.includes(this)) { if (socketConnection.authenticatedChannels.includes(this)) {
universeConnectionsWithChannelAccess.push(socketConnection); universeConnectionsWithChannelAccess.push(socketConnection);
} }
}); });
for (const universeConnection of universeConnectionsWithChannelAccess) { for (const universeConnection of universeConnectionsWithChannelAccess) {
const smartsocket = universeConnection.socketConnection.smartsocketRef as plugins.smartsocket.Smartsocket; const smartsocket = universeConnection.socketConnection
.smartsocketRef as plugins.smartsocket.Smartsocket;
const universeMessageToSend: interfaces.IUniverseMessage = { const universeMessageToSend: interfaces.IUniverseMessage = {
id: messageArg.id, id: messageArg.id,
timestamp: messageArg.timestamp, timestamp: messageArg.timestamp,
passphrase: messageArg.passphrase, passphrase: messageArg.passphrase,
targetChannelName: this.name, targetChannelName: this.name,
messageText: messageArg.messageText, messageText: messageArg.messageText,
payload: messageArg.payload, payload: messageArg.payload
payloadStringType: messageArg.payloadStringType
}; };
smartsocket.clientCall('processMessage', universeMessageToSend, universeConnection.socketConnection); smartsocket.clientCall(
'processMessage',
universeMessageToSend,
universeConnection.socketConnection
);
} }
} }
// functions to interact with a channel locally
public subscribe(observingFunctionArg: (messageArg: UniverseMessage<any>) => void) {
return this.subject.subscribe(
messageArg => {
observingFunctionArg(messageArg);
},
error => console.log(error)
);
}
/**
* sends a message to the channel
*/
public async sendMessage(messageDescriptor: interfaces.IMessageCreator) {
const messageToSend = new UniverseMessage({
id: plugins.smartunique.shortId(),
messageText: messageDescriptor.messageText,
payload: messageDescriptor.payload,
targetChannelName: this.name,
passphrase: this.passphrase,
timestamp: Date.now()
});
this.universeRef.universeCache.addMessage(messageToSend);
}
} }

View File

@ -26,6 +26,7 @@ export class UniverseConnection {
universeConnection universeConnection
); );
universeRef.universeCache.connectionMap.add(universeConnection); universeRef.universeCache.connectionMap.add(universeConnection);
console.log('hi');
} }
/** /**
@ -58,8 +59,10 @@ export class UniverseConnection {
universeConnectionArg: UniverseConnection universeConnectionArg: UniverseConnection
): Promise<UniverseConnection> { ): Promise<UniverseConnection> {
for (const authenticationRequest of universeConnectionArg.authenticationRequests) { for (const authenticationRequest of universeConnectionArg.authenticationRequests) {
// TODO: authenticate channel subscriptions const universeChannelToAuthenticateAgainst = UniverseChannel.getUniverseChannelByName(
const universeChannelToAuthenticateAgainst = UniverseChannel.getUniverseChannelByName(universeRef, authenticationRequest.name); universeRef,
authenticationRequest.name
);
if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) { if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) {
universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst); universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst);
} }
@ -74,7 +77,6 @@ export class UniverseConnection {
connectionArg1: UniverseConnection, connectionArg1: UniverseConnection,
connectionArg2: UniverseConnection connectionArg2: UniverseConnection
) { ) {
// TODO: merge connections
return connectionArg1; return connectionArg1;
} }
@ -91,30 +93,42 @@ export class UniverseConnection {
return universeConnection; return universeConnection;
} }
// INSTANCE
public universeRef: Universe;
public terminatedDeferred = plugins.smartpromise.defer(); public terminatedDeferred = plugins.smartpromise.defer();
/** /**
* the socketClient to ping * the socketClient to ping
*/ */
public socketConnection: plugins.smartsocket.SocketConnection; public socketConnection: plugins.smartsocket.SocketConnection;
public authenticationRequests: interfaces.IServerCallSubscribeActionPayload[] = []; public authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']> = [];
public subscribedChannels: UniverseChannel[] = [];
public authenticatedChannels: UniverseChannel[] = []; public authenticatedChannels: UniverseChannel[] = [];
public failedToJoinChannels: UniverseChannel[] = []; public failedToJoinChannels: UniverseChannel[] = [];
/** /**
* terminates the connection * disconnect the connection
*/ */
public terminateConnection() { public async disconnect(reason: 'upstreamevent' | 'triggered' = 'triggered') {
this.socketConnection.socket.disconnect(); if (reason === 'triggered') {
await this.socketConnection.disconnect();
}
this.universeRef.universeCache.connectionMap.remove(this);
this.terminatedDeferred.resolve(); this.terminatedDeferred.resolve();
} }
constructor(optionsArg: { constructor(optionsArg: {
universe: Universe;
socketConnection: plugins.smartsocket.SocketConnection; socketConnection: plugins.smartsocket.SocketConnection;
authenticationRequests: interfaces.IServerCallSubscribeActionPayload[]; authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']>;
}) { }) {
this.authenticationRequests = optionsArg.authenticationRequests; this.authenticationRequests = optionsArg.authenticationRequests;
this.socketConnection = optionsArg.socketConnection; this.socketConnection = optionsArg.socketConnection;
this.socketConnection.eventSubject.subscribe(async eventArg => {
switch (eventArg) {
case 'disconnected':
await this.disconnect('upstreamevent');
break;
}
});
} }
} }

View File

@ -13,8 +13,11 @@ import { SocketConnection } from '@pushrocks/smartsocket';
* represents a message within a universe * represents a message within a universe
* acts as a container to save message states like authentication status * acts as a container to save message states like authentication status
*/ */
export class UniverseMessage implements interfaces.IUniverseMessage { export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public static createMessageFromPayload(socketConnectionArg: SocketConnection, dataArg: interfaces.IUniverseMessage) { public static createMessageFromPayload(
socketConnectionArg: SocketConnection,
dataArg: interfaces.IUniverseMessage
) {
const universeMessageInstance = new UniverseMessage(dataArg); const universeMessageInstance = new UniverseMessage(dataArg);
universeMessageInstance.socketConnection = socketConnectionArg; universeMessageInstance.socketConnection = socketConnectionArg;
return universeMessageInstance; return universeMessageInstance;
@ -25,8 +28,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
public smartTimestamp: TimeStamp; public smartTimestamp: TimeStamp;
public messageText: string; public messageText: string;
public passphrase: string; public passphrase: string;
public payload: any; public payload: T;
public payloadStringType;
public targetChannelName: string; public targetChannelName: string;
public socketConnection: SocketConnection; public socketConnection: SocketConnection;
@ -62,28 +64,32 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
this.passphrase = messageDescriptor.passphrase; this.passphrase = messageDescriptor.passphrase;
this.payload = messageDescriptor.payload; this.payload = messageDescriptor.payload;
// prevent memory issues // prevent memory issues
this.fallBackDestruction(); this.setDestructionTimer();
} }
public setUniverseCache(universeCacheArg: UniverseCache) { public setUniverseCache(universeCacheArg: UniverseCache) {
this.universeCache = universeCacheArg; this.universeCache = universeCacheArg;
} }
public setTargetChannel() { public setTargetChannel() {}
}
public setDestructionTimer(selfdestructAfterArg: number) { public setDestructionTimer(selfdestructAfterArg?: number) {
if (selfdestructAfterArg) { if (selfdestructAfterArg) {
this.destructionTimer = new Timer(selfdestructAfterArg); this.destructionTimer = new Timer(selfdestructAfterArg);
this.destructionTimer.start(); this.destructionTimer.start();
// set up self destruction by removing this from the parent messageCache // set up self destruction by removing this from the parent messageCache
this.destructionTimer.completed.then(async () => { this.destructionTimer.completed.then(async () => {
this.universeCache.messageMap.remove(this); this.universeCache.messageMap.remove(this);
}).catch(err => {
console.log(err);
console.log(this);
}); });
} else { } else {
this.fallBackDestruction(); plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
} }
} }
@ -91,17 +97,6 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
* handles bad messages for further analysis * handles bad messages for further analysis
*/ */
public handleAsBadMessage() { public handleAsBadMessage() {
console.log('received a bad message'); plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
}
/**
* prevents memory leaks if channels have no default
*/
private fallBackDestruction() {
plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
} }
} }

View File

@ -3,12 +3,20 @@ import * as path from 'path';
export { path }; export { path };
// apiglobal scope
import * as typedrequestInterfaces from '@apiglobal/typedrequest-interfaces';
export {
typedrequestInterfaces
};
// pushrocks scope // pushrocks scope
import * as lik from '@pushrocks/lik'; import * as lik from '@pushrocks/lik';
import * as smarthash from '@pushrocks/smarthash'; import * as smarthash from '@pushrocks/smarthash';
import * as smartdelay from '@pushrocks/smartdelay'; import * as smartdelay from '@pushrocks/smartdelay';
import * as smartexpress from '@pushrocks/smartexpress'; import * as smartexpress from '@pushrocks/smartexpress';
import * as smartfile from '@pushrocks/smartfile'; import * as smartfile from '@pushrocks/smartfile';
import * as smartlog from '@pushrocks/smartlog';
import * as smartpromise from '@pushrocks/smartpromise'; import * as smartpromise from '@pushrocks/smartpromise';
import * as smartrequest from '@pushrocks/smartrequest'; import * as smartrequest from '@pushrocks/smartrequest';
import * as smartrx from '@pushrocks/smartrx'; import * as smartrx from '@pushrocks/smartrx';
@ -22,6 +30,7 @@ export {
smartdelay, smartdelay,
smartexpress, smartexpress,
smartfile, smartfile,
smartlog,
smartpromise, smartpromise,
smartrx, smartrx,
smartrequest, smartrequest,