Compare commits

...

57 Commits

Author SHA1 Message Date
9c1504ef02 1.0.64 2019-08-13 18:43:33 +02:00
e8f2e04d1c fix(core): update 2019-08-13 18:43:33 +02:00
e12aa7e961 1.0.63 2019-08-13 18:41:28 +02:00
857b7cd010 fix(core): update 2019-08-13 18:41:27 +02:00
e100dea160 1.0.62 2019-08-13 18:16:17 +02:00
e8e87fcdba fix(core): update 2019-08-13 18:16:16 +02:00
0d18b11721 1.0.61 2019-08-13 18:06:14 +02:00
eaaefddbe3 fix(core): update 2019-08-13 18:06:13 +02:00
8c6946ddb6 1.0.60 2019-08-13 15:55:01 +02:00
3a7ebcdd80 fix(core): update 2019-08-13 15:55:01 +02:00
ec2afbfd55 1.0.59 2019-08-13 15:48:21 +02:00
89feeca735 fix(core): update 2019-08-13 15:48:20 +02:00
c4261765ec 1.0.58 2019-08-13 13:04:49 +02:00
33fe6bcd41 fix(core): update 2019-08-13 13:04:49 +02:00
1baf1c318c 1.0.57 2019-08-12 17:23:11 +02:00
051aba3299 fix(core): update 2019-08-12 17:23:10 +02:00
7998d79b13 1.0.56 2019-08-12 15:12:32 +02:00
6838a8729a fix(core): update 2019-08-12 15:12:31 +02:00
67f4e33ca0 1.0.55 2019-08-12 15:10:40 +02:00
8a8277ae9f fix(core): update 2019-08-12 15:10:40 +02:00
ff9cb9132c 1.0.54 2019-08-12 14:59:38 +02:00
f4ce784a59 fix(core): update 2019-08-12 14:59:37 +02:00
b34be4dcba 1.0.53 2019-07-30 10:31:36 +02:00
6cc69efe2d fix(core): update 2019-07-30 10:31:35 +02:00
8c30f294bc 1.0.52 2019-06-11 03:06:18 +02:00
228eb791b7 fix(core): update 2019-06-11 03:06:17 +02:00
057476ae66 1.0.51 2019-06-10 17:46:07 +02:00
cb80e4dc2e fix(core): update 2019-06-10 17:46:06 +02:00
8410e09a4d 1.0.50 2019-06-07 11:49:10 +02:00
eb04abddbf fix(core): update 2019-06-07 11:49:10 +02:00
57809d9b53 1.0.49 2019-06-06 23:23:37 +02:00
bee5231d47 fix(core): update 2019-06-06 23:23:37 +02:00
df45287026 1.0.48 2019-06-06 22:22:45 +02:00
b5b6ca81cf fix(core): update 2019-06-06 22:22:45 +02:00
dc80e3b48d 1.0.47 2019-04-30 19:16:04 +02:00
043d795013 fix(core): update 2019-04-30 19:16:03 +02:00
29c0c8dc23 1.0.46 2019-04-28 12:42:09 +02:00
8157f2a56b fix(core): update 2019-04-28 12:42:08 +02:00
2f528d1812 1.0.45 2019-04-24 23:27:58 +02:00
139c71a451 fix(core): update 2019-04-24 23:27:57 +02:00
4678e44d16 1.0.44 2019-04-24 18:20:32 +02:00
af9f590349 fix(core): update 2019-04-24 18:20:31 +02:00
d43ad80784 1.0.43 2019-04-23 00:28:57 +02:00
b1017121ea fix(core): update 2019-04-23 00:28:57 +02:00
a8a91b4db2 1.0.42 2019-04-22 23:23:36 +02:00
67c4b06c4d fix(core): update 2019-04-22 23:23:36 +02:00
7693b52066 1.0.41 2019-04-22 23:11:52 +02:00
30a02ae48b fix(core): update 2019-04-22 23:11:51 +02:00
241182ed2e 1.0.40 2019-04-22 22:04:53 +02:00
3d82038ec3 fix(core): update 2019-04-22 22:04:52 +02:00
300d62ed12 1.0.39 2019-04-22 13:06:02 +02:00
a5e849aa17 fix(core): update 2019-04-22 13:06:01 +02:00
83807d7c5c 1.0.38 2019-04-22 09:58:36 +02:00
39d3a0f2f8 fix(core): update 2019-04-22 09:58:36 +02:00
904a48d414 1.0.37 2019-04-12 20:50:44 +02:00
e2acb28756 1.0.36 2019-04-11 18:59:46 +02:00
92e4379bd2 fix(core): update 2019-04-11 18:59:45 +02:00
23 changed files with 1308 additions and 749 deletions

20
.gitignore vendored
View File

@ -1,6 +1,22 @@
.nogit/
node_modules/
dist/
# artifacts
coverage/
public/
pages/
# installs
node_modules/
# caches
.yarn/
.cache/
.rpt2_cache
# builds
dist/
dist_web/
dist_serve/
dist_ts_web/
# custom

View File

@ -1,5 +1,5 @@
# gitzone standard
image: hosttoday/ht-docker-node:npmci
# gitzone ci_default
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
cache:
paths:
@ -49,14 +49,14 @@ testLTS:
tags:
- docker
- notpriv
testSTABLE:
testBuild:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci node install lts
- npmci npm install
- npmci npm test
- npmci command npm run build
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
@ -65,7 +65,7 @@ testSTABLE:
release:
stage: release
script:
- npmci node install stable
- npmci node install lts
- npmci npm publish
only:
- tags
@ -78,19 +78,11 @@ release:
# ====================
codequality:
stage: metadata
image: docker:stable
allow_failure: true
services:
- docker:stable-dind
script:
- export SP_VERSION=$(echo "$CI_SERVER_VERSION" | sed 's/^\([0-9]*\)\.\([0-9]*\).*/\1-\2-stable/')
- docker run
--env SOURCE_CODE="$PWD"
--volume "$PWD":/code
--volume /var/run/docker.sock:/var/run/docker.sock
"registry.gitlab.com/gitlab-org/security-products/codequality:$SP_VERSION" /code
artifacts:
paths: [codeclimate.json]
- npmci command npm install -g tslint typescript
- npmci npm install
- npmci command "tslint -c tslint.json ./ts/**/*.ts"
tags:
- docker
- priv
@ -106,13 +98,15 @@ trigger:
- notpriv
pages:
image: hosttoday/ht-docker-node:npmci
image: hosttoday/ht-docker-dbase:npmci
services:
- docker:18-dind
stage: metadata
script:
- npmci command npm install -g typedoc typescript
- npmci command npm install -g @gitzone/tsdoc
- npmci npm prepare
- npmci npm install
- npmci command typedoc --module "commonjs" --target "ES2016" --out public/ ts/
- npmci command tsdoc
tags:
- docker
- notpriv

4
.snyk Normal file
View File

@ -0,0 +1,4 @@
# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
version: v1.13.5
ignore: {}
patch: {}

29
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,29 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "current file",
"type": "node",
"request": "launch",
"args": [
"${relativeFile}"
],
"runtimeArgs": ["-r", "@gitzone/tsrun"],
"cwd": "${workspaceRoot}",
"protocol": "inspector",
"internalConsoleOptions": "openOnSessionStart"
},
{
"name": "test.ts",
"type": "node",
"request": "launch",
"args": [
"test/test.ts"
],
"runtimeArgs": ["-r", "@gitzone/tsrun"],
"cwd": "${workspaceRoot}",
"protocol": "inspector",
"internalConsoleOptions": "openOnSessionStart"
}
]
}

1177
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
{
"name": "@pushrocks/smartuniverse",
"version": "1.0.35",
"version": "1.0.64",
"private": false,
"description": "messaging service for your micro services",
"main": "dist/index.js",
@ -9,30 +9,44 @@
"license": "MIT",
"scripts": {
"test": "(tstest test/)",
"testManual": "(tsrun test/test.ts)",
"build": "(tsbuild)",
"format": "(gitzone format)"
},
"devDependencies": {
"@gitzone/tsbuild": "^2.1.8",
"@gitzone/tstest": "^1.0.20",
"@pushrocks/tapbundle": "^3.0.9",
"@types/node": "^11.13.4",
"tslint": "^5.15.0",
"@gitzone/tsbuild": "^2.1.11",
"@gitzone/tstest": "^1.0.24",
"@pushrocks/tapbundle": "^3.0.11",
"@types/node": "^12.7.1",
"tslint": "^5.18.0",
"tslint-config-prettier": "^1.18.0"
},
"peerDependencies": {
"rxjs": "*"
},
"dependencies": {
"@pushrocks/lik": "^3.0.5",
"@pushrocks/lik": "^3.0.10",
"@pushrocks/smartdelay": "^2.0.3",
"@pushrocks/smartexpress": "^3.0.18",
"@pushrocks/smartfile": "^7.0.2",
"@pushrocks/smarthash": "^2.0.4",
"@pushrocks/smartexpress": "^3.0.38",
"@pushrocks/smartfile": "^7.0.4",
"@pushrocks/smarthash": "^2.0.6",
"@pushrocks/smartlog": "^2.0.19",
"@pushrocks/smartpromise": "^3.0.2",
"@pushrocks/smartrequest": "^1.1.14",
"@pushrocks/smartrequest": "^1.1.16",
"@pushrocks/smartrx": "^2.0.3",
"@pushrocks/smartsocket": "^1.1.27",
"@pushrocks/smarttime": "^3.0.7"
}
"@pushrocks/smartsocket": "^1.1.44",
"@pushrocks/smarttime": "^3.0.12",
"@pushrocks/smartunique": "^3.0.1"
},
"files": [
"ts/*",
"ts_web/*",
"dist/*",
"dist_web/*",
"dist_ts_web/*",
"assets/*",
"cli.js",
"npmextra.json",
"readme.md"
]
}

View File

@ -1,16 +1,13 @@
# @pushrocks/smartuniverse
messaging service for micro services
## Availabililty and Links
- [npmjs.org (npm package)](https://www.npmjs.com/package/@pushrocks/smartuniverse)
- [gitlab.com (source)](https://gitlab.com/pushrocks/smartuniverse)
- [github.com (source mirror)](https://github.com/pushrocks/smartuniverse)
- [docs (typedoc)](https://pushrocks.gitlab.io/smartuniverse/)
* [npmjs.org (npm package)](https://www.npmjs.com/package/@pushrocks/smartuniverse)
* [gitlab.com (source)](https://gitlab.com/pushrocks/smartuniverse)
* [github.com (source mirror)](https://github.com/pushrocks/smartuniverse)
* [docs (typedoc)](https://pushrocks.gitlab.io/smartuniverse/)
## Status for master
[![build status](https://gitlab.com/pushrocks/smartuniverse/badges/master/build.svg)](https://gitlab.com/pushrocks/smartuniverse/commits/master)
[![coverage report](https://gitlab.com/pushrocks/smartuniverse/badges/master/coverage.svg)](https://gitlab.com/pushrocks/smartuniverse/commits/master)
[![npm downloads per month](https://img.shields.io/npm/dm/@pushrocks/smartuniverse.svg)](https://www.npmjs.com/package/@pushrocks/smartuniverse)
@ -27,6 +24,11 @@ Use TypeScript for best in class instellisense.
Think WhatsApp, but for your microservices architecture. It allows your services to securely talk to each other in **private, shielded channels** without having to expose anything to the outside world. This allows the use of **reactive programming across your entire stack**.
### Server side
every universe has a server that manages messages.
Think Kafka, but without Kafka.
```typescript
import * as smartuniverse from '@pushrocks/smartuniverse';
@ -41,9 +43,13 @@ myUniverse.addChannel('awesomeChannel2', 'jhkjhfsdf87eerkjslkfja9');
myUniverse.start(8765); // start the server and provide the port on which to listen on
```
### Client side
All your microservices represents clients in the universe that may talk to each other using the universe server.
For further information read the linked docs at the top of this readme.
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)
> | By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy.html)
| By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy.html)
[![repo-footer](https://pushrocks.gitlab.io/assets/repo-footer.svg)](https://maintainedby.lossless.com)

View File

@ -5,13 +5,18 @@ import * as smartuniverse from '../ts/index';
import { Observable } from 'rxjs';
let testUniverse: smartuniverse.Universe;
let testUniverseClient: smartuniverse.ClientUniverse;
let testClientUniverse: smartuniverse.ClientUniverse;
let testClientUniverse2: smartuniverse.ClientUniverse;
let testClientChannel: smartuniverse.ClientUniverseChannel;
const testServerData = {
serverAddress: 'http://localhost:8765'
};
const testChannelData = {
channelName: 'awesomeTestChannel',
channelPass: 'awesomeChannelPAss'
}
channelPass: 'awesomeChannelPass'
};
tap.test('first test', async () => {
testUniverse = new smartuniverse.Universe({
@ -25,31 +30,51 @@ tap.test('add a message to the SmartUniverse', async () => {
// testing message handling
tap.test('create smartuniverse client', async () => {
testUniverseClient = new smartuniverse.ClientUniverse({
serverAddress: 'http://localhost:8765'
testClientUniverse = new smartuniverse.ClientUniverse({
serverAddress: testServerData.serverAddress
});
expect(testUniverseClient).to.be.instanceof(smartuniverse.ClientUniverse);
expect(testClientUniverse).to.be.instanceof(smartuniverse.ClientUniverse);
});
tap.test('should add a channel to the universe', async () => {
await testUniverse.addChannel('testChannel', 'testPassword');
testUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
});
tap.test('should add the same channel to the client universe in the same way', async () => {
testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
});
tap.test('should start the ClientUniverse', async () => {
await testClientUniverse.start();
})
tap.test('should get a observable correctly', async () => {
testClientChannel = await testUniverseClient.getChannel(testChannelData.channelName, testChannelData.channelPass);
testClientChannel = testClientUniverse.getChannel(testChannelData.channelName);
expect(testClientChannel).to.be.instanceof(smartuniverse.ClientUniverseChannel);
});
tap.test('should send a message correctly', async () => {
await testUniverseClient.sendMessage('greeting', {
anyBool: true
await (testClientUniverse.getChannel(testChannelData.channelName)).sendMessage({
messageText: 'hello'
});
});
tap.test('universe should contain the sent message', async () => {
expect(testUniverse.universeCache.messageMap.getArray()[0].messageText).to.equal('hello');
});
tap.test('a second client should be able to subscibe', async () => {
testClientUniverse2 = new smartuniverse.ClientUniverse({
serverAddress: testServerData.serverAddress
});
testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass);
});
tap.test('should receive a message correctly', async () => {});
tap.test('should disconnect the client correctly', async () => {
testUniverseClient.close();
testClientUniverse.stop();
});
tap.test('should end the server correctly', async tools => {

View File

@ -1,6 +1,7 @@
// Client classes
export * from './smartuniverse.classes.clientuniverse';
export * from './smartuniverse.classes.clientuniversechannel';
export * from './smartuniverse.classes.clientuniversemessage';
// Server classes
export * from './smartuniverse.classes.universe';

View File

@ -12,4 +12,4 @@ export interface IServerPutMessageRequestBody {
passphrase: string;
message: string;
payload: any;
}
}

View File

@ -1,3 +1,4 @@
export * from './http.interfaces';
export * from './universechannel.interfaces';
export * from './universemessage.interfaces';
export * from './universeactions.interfaces';

View File

@ -0,0 +1,17 @@
export type IServerCallActions =
| 'channelSubscription'
| 'processMessage'
| 'channelUnsubscribe'
| 'terminateConnection';
/**
* the interface for a subscription
*/
export interface IServerCallSubscribeActionPayload {
name: string;
passphrase: string;
}
export interface IServerUnsubscribeActionPayload {
name: string;
}

View File

@ -1,7 +1,18 @@
export interface IUniverseMessage {
export interface IMessageCreator {
messageText: string;
targetChannelName: string;
passphrase: string;
payload?: string | number | any;
payloadStringType?: 'Buffer' | 'string' | 'object';
}
/**
*
*/
export interface IUniverseMessage extends IMessageCreator {
id: string;
/**
* time of creation
*/
timestamp: number;
passphrase: string;
targetChannelName: string;
}

View File

@ -7,10 +7,8 @@ import * as url from 'url';
import * as interfaces from './interfaces';
import {
ClientUniverseChannel,
UniverseMessage
} from './';
import { ClientUniverseChannel, ClientUniverseMessage } from './';
import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache';
export interface IClientOptions {
serverAddress: string;
@ -22,50 +20,111 @@ export interface IClientOptions {
*/
export class ClientUniverse {
public options;
public socketClient: plugins.smartsocket.SmartsocketClient;
public observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
public channelCache = new Objectmap<ClientUniverseChannel>();
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage>;
public clientUniverseCache = new ClientUniverseCache();
constructor(optionsArg: IClientOptions) {
this.options = optionsArg;
}
public async sendMessage(messageArg: interfaces.IUniverseMessage) {
const requestBody: interfaces.IUniverseMessage = messageArg;
const requestBodyString = JSON.stringify(requestBody);
// TODO: User websocket connection if available
const response = await plugins.smartrequest.postJson(`${this.options.serverAddress}/sendmessage` , {
requestBody: requestBodyString
});
}
/**
* adds a channel to the channelcache
* TODO: verify channel before adding it to the channel cache
*/
public addChannel(channelNameArg: string, passphraseArg: string) {
const existingChannel = this.getChannel(channelNameArg);
public async getChannel(channelName: string, passphrase): Promise<ClientUniverseChannel> {
await this.checkConnection();
const clientUniverseChannel = await ClientUniverseChannel.createClientUniverseChannel(
this,
channelName
);
this.channelCache.add(clientUniverseChannel);
if (existingChannel) {
throw new Error('channel exists');
}
// lets create the channel
const clientUniverseChannel = ClientUniverseChannel.createClientUniverseChannel(this, channelNameArg, passphraseArg);
return clientUniverseChannel;
}
public close() {
this.socketClient.disconnect();
/**
* gets a channel from the channelcache
* @param channelName
* @param passphraseArg
*/
public getChannel(channelName: string): ClientUniverseChannel {
const clientUniverseChannel = this.clientUniverseCache.channelMap.find(channel => {
return channel.name === channelName;
});
return clientUniverseChannel;
}
private async checkConnection() {
if (!this.socketClient && !this.observableIntake) {
/**
* remove a a achannel
* @param messageArg
*/
public removeChannel(channelNameArg, notifyServer = true) {
const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemove(channelItemArg => {
return channelItemArg.name === channelNameArg;
});
}
public async start() {
await this.checkConnection();
}
public stop() {
this.smartsocketClient.disconnect();
}
/**
* checks the connection towards a universe server
* since password validation is done through other means, a connection should always be possible
*/
public async checkConnection(): Promise<void> {
if (!this.smartsocketClient && !this.observableIntake) {
const parsedURL = url.parse(this.options.serverAddress);
this.socketClient = new SmartsocketClient({
const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = {
alias: process.env.SOCKET_ALIAS || 'someclient',
password: 'UniverseClient',
port: parseInt(parsedURL.port, 10),
role: 'UniverseClient',
url: parsedURL.hostname
});
url: parsedURL.protocol + '//' + parsedURL.hostname
};
this.smartsocketClient = new SmartsocketClient(socketConfig);
this.observableIntake = new plugins.smartrx.ObservableIntake();
this.socketClient.connect();
// lets define some basic actions
/**
* should handle a forced unsubscription by the server
*/
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
funcName: 'unsubscribe',
allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
throw new Error('TODO');
}
});
/**
* handles message reception
*/
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction({
funcName: 'processMessage',
allowedRoles: [],
funcDef: async (messageDescriptorArg: interfaces.IUniverseMessage) => {
plugins.smartlog.defaultLogger.log('info', 'Got message from server');
this.observableIntake.push(ClientUniverseMessage.createMessageFromMessageDescriptor(messageDescriptorArg));
}
});
// add functions
this.smartsocketClient.addSocketFunction(socketFunctionUnsubscribe);
this.smartsocketClient.addSocketFunction(socketFunctionProcessMessage);
await this.smartsocketClient.connect();
plugins.smartlog.defaultLogger.log('info', 'universe client connected successfully');
await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => {
await clientUniverseChannelArg.subscribe();
});
}
}
}

View File

@ -0,0 +1,11 @@
import * as plugins from './smartuniverse.plugins';
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
/**
* a cache for clients
* keeps track of which messages have already been received
* good for deduplication in mesh environments
*/
export class ClientUniverseCache {
public channelMap = new plugins.lik.Objectmap<ClientUniverseChannel>();
}

View File

@ -7,12 +7,23 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
// ======
// STATIC
// ======
public static async createClientUniverseChannel(
/**
* creates a channel and adds it to the cache of clientUniverseArg
* @param clientUniverseArg
* @param channelNameArg
* @param passphraseArg
*/
public static createClientUniverseChannel(
clientUniverseArg: ClientUniverse,
channelName: string
): Promise<ClientUniverseChannel> {
const clientChannel = new ClientUniverseChannel(clientUniverseArg);
await clientChannel.transmitSubscription();
channelNameArg: string,
passphraseArg: string
): ClientUniverseChannel {
const clientChannel = new ClientUniverseChannel(
clientUniverseArg,
channelNameArg,
passphraseArg
);
clientUniverseArg.clientUniverseCache.channelMap.add(clientChannel);
return clientChannel;
}
@ -20,16 +31,47 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
// INSTANCE
// ========
public clientUniverse: ClientUniverse;
// properties
public name: string;
public passphrase: string;
constructor(clientUniverseArg: ClientUniverse) {
this.clientUniverse = clientUniverseArg;
// refs
public clientUniverseRef: ClientUniverse;
constructor(clientUniverseArg: ClientUniverse, nameArg: string, passphraseArg: string) {
this.clientUniverseRef = clientUniverseArg;
this.name = nameArg;
this.passphrase = passphraseArg;
}
/**
* subscribes to a channel
* tells the universe about this instances interest into a channel
*/
public async transmitSubscription() {
this.clientUniverse.socketClient;
public async subscribe() {
const serverCallActionName: interfaces.IServerCallActions = 'channelSubscription';
const serverCallActionPayload: interfaces.IServerCallSubscribeActionPayload = {
name: this.name,
passphrase: this.passphrase
};
await this.clientUniverseRef.smartsocketClient.serverCall(serverCallActionName, serverCallActionPayload);
}
/**
* sends a message towards the server
* @param messageArg
*/
public async sendMessage(messageArg: interfaces.IMessageCreator) {
await this.clientUniverseRef.checkConnection();
const universeMessageToSend: interfaces.IUniverseMessage = {
id: plugins.smartunique.shortId(),
timestamp: Date.now(),
passphrase: this.passphrase,
targetChannelName: this.name,
messageText: messageArg.messageText,
payload: messageArg.payload,
payloadStringType: messageArg.payloadStringType
};
await this.clientUniverseRef.smartsocketClient.serverCall('processMessage', universeMessageToSend);
}
}

View File

@ -6,16 +6,34 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
// ======
// STATIC
// ======
public static createMessageFromPayload(messageArg: string, payloadArg: any) {
};
public static createMessageFromMessageDescriptor(messageDescriptor: interfaces.IUniverseMessage) {
const clientuniverseMessage = new ClientUniverseMessage(messageDescriptor);
return clientuniverseMessage;
}
// ========
// INSTANCE
// ========
constructor(messageArg, payloadArg) {}
getAsJsonForPayload () {
// properties
public id: string;
public timestamp: number;
public smartTimestamp: plugins.smarttime.TimeStamp;
public messageText: string;
public passphrase: string;
public payload: any;
public payloadStringType;
public targetChannelName: string;
constructor(messageArg: interfaces.IUniverseMessage) {
for (const key of Object.keys(messageArg)) {
this[key] = messageArg[key];
}
}
/**
* gets json for payload
*/
getAsJsonForPayload() {}
}

View File

@ -6,15 +6,14 @@ import { UniverseCache, UniverseChannel, UniverseMessage } from './';
import * as paths from './smartuniverse.paths';
import * as interfaces from './interfaces';
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
export interface ISmartUniverseConstructorOptions {
messageExpiryInMilliseconds: number;
}
/**
* main class that setsup a Universe
* main class that setups a Universe
*/
export class Universe {
// subinstances
@ -23,25 +22,6 @@ export class Universe {
// options
private options: ISmartUniverseConstructorOptions;
/**
* stores the version of the universe server running
* this is done since the version is exposed through the api and multiple fs actions are avoided this way.
*/
private universeVersionStore: string;
/**
* get the currently running version of smartuniverse
*/
public get universeVersion() {
if (this.universeVersionStore) {
return this.universeVersionStore;
} else {
const packageJson = plugins.smartfile.fs.toObjectSync(paths.packageJson);
this.universeVersionStore = packageJson.version;
return this.universeVersionStore;
}
}
/**
* the smartexpress server used
*/
@ -54,63 +34,123 @@ export class Universe {
constructor(optionsArg: ISmartUniverseConstructorOptions) {
this.options = optionsArg;
this.universeCache = new UniverseCache(this.options.messageExpiryInMilliseconds);
this.universeCache = new UniverseCache(this, this.options.messageExpiryInMilliseconds);
}
/**
* stores the version of the universe server running
* this is done since the version is exposed through the api and multiple fs actions are avoided this way.
*/
private universeVersionStore: string;
/**
* get the currently running version of smartuniverse
*/
public getUniverseVersion() {
if (this.universeVersionStore) {
return this.universeVersionStore;
} else {
const packageJson = plugins.smartfile.fs.toObjectSync(paths.packageJson);
this.universeVersionStore = packageJson.version;
return this.universeVersionStore;
}
}
/**
* adds a channel to the Universe
*/
public async addChannel(nameArg: string, passphraseArg: string) {
const newChannel = UniverseChannel.createChannel(this.universeCache, nameArg, passphraseArg);
public addChannel(nameArg: string, passphraseArg: string) {
const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg);
}
/**
* initiates a server
*/
public async start(portArg: number | string) {
public async start(portArg: number) {
// lets create the base smartexpress server
this.smartexpressServer = new plugins.smartexpress.Server({
cors: true,
defaultAnswer: async () => {
return `smartuniverse server ${this.universeVersion}`;
return `smartuniverse server ${this.getUniverseVersion()}`;
},
forceSsl: false,
port: portArg
});
// lets create the http request route
this.smartexpressServer.addRoute('/sendmessage', new Handler('POST', async (req, res) => {
this.universeCache.addMessage(req.body);
}));
// add websocket upgrade
this.smartsocket = new plugins.smartsocket.Smartsocket({
port: 12345 // fix this within smartsocket
});
this.smartsocket = new plugins.smartsocket.Smartsocket({});
// add a role for the clients
const ClientRole = new plugins.smartsocket.SocketRole({
name: 'clientuniverse',
passwordHash: 'clientuniverse' // authentication happens on another level
name: 'UniverseClient',
passwordHash: plugins.smarthash.sha256FromStringSync('UniverseClient') // authentication happens on another level
});
// add the role to smartsocket
this.smartsocket.addSocketRoles([ClientRole]);
const SubscriptionSocketFunction = new plugins.smartsocket.SocketFunction({
allowedRoles: [ClientRole],
const socketFunctionSubscription = new plugins.smartsocket.SocketFunction({
allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level
funcName: 'channelSubscription',
funcDef: () => {} // TODO: implement an action upon connection of clients
funcDef: async (
dataArg: interfaces.IServerCallSubscribeActionPayload,
socketConnectionArg
) => {
// run in "this context" of this class
await (async () => {
const universeConnection = new UniverseConnection({
socketConnection: socketConnectionArg,
authenticationRequests: [dataArg]
});
await UniverseConnection.addConnectionToCache(this, universeConnection);
return {
'subscription status': 'success'
};
})();
}
});
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction({
allowedRoles: [ClientRole], // there is only one client role, Authentication happens on another level
funcName: 'processMessage',
funcDef: async (dataArg: interfaces.IUniverseMessage, socketConnectionArg) => {
// run in "this" context of this class
await (async () => {
const universeConnection = UniverseConnection.findUniverseConnectionBySocketConnection(
this.universeCache,
socketConnectionArg
);
if (universeConnection) {
plugins.smartlog.defaultLogger.log('ok', 'found UniverseConnection for socket for incoming message');
} else {
plugins.smartlog.defaultLogger.log('warn', 'found no Authorized channel for incoming message');
return {
error: 'You need to authenticate for a channel'
};
}
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(socketConnectionArg, dataArg);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
);
if (foundChannel && unauthenticatedMessage.authenticated) {
const authenticatedMessage = unauthenticatedMessage;
await this.universeCache.addMessage(authenticatedMessage);
}
})();
}
});
// add socket functions
this.smartsocket.addSocketFunction(socketFunctionSubscription);
this.smartsocket.addSocketFunction(socketFunctionProcessMessage);
// add smartsocket to the running smartexpress app
this.smartsocket.setExternalServer('express', this.smartexpressServer as any);
// start the socket
this.smartsocket.start();
// start the smartexpress instance
this.smartsocket.setExternalServer('smartexpress', this.smartexpressServer as any);
// start everything
await this.smartexpressServer.start();
await this.smartsocket.start();
plugins.smartlog.defaultLogger.log('success', 'started universe');
}
/**

View File

@ -8,6 +8,8 @@ import { Objectmap } from '@pushrocks/lik';
import { Observable, from } from 'rxjs';
import { filter } from 'rxjs/operators';
import { rxjs } from '@pushrocks/smartrx';
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
import { Universe } from './smartuniverse.classes.universe';
/**
* universe store handles the creation, storage and retrieval of messages.
@ -29,13 +31,22 @@ export class UniverseCache {
*/
public channelMap = new Objectmap<UniverseChannel>();
/**
* stores all connections
*/
public connectionMap = new plugins.lik.Objectmap<UniverseConnection>();
/**
* allows messages to be processed in a blacklist mode for further analysis
*/
public blackListChannel = new UniverseChannel(this, 'blacklist', 'nada');
public blackListChannel: UniverseChannel;
constructor(standardMessageExpiryArg: number) {
public universeRef: Universe;
constructor(universeArg: Universe, standardMessageExpiryArg: number) {
this.universeRef = universeArg;
this.standardMessageExpiry = standardMessageExpiryArg;
this.blackListChannel = new UniverseChannel(this.universeRef, 'blacklist', 'nada');
}
/**
@ -47,15 +58,21 @@ export class UniverseCache {
messageArg.setUniverseCache(this);
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
this.messageMap.add(messageArg);
messageArg.universeChannelList.forEach(universeChannel => {
universeChannel.pushToClients(messageArg);
});
}
/**
* Read a message from the UniverseCache
*/
public readMessagesYoungerThan(unixTimeArg?: number): Observable<UniverseMessage> {
public readMessagesYoungerThan(
unixTimeArg?: number,
channelName?: string
): Observable<UniverseMessage> {
const messageObservable = from(this.messageMap.getArray()).pipe(
filter(messageArg => {
return messageArg.timestamp.isYoungerThanMilliSeconds(this.destructionTime);
return messageArg.smartTimestamp.isYoungerThanMilliSeconds(this.destructionTime);
})
);
return messageObservable;

View File

@ -1,8 +1,10 @@
import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces';
import { Objectmap } from '@pushrocks/lik';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
import { Universe } from './smartuniverse.classes.universe';
/**
* enables messages to stay within a certain scope.
@ -18,12 +20,12 @@ export class UniverseChannel {
* @param passphraseArg the secret thats used for a certain topic.
*/
public static createChannel(
universeCacheArg: UniverseCache,
universeArg: Universe,
channelNameArg: string,
passphraseArg: string
) {
const newChannel = new UniverseChannel(universeCacheArg, channelNameArg, passphraseArg);
universeCacheArg.channelMap.add(newChannel);
const newChannel = new UniverseChannel(universeArg, channelNameArg, passphraseArg);
universeArg.universeCache.channelMap.add(newChannel);
return newChannel;
}
@ -41,10 +43,17 @@ export class UniverseChannel {
}
}
/**
* a static message authorization function that takes the UniverseCache
* (where messages and channels are stored and their lifetime is managed)
* and the universemessage to find a fitting channel for the message
* @param universeCacheArg
* @param universeMessageArg
*/
public static authorizeAMessageForAChannel(
universeCacheArg: UniverseCache,
universeMessageArg: UniverseMessage
) {
): UniverseChannel {
const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
const result = universeChannel.authenticate(universeMessageArg);
return result;
@ -52,13 +61,22 @@ export class UniverseChannel {
if (foundChannel) {
universeMessageArg.authenticated = true;
universeMessageArg.universeChannelList.add(foundChannel);
plugins.smartlog.defaultLogger.log('ok', 'message authorized');
return foundChannel;
} else {
universeMessageArg.authenticated = false;
universeMessageArg.universeChannelList.add(universeCacheArg.blackListChannel);
plugins.smartlog.defaultLogger.log('warn', 'message not valid');
return null;
}
}
public static getUniverseChannelByName(universeRef: Universe, universeChannelName: string) {
return universeRef.universeCache.channelMap.find(channelArg => {
return channelArg.name === universeChannelName;
});
}
// ========
// INSTANCE
// ========
@ -66,27 +84,54 @@ export class UniverseChannel {
* the name of the channel
*/
public name: string;
public universeCacheInstance: UniverseCache;
public universeRef: Universe;
/**
* the passphrase for the channel
*/
public passphrase: string;
constructor(universeCacheArg: UniverseCache, channelNameArg: string, passphraseArg: string) {
constructor(universeArg: Universe, channelNameArg: string, passphraseArg: string) {
this.universeRef = universeArg;
this.name = channelNameArg;
this.passphrase = passphraseArg;
}
/**
* authenticates a client on the server side
* authenticates a client on the server side by matching
* # the messages channelName against the unverseChannel's name
* # the messages password against the universeChannel's password
*/
public authenticate(universeMessageArg: UniverseMessage): boolean {
return (
this.name === universeMessageArg.requestedChannelName &&
this.passphrase === universeMessageArg.requestedChannelPassphrase
this.name === universeMessageArg.targetChannelName &&
this.passphrase === universeMessageArg.passphrase
);
}
public pushToClients(messageArg: UniverseMessage) {}
/**
* pushes a message to clients
* @param messageArg
*/
public async pushToClients(messageArg: UniverseMessage) {
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
if (socketConnection.authenticatedChannels.includes(this)) {
universeConnectionsWithChannelAccess.push(socketConnection);
}
});
for (const universeConnection of universeConnectionsWithChannelAccess) {
const smartsocket = universeConnection.socketConnection.smartsocketRef as plugins.smartsocket.Smartsocket;
const universeMessageToSend: interfaces.IUniverseMessage = {
id: messageArg.id,
timestamp: messageArg.timestamp,
passphrase: messageArg.passphrase,
targetChannelName: this.name,
messageText: messageArg.messageText,
payload: messageArg.payload,
payloadStringType: messageArg.payloadStringType
};
smartsocket.clientCall('processMessage', universeMessageToSend, universeConnection.socketConnection);
}
}
}

View File

@ -0,0 +1,118 @@
import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { Universe } from './smartuniverse.classes.universe';
/**
* represents a connection to the universe
*/
export class UniverseConnection {
/**
*
* @param universeConnectionArg
*/
public static async addConnectionToCache(
universeRef: Universe,
universeConnectionArg: UniverseConnection
) {
let universeConnection = universeConnectionArg;
universeConnection = await UniverseConnection.deduplicateUniverseConnection(
universeRef.universeCache,
universeConnection
);
universeConnection = await UniverseConnection.authenticateAuthenticationRequests(
universeRef,
universeConnection
);
universeRef.universeCache.connectionMap.add(universeConnection);
}
/**
* deduplicates UniverseConnections
*/
public static async deduplicateUniverseConnection(
universeCache: UniverseCache,
universeConnectionArg: UniverseConnection
): Promise<UniverseConnection> {
let connectionToReturn: UniverseConnection;
universeCache.connectionMap.forEach(async existingConnection => {
if (existingConnection.socketConnection === universeConnectionArg.socketConnection) {
connectionToReturn = await this.mergeUniverseConnections(
existingConnection,
universeConnectionArg
);
}
});
if (!connectionToReturn) {
connectionToReturn = universeConnectionArg;
}
return connectionToReturn;
}
/**
* authenticate AuthenticationRequests
*/
public static async authenticateAuthenticationRequests(
universeRef: Universe,
universeConnectionArg: UniverseConnection
): Promise<UniverseConnection> {
for (const authenticationRequest of universeConnectionArg.authenticationRequests) {
const universeChannelToAuthenticateAgainst = UniverseChannel.getUniverseChannelByName(universeRef, authenticationRequest.name);
if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) {
universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst);
}
}
return universeConnectionArg;
}
/**
* merges two UniverseConnections
*/
public static mergeUniverseConnections(
connectionArg1: UniverseConnection,
connectionArg2: UniverseConnection
) {
return connectionArg1;
}
/**
* finds a UniverseConnection by providing a socket connection
*/
public static findUniverseConnectionBySocketConnection(
universeCache: UniverseCache,
socketConnectionArg: plugins.smartsocket.SocketConnection
): UniverseConnection {
const universeConnection = universeCache.connectionMap.find(universeConnectionArg => {
return universeConnectionArg.socketConnection === socketConnectionArg;
});
return universeConnection;
}
public terminatedDeferred = plugins.smartpromise.defer();
/**
* the socketClient to ping
*/
public socketConnection: plugins.smartsocket.SocketConnection;
public authenticationRequests: interfaces.IServerCallSubscribeActionPayload[] = [];
public subscribedChannels: UniverseChannel[] = [];
public authenticatedChannels: UniverseChannel[] = [];
public failedToJoinChannels: UniverseChannel[] = [];
/**
* terminates the connection
*/
public terminateConnection() {
this.socketConnection.socket.disconnect();
this.terminatedDeferred.resolve();
}
constructor(optionsArg: {
socketConnection: plugins.smartsocket.SocketConnection;
authenticationRequests: interfaces.IServerCallSubscribeActionPayload[];
}) {
this.authenticationRequests = optionsArg.authenticationRequests;
this.socketConnection = optionsArg.socketConnection;
}
}

View File

@ -7,34 +7,34 @@ import { Timer, TimeStamp } from '@pushrocks/smarttime';
import { Universe } from './smartuniverse.classes.universe';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { SocketConnection } from '@pushrocks/smartsocket';
/**
* represents a message within a universe
* acts as a container to save message states like authentication status
*/
export class UniverseMessage implements interfaces.IUniverseMessage {
/**
* public and unique id
* numeric ascending
* adheres to time in milliseconds
* -> meaning it describes the time of arrival
* -> two messages received at the same time will count up the second one
* -> avoids duplications of messages
* -> may be changed to nanoseconds to ensure higher throughput
*/
public id: number;
public static createMessageFromPayload(socketConnectionArg: SocketConnection, dataArg: interfaces.IUniverseMessage) {
const universeMessageInstance = new UniverseMessage(dataArg);
universeMessageInstance.socketConnection = socketConnectionArg;
return universeMessageInstance;
}
public id: string;
public timestamp: number;
public smartTimestamp: TimeStamp;
public messageText: string;
public passphrase: string;
public payload: any;
public payloadStringType;
public targetChannelName: string;
public socketConnection: SocketConnection;
/**
* the UniverseCache the message is attached to
*/
public universeCache: UniverseCache;
/**
* requestedChannelName
*/
public requestedChannelName: string;
public requestedChannelPassphrase: string;
/**
* enables unprotected grouping of messages for efficiency purposes.
*/
@ -43,22 +43,11 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
/**
* wether the message is authenticated
*/
public authenticated: boolean = null;
public authenticated: boolean = false;
/**
* time of creation
* a destruction timer for this message
*/
public timestamp: TimeStamp;
/**
* the actual message
*/
public message: string;
/**
* any attached payloads. Can be of binary format.
*/
public attachedPayload: any;
public destructionTimer: Timer; // a timer to take care of message destruction
/**
@ -66,17 +55,12 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
* @param messageArg
* @param attachedPayloadArg
*/
constructor(
messageArg: string,
requestedChannelNameArg: string,
passphraseArg: string,
attachedPayloadArg: any
) {
this.timestamp = new TimeStamp();
this.message = messageArg;
this.requestedChannelName = requestedChannelNameArg;
this.requestedChannelPassphrase = passphraseArg;
this.attachedPayload = attachedPayloadArg;
constructor(messageDescriptor: interfaces.IUniverseMessage) {
this.smartTimestamp = new TimeStamp(this.timestamp);
this.messageText = messageDescriptor.messageText;
this.targetChannelName = messageDescriptor.targetChannelName;
this.passphrase = messageDescriptor.passphrase;
this.payload = messageDescriptor.payload;
// prevent memory issues
this.fallBackDestruction();
}
@ -85,6 +69,10 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
this.universeCache = universeCacheArg;
}
public setTargetChannel() {
}
public setDestructionTimer(selfdestructAfterArg: number) {
if (selfdestructAfterArg) {
this.destructionTimer = new Timer(selfdestructAfterArg);
@ -102,8 +90,8 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
/**
* handles bad messages for further analysis
*/
handleAsBadMessage() {
console.log('received a bad message');
public handleAsBadMessage() {
plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
}
/**

View File

@ -9,11 +9,13 @@ import * as smarthash from '@pushrocks/smarthash';
import * as smartdelay from '@pushrocks/smartdelay';
import * as smartexpress from '@pushrocks/smartexpress';
import * as smartfile from '@pushrocks/smartfile';
import * as smartlog from '@pushrocks/smartlog';
import * as smartpromise from '@pushrocks/smartpromise';
import * as smartrequest from '@pushrocks/smartrequest';
import * as smartrx from '@pushrocks/smartrx';
import * as smartsocket from '@pushrocks/smartsocket';
import * as smarttime from '@pushrocks/smarttime';
import * as smartunique from '@pushrocks/smartunique';
export {
lik,
@ -21,9 +23,11 @@ export {
smartdelay,
smartexpress,
smartfile,
smartlog,
smartpromise,
smartrx,
smartrequest,
smartsocket,
smarttime
smarttime,
smartunique
};