Compare commits

...

12 Commits

Author SHA1 Message Date
9c1504ef02 1.0.64 2019-08-13 18:43:33 +02:00
e8f2e04d1c fix(core): update 2019-08-13 18:43:33 +02:00
e12aa7e961 1.0.63 2019-08-13 18:41:28 +02:00
857b7cd010 fix(core): update 2019-08-13 18:41:27 +02:00
e100dea160 1.0.62 2019-08-13 18:16:17 +02:00
e8e87fcdba fix(core): update 2019-08-13 18:16:16 +02:00
0d18b11721 1.0.61 2019-08-13 18:06:14 +02:00
eaaefddbe3 fix(core): update 2019-08-13 18:06:13 +02:00
8c6946ddb6 1.0.60 2019-08-13 15:55:01 +02:00
3a7ebcdd80 fix(core): update 2019-08-13 15:55:01 +02:00
ec2afbfd55 1.0.59 2019-08-13 15:48:21 +02:00
89feeca735 fix(core): update 2019-08-13 15:48:20 +02:00
15 changed files with 151 additions and 72 deletions

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{
"name": "@pushrocks/smartuniverse",
"version": "1.0.58",
"version": "1.0.64",
"lockfileVersion": 1,
"requires": true,
"dependencies": {

View File

@ -1,6 +1,6 @@
{
"name": "@pushrocks/smartuniverse",
"version": "1.0.58",
"version": "1.0.64",
"private": false,
"description": "messaging service for your micro services",
"main": "dist/index.js",
@ -30,6 +30,7 @@
"@pushrocks/smartexpress": "^3.0.38",
"@pushrocks/smartfile": "^7.0.4",
"@pushrocks/smarthash": "^2.0.6",
"@pushrocks/smartlog": "^2.0.19",
"@pushrocks/smartpromise": "^3.0.2",
"@pushrocks/smartrequest": "^1.1.16",
"@pushrocks/smartrx": "^2.0.3",

View File

@ -15,7 +15,7 @@ const testServerData = {
const testChannelData = {
channelName: 'awesomeTestChannel',
channelPass: 'awesomeChannelPAss'
channelPass: 'awesomeChannelPass'
};
tap.test('first test', async () => {
@ -37,20 +37,24 @@ tap.test('create smartuniverse client', async () => {
});
tap.test('should add a channel to the universe', async () => {
await testUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
testUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
});
tap.test('should add the same channel to the client universe in the same way', async () => {
await testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
});
tap.test('should start the ClientUniverse', async () => {
await testClientUniverse.start();
})
tap.test('should get a observable correctly', async () => {
testClientChannel = await testClientUniverse.getChannel(testChannelData.channelName);
testClientChannel = testClientUniverse.getChannel(testChannelData.channelName);
expect(testClientChannel).to.be.instanceof(smartuniverse.ClientUniverseChannel);
});
tap.test('should send a message correctly', async () => {
await (await testClientUniverse.getChannel(testChannelData.channelName)).sendMessage({
await (testClientUniverse.getChannel(testChannelData.channelName)).sendMessage({
messageText: 'hello'
});
});
@ -70,7 +74,7 @@ tap.test('a second client should be able to subscibe', async () => {
tap.test('should receive a message correctly', async () => {});
tap.test('should disconnect the client correctly', async () => {
testClientUniverse.close();
testClientUniverse.stop();
});
tap.test('should end the server correctly', async tools => {

View File

@ -1,6 +1,7 @@
// Client classes
export * from './smartuniverse.classes.clientuniverse';
export * from './smartuniverse.classes.clientuniversechannel';
export * from './smartuniverse.classes.clientuniversemessage';
// Server classes
export * from './smartuniverse.classes.universe';

View File

@ -1,4 +0,0 @@
export interface IAuthenticationRequest {
channelName: string;
password: string;
}

View File

@ -7,7 +7,7 @@ import * as url from 'url';
import * as interfaces from './interfaces';
import { ClientUniverseChannel, UniverseMessage } from './';
import { ClientUniverseChannel, ClientUniverseMessage } from './';
import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache';
export interface IClientOptions {
@ -21,9 +21,7 @@ export interface IClientOptions {
export class ClientUniverse {
public options;
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
public observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
public channelStore = new Objectmap<ClientUniverseChannel>();
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage>;
public clientUniverseCache = new ClientUniverseCache();
constructor(optionsArg: IClientOptions) {
@ -34,15 +32,16 @@ export class ClientUniverse {
* adds a channel to the channelcache
* TODO: verify channel before adding it to the channel cache
*/
public async addChannel(channelNameArg: string, passphraseArg: string) {
const existingChannel = await this.getChannel(channelNameArg);
public addChannel(channelNameArg: string, passphraseArg: string) {
const existingChannel = this.getChannel(channelNameArg);
if (existingChannel) {
throw new Error('channel exists');
}
// lets create the channel
await ClientUniverseChannel.createClientUniverseChannel(this, channelNameArg, passphraseArg);
const clientUniverseChannel = ClientUniverseChannel.createClientUniverseChannel(this, channelNameArg, passphraseArg);
return clientUniverseChannel;
}
/**
@ -50,9 +49,8 @@ export class ClientUniverse {
* @param channelName
* @param passphraseArg
*/
public async getChannel(channelName: string): Promise<ClientUniverseChannel> {
await this.checkConnection();
const clientUniverseChannel = this.channelStore.find(channel => {
public getChannel(channelName: string): ClientUniverseChannel {
const clientUniverseChannel = this.clientUniverseCache.channelMap.find(channel => {
return channel.name === channelName;
});
return clientUniverseChannel;
@ -63,12 +61,16 @@ export class ClientUniverse {
* @param messageArg
*/
public removeChannel(channelNameArg, notifyServer = true) {
const clientUniverseChannel = this.channelStore.findOneAndRemove(channelItemArg => {
const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemove(channelItemArg => {
return channelItemArg.name === channelNameArg;
});
}
public close() {
public async start() {
await this.checkConnection();
}
public stop() {
this.smartsocketClient.disconnect();
}
@ -86,7 +88,6 @@ export class ClientUniverse {
role: 'UniverseClient',
url: parsedURL.protocol + '//' + parsedURL.hostname
};
console.log(socketConfig);
this.smartsocketClient = new SmartsocketClient(socketConfig);
this.observableIntake = new plugins.smartrx.ObservableIntake();
@ -95,7 +96,7 @@ export class ClientUniverse {
/**
* should handle a forced unsubscription by the server
*/
const unsubscribe = new plugins.smartsocket.SocketFunction({
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
funcName: 'unsubscribe',
allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
@ -104,17 +105,26 @@ export class ClientUniverse {
});
/**
* should handle a message reception
* handles message reception
*/
const processMessageSocketFunction = new plugins.smartsocket.SocketFunction({
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction({
funcName: 'processMessage',
allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
throw new Error('TODO');
funcDef: async (messageDescriptorArg: interfaces.IUniverseMessage) => {
plugins.smartlog.defaultLogger.log('info', 'Got message from server');
this.observableIntake.push(ClientUniverseMessage.createMessageFromMessageDescriptor(messageDescriptorArg));
}
});
// add functions
this.smartsocketClient.addSocketFunction(socketFunctionUnsubscribe);
this.smartsocketClient.addSocketFunction(socketFunctionProcessMessage);
await this.smartsocketClient.connect();
plugins.smartlog.defaultLogger.log('info', 'universe client connected successfully');
await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => {
await clientUniverseChannelArg.subscribe();
});
}
}
}

View File

@ -1,8 +1,11 @@
import * as plugins from './smartuniverse.plugins';
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
/**
* a cache for clients
* keeps track of which messages have already been received
* good for deduplication in mesh environments
*/
export class ClientUniverseCache {}
export class ClientUniverseCache {
public channelMap = new plugins.lik.Objectmap<ClientUniverseChannel>();
}

View File

@ -13,18 +13,17 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
* @param channelNameArg
* @param passphraseArg
*/
public static async createClientUniverseChannel(
public static createClientUniverseChannel(
clientUniverseArg: ClientUniverse,
channelNameArg: string,
passphraseArg: string
): Promise<ClientUniverseChannel> {
): ClientUniverseChannel {
const clientChannel = new ClientUniverseChannel(
clientUniverseArg,
channelNameArg,
passphraseArg
);
clientUniverseArg.channelStore.add(clientChannel);
await clientChannel.subscribe();
clientUniverseArg.clientUniverseCache.channelMap.add(clientChannel);
return clientChannel;
}

View File

@ -6,7 +6,10 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
// ======
// STATIC
// ======
public static createMessageFromPayload(messageDescriptor: interfaces.IUniverseMessage) {}
public static createMessageFromMessageDescriptor(messageDescriptor: interfaces.IUniverseMessage) {
const clientuniverseMessage = new ClientUniverseMessage(messageDescriptor);
return clientuniverseMessage;
}
// ========
// INSTANCE
@ -23,11 +26,14 @@ export class ClientUniverseMessage implements interfaces.IUniverseMessage {
public payloadStringType;
public targetChannelName: string;
constructor(messageArg: interfaces.IUniverseMessage, payloadArg) {
constructor(messageArg: interfaces.IUniverseMessage) {
for (const key of Object.keys(messageArg)) {
this[key] = messageArg[key];
}
}
/**
* gets json for payload
*/
getAsJsonForPayload() {}
}

View File

@ -34,7 +34,7 @@ export class Universe {
constructor(optionsArg: ISmartUniverseConstructorOptions) {
this.options = optionsArg;
this.universeCache = new UniverseCache(this.options.messageExpiryInMilliseconds);
this.universeCache = new UniverseCache(this, this.options.messageExpiryInMilliseconds);
}
/**
@ -59,8 +59,8 @@ export class Universe {
/**
* adds a channel to the Universe
*/
public async addChannel(nameArg: string, passphraseArg: string) {
const newChannel = UniverseChannel.createChannel(this.universeCache, nameArg, passphraseArg);
public addChannel(nameArg: string, passphraseArg: string) {
const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg);
}
/**
@ -98,12 +98,11 @@ export class Universe {
) => {
// run in "this context" of this class
await (async () => {
// TODO: properly add the connection
const universeConnection = new UniverseConnection({
socketConnection: socketConnectionArg,
authenticationRequests: []
authenticationRequests: [dataArg]
});
await UniverseConnection.addConnectionToCache(this.universeCache, universeConnection);
await UniverseConnection.addConnectionToCache(this, universeConnection);
return {
'subscription status': 'success'
};
@ -122,13 +121,14 @@ export class Universe {
socketConnectionArg
);
if (universeConnection) {
console.log('found UniverseConnection for socket');
plugins.smartlog.defaultLogger.log('ok', 'found UniverseConnection for socket for incoming message');
} else {
plugins.smartlog.defaultLogger.log('warn', 'found no Authorized channel for incoming message');
return {
error: 'You need to authenticate for a channel'
};
}
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(dataArg);
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(socketConnectionArg, dataArg);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
@ -150,7 +150,7 @@ export class Universe {
// start everything
await this.smartexpressServer.start();
await this.smartsocket.start();
console.log('started universe');
plugins.smartlog.defaultLogger.log('success', 'started universe');
}
/**

View File

@ -9,6 +9,7 @@ import { Observable, from } from 'rxjs';
import { filter } from 'rxjs/operators';
import { rxjs } from '@pushrocks/smartrx';
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
import { Universe } from './smartuniverse.classes.universe';
/**
* universe store handles the creation, storage and retrieval of messages.
@ -30,15 +31,22 @@ export class UniverseCache {
*/
public channelMap = new Objectmap<UniverseChannel>();
/**
* stores all connections
*/
public connectionMap = new plugins.lik.Objectmap<UniverseConnection>();
/**
* allows messages to be processed in a blacklist mode for further analysis
*/
public blackListChannel = new UniverseChannel(this, 'blacklist', 'nada');
public blackListChannel: UniverseChannel;
constructor(standardMessageExpiryArg: number) {
public universeRef: Universe;
constructor(universeArg: Universe, standardMessageExpiryArg: number) {
this.universeRef = universeArg;
this.standardMessageExpiry = standardMessageExpiryArg;
this.blackListChannel = new UniverseChannel(this.universeRef, 'blacklist', 'nada');
}
/**
@ -50,6 +58,9 @@ export class UniverseCache {
messageArg.setUniverseCache(this);
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
this.messageMap.add(messageArg);
messageArg.universeChannelList.forEach(universeChannel => {
universeChannel.pushToClients(messageArg);
});
}
/**

View File

@ -1,7 +1,10 @@
import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
import { Universe } from './smartuniverse.classes.universe';
/**
* enables messages to stay within a certain scope.
@ -17,12 +20,12 @@ export class UniverseChannel {
* @param passphraseArg the secret thats used for a certain topic.
*/
public static createChannel(
universeCacheArg: UniverseCache,
universeArg: Universe,
channelNameArg: string,
passphraseArg: string
) {
const newChannel = new UniverseChannel(universeCacheArg, channelNameArg, passphraseArg);
universeCacheArg.channelMap.add(newChannel);
const newChannel = new UniverseChannel(universeArg, channelNameArg, passphraseArg);
universeArg.universeCache.channelMap.add(newChannel);
return newChannel;
}
@ -58,16 +61,22 @@ export class UniverseChannel {
if (foundChannel) {
universeMessageArg.authenticated = true;
universeMessageArg.universeChannelList.add(foundChannel);
console.log('message authorized');
plugins.smartlog.defaultLogger.log('ok', 'message authorized');
return foundChannel;
} else {
universeMessageArg.authenticated = false;
universeMessageArg.universeChannelList.add(universeCacheArg.blackListChannel);
console.log('message not valid');
plugins.smartlog.defaultLogger.log('warn', 'message not valid');
return null;
}
}
public static getUniverseChannelByName(universeRef: Universe, universeChannelName: string) {
return universeRef.universeCache.channelMap.find(channelArg => {
return channelArg.name === universeChannelName;
});
}
// ========
// INSTANCE
// ========
@ -75,14 +84,15 @@ export class UniverseChannel {
* the name of the channel
*/
public name: string;
public universeCacheInstance: UniverseCache;
public universeRef: Universe;
/**
* the passphrase for the channel
*/
public passphrase: string;
constructor(universeCacheArg: UniverseCache, channelNameArg: string, passphraseArg: string) {
constructor(universeArg: Universe, channelNameArg: string, passphraseArg: string) {
this.universeRef = universeArg;
this.name = channelNameArg;
this.passphrase = passphraseArg;
}
@ -99,5 +109,29 @@ export class UniverseChannel {
);
}
public pushToClients(messageArg: UniverseMessage) {}
/**
* pushes a message to clients
* @param messageArg
*/
public async pushToClients(messageArg: UniverseMessage) {
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
if (socketConnection.authenticatedChannels.includes(this)) {
universeConnectionsWithChannelAccess.push(socketConnection);
}
});
for (const universeConnection of universeConnectionsWithChannelAccess) {
const smartsocket = universeConnection.socketConnection.smartsocketRef as plugins.smartsocket.Smartsocket;
const universeMessageToSend: interfaces.IUniverseMessage = {
id: messageArg.id,
timestamp: messageArg.timestamp,
passphrase: messageArg.passphrase,
targetChannelName: this.name,
messageText: messageArg.messageText,
payload: messageArg.payload,
payloadStringType: messageArg.payloadStringType
};
smartsocket.clientCall('processMessage', universeMessageToSend, universeConnection.socketConnection);
}
}
}

View File

@ -2,6 +2,7 @@ import * as plugins from './smartuniverse.plugins';
import * as interfaces from './interfaces';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { Universe } from './smartuniverse.classes.universe';
/**
* represents a connection to the universe
@ -12,18 +13,19 @@ export class UniverseConnection {
* @param universeConnectionArg
*/
public static async addConnectionToCache(
universeCache: UniverseCache,
universeRef: Universe,
universeConnectionArg: UniverseConnection
) {
let universeConnection = universeConnectionArg;
universeConnection = await UniverseConnection.deduplicateUniverseConnection(
universeCache,
universeRef.universeCache,
universeConnection
);
universeConnection = await UniverseConnection.authenticateAuthenticationRequests(
universeRef,
universeConnection
);
universeCache.connectionMap.add(universeConnection);
universeRef.universeCache.connectionMap.add(universeConnection);
}
/**
@ -51,10 +53,16 @@ export class UniverseConnection {
/**
* authenticate AuthenticationRequests
*/
public static authenticateAuthenticationRequests(
universeConnectionArg
public static async authenticateAuthenticationRequests(
universeRef: Universe,
universeConnectionArg: UniverseConnection
): Promise<UniverseConnection> {
// TODO: authenticate connections
for (const authenticationRequest of universeConnectionArg.authenticationRequests) {
const universeChannelToAuthenticateAgainst = UniverseChannel.getUniverseChannelByName(universeRef, authenticationRequest.name);
if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) {
universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst);
}
}
return universeConnectionArg;
}
@ -65,7 +73,6 @@ export class UniverseConnection {
connectionArg1: UniverseConnection,
connectionArg2: UniverseConnection
) {
// TODO: merge connections
return connectionArg1;
}
@ -88,7 +95,7 @@ export class UniverseConnection {
* the socketClient to ping
*/
public socketConnection: plugins.smartsocket.SocketConnection;
public authenticationRequests = [];
public authenticationRequests: interfaces.IServerCallSubscribeActionPayload[] = [];
public subscribedChannels: UniverseChannel[] = [];
public authenticatedChannels: UniverseChannel[] = [];
public failedToJoinChannels: UniverseChannel[] = [];
@ -105,7 +112,7 @@ export class UniverseConnection {
socketConnection: plugins.smartsocket.SocketConnection;
authenticationRequests: interfaces.IServerCallSubscribeActionPayload[];
}) {
// TODO: check if this is correct
this.authenticationRequests = optionsArg.authenticationRequests;
this.socketConnection = optionsArg.socketConnection;
}
}

View File

@ -7,27 +7,28 @@ import { Timer, TimeStamp } from '@pushrocks/smarttime';
import { Universe } from './smartuniverse.classes.universe';
import { UniverseChannel } from './smartuniverse.classes.universechannel';
import { UniverseCache } from './smartuniverse.classes.universecache';
import { IUniverseMessage } from './interfaces';
import { SocketConnection } from '@pushrocks/smartsocket';
/**
* represents a message within a universe
* acts as a container to save message states like authentication status
*/
export class UniverseMessage implements interfaces.IUniverseMessage {
public static createMessageFromPayload(dataArg: interfaces.IUniverseMessage) {
return new UniverseMessage(dataArg);
public static createMessageFromPayload(socketConnectionArg: SocketConnection, dataArg: interfaces.IUniverseMessage) {
const universeMessageInstance = new UniverseMessage(dataArg);
universeMessageInstance.socketConnection = socketConnectionArg;
return universeMessageInstance;
}
public id: string;
public timestamp: number;
public smartTimestamp: TimeStamp;
public messageText: string;
public passphrase: string;
public payload: any;
public payloadStringType;
public targetChannelName: string;
public socketConnection: SocketConnection;
/**
* the UniverseCache the message is attached to
@ -54,7 +55,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
* @param messageArg
* @param attachedPayloadArg
*/
constructor(messageDescriptor: IUniverseMessage) {
constructor(messageDescriptor: interfaces.IUniverseMessage) {
this.smartTimestamp = new TimeStamp(this.timestamp);
this.messageText = messageDescriptor.messageText;
this.targetChannelName = messageDescriptor.targetChannelName;
@ -68,6 +69,10 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
this.universeCache = universeCacheArg;
}
public setTargetChannel() {
}
public setDestructionTimer(selfdestructAfterArg: number) {
if (selfdestructAfterArg) {
this.destructionTimer = new Timer(selfdestructAfterArg);
@ -86,7 +91,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
* handles bad messages for further analysis
*/
public handleAsBadMessage() {
console.log('received a bad message');
plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
}
/**

View File

@ -9,6 +9,7 @@ import * as smarthash from '@pushrocks/smarthash';
import * as smartdelay from '@pushrocks/smartdelay';
import * as smartexpress from '@pushrocks/smartexpress';
import * as smartfile from '@pushrocks/smartfile';
import * as smartlog from '@pushrocks/smartlog';
import * as smartpromise from '@pushrocks/smartpromise';
import * as smartrequest from '@pushrocks/smartrequest';
import * as smartrx from '@pushrocks/smartrx';
@ -22,6 +23,7 @@ export {
smartdelay,
smartexpress,
smartfile,
smartlog,
smartpromise,
smartrx,
smartrequest,