fix(core): update
This commit is contained in:
parent
c4261765ec
commit
89feeca735
@ -15,7 +15,7 @@ const testServerData = {
|
||||
|
||||
const testChannelData = {
|
||||
channelName: 'awesomeTestChannel',
|
||||
channelPass: 'awesomeChannelPAss'
|
||||
channelPass: 'awesomeChannelPass'
|
||||
};
|
||||
|
||||
tap.test('first test', async () => {
|
||||
@ -44,6 +44,10 @@ tap.test('should add the same channel to the client universe in the same way', a
|
||||
await testClientUniverse.addChannel(testChannelData.channelName, testChannelData.channelPass);
|
||||
});
|
||||
|
||||
tap.test('should start the ClientUniverse', async () => {
|
||||
await testClientUniverse.start();
|
||||
})
|
||||
|
||||
tap.test('should get a observable correctly', async () => {
|
||||
testClientChannel = await testClientUniverse.getChannel(testChannelData.channelName);
|
||||
expect(testClientChannel).to.be.instanceof(smartuniverse.ClientUniverseChannel);
|
||||
@ -70,7 +74,7 @@ tap.test('a second client should be able to subscibe', async () => {
|
||||
tap.test('should receive a message correctly', async () => {});
|
||||
|
||||
tap.test('should disconnect the client correctly', async () => {
|
||||
testClientUniverse.close();
|
||||
testClientUniverse.stop();
|
||||
});
|
||||
|
||||
tap.test('should end the server correctly', async tools => {
|
||||
|
@ -1,4 +0,0 @@
|
||||
export interface IAuthenticationRequest {
|
||||
channelName: string;
|
||||
password: string;
|
||||
}
|
@ -22,8 +22,6 @@ export class ClientUniverse {
|
||||
public options;
|
||||
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
|
||||
public observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
|
||||
|
||||
public channelStore = new Objectmap<ClientUniverseChannel>();
|
||||
public clientUniverseCache = new ClientUniverseCache();
|
||||
|
||||
constructor(optionsArg: IClientOptions) {
|
||||
@ -51,8 +49,7 @@ export class ClientUniverse {
|
||||
* @param passphraseArg
|
||||
*/
|
||||
public async getChannel(channelName: string): Promise<ClientUniverseChannel> {
|
||||
await this.checkConnection();
|
||||
const clientUniverseChannel = this.channelStore.find(channel => {
|
||||
const clientUniverseChannel = this.clientUniverseCache.channelMap.find(channel => {
|
||||
return channel.name === channelName;
|
||||
});
|
||||
return clientUniverseChannel;
|
||||
@ -63,12 +60,16 @@ export class ClientUniverse {
|
||||
* @param messageArg
|
||||
*/
|
||||
public removeChannel(channelNameArg, notifyServer = true) {
|
||||
const clientUniverseChannel = this.channelStore.findOneAndRemove(channelItemArg => {
|
||||
const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemove(channelItemArg => {
|
||||
return channelItemArg.name === channelNameArg;
|
||||
});
|
||||
}
|
||||
|
||||
public close() {
|
||||
public async start() {
|
||||
await this.checkConnection();
|
||||
}
|
||||
|
||||
public stop() {
|
||||
this.smartsocketClient.disconnect();
|
||||
}
|
||||
|
||||
@ -95,7 +96,7 @@ export class ClientUniverse {
|
||||
/**
|
||||
* should handle a forced unsubscription by the server
|
||||
*/
|
||||
const unsubscribe = new plugins.smartsocket.SocketFunction({
|
||||
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
|
||||
funcName: 'unsubscribe',
|
||||
allowedRoles: [],
|
||||
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
|
||||
@ -104,17 +105,25 @@ export class ClientUniverse {
|
||||
});
|
||||
|
||||
/**
|
||||
* should handle a message reception
|
||||
* handles message reception
|
||||
*/
|
||||
const processMessageSocketFunction = new plugins.smartsocket.SocketFunction({
|
||||
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction({
|
||||
funcName: 'processMessage',
|
||||
allowedRoles: [],
|
||||
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => {
|
||||
throw new Error('TODO');
|
||||
console.log('Got message from server');
|
||||
}
|
||||
});
|
||||
|
||||
// add functions
|
||||
this.smartsocketClient.addSocketFunction(socketFunctionUnsubscribe);
|
||||
this.smartsocketClient.addSocketFunction(socketFunctionProcessMessage);
|
||||
|
||||
await this.smartsocketClient.connect();
|
||||
console.log('universe client connected successfully');
|
||||
await this.clientUniverseCache.channelMap.forEach(async clientUniverseChannelArg => {
|
||||
await clientUniverseChannelArg.subscribe();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,11 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversechannel';
|
||||
|
||||
/**
|
||||
* a cache for clients
|
||||
* keeps track of which messages have already been received
|
||||
* good for deduplication in mesh environments
|
||||
*/
|
||||
export class ClientUniverseCache {}
|
||||
export class ClientUniverseCache {
|
||||
public channelMap = new plugins.lik.Objectmap<ClientUniverseChannel>();
|
||||
}
|
||||
|
@ -23,8 +23,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
|
||||
channelNameArg,
|
||||
passphraseArg
|
||||
);
|
||||
clientUniverseArg.channelStore.add(clientChannel);
|
||||
await clientChannel.subscribe();
|
||||
clientUniverseArg.clientUniverseCache.channelMap.add(clientChannel);
|
||||
return clientChannel;
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ export class Universe {
|
||||
|
||||
constructor(optionsArg: ISmartUniverseConstructorOptions) {
|
||||
this.options = optionsArg;
|
||||
this.universeCache = new UniverseCache(this.options.messageExpiryInMilliseconds);
|
||||
this.universeCache = new UniverseCache(this, this.options.messageExpiryInMilliseconds);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -60,7 +60,7 @@ export class Universe {
|
||||
* adds a channel to the Universe
|
||||
*/
|
||||
public async addChannel(nameArg: string, passphraseArg: string) {
|
||||
const newChannel = UniverseChannel.createChannel(this.universeCache, nameArg, passphraseArg);
|
||||
const newChannel = UniverseChannel.createChannel(this, nameArg, passphraseArg);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -98,12 +98,11 @@ export class Universe {
|
||||
) => {
|
||||
// run in "this context" of this class
|
||||
await (async () => {
|
||||
// TODO: properly add the connection
|
||||
const universeConnection = new UniverseConnection({
|
||||
socketConnection: socketConnectionArg,
|
||||
authenticationRequests: []
|
||||
authenticationRequests: [dataArg]
|
||||
});
|
||||
await UniverseConnection.addConnectionToCache(this.universeCache, universeConnection);
|
||||
await UniverseConnection.addConnectionToCache(this, universeConnection);
|
||||
return {
|
||||
'subscription status': 'success'
|
||||
};
|
||||
@ -128,7 +127,7 @@ export class Universe {
|
||||
error: 'You need to authenticate for a channel'
|
||||
};
|
||||
}
|
||||
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(dataArg);
|
||||
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(socketConnectionArg, dataArg);
|
||||
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
|
||||
this.universeCache,
|
||||
unauthenticatedMessage
|
||||
|
@ -9,6 +9,7 @@ import { Observable, from } from 'rxjs';
|
||||
import { filter } from 'rxjs/operators';
|
||||
import { rxjs } from '@pushrocks/smartrx';
|
||||
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
|
||||
import { Universe } from './smartuniverse.classes.universe';
|
||||
|
||||
/**
|
||||
* universe store handles the creation, storage and retrieval of messages.
|
||||
@ -30,15 +31,22 @@ export class UniverseCache {
|
||||
*/
|
||||
public channelMap = new Objectmap<UniverseChannel>();
|
||||
|
||||
/**
|
||||
* stores all connections
|
||||
*/
|
||||
public connectionMap = new plugins.lik.Objectmap<UniverseConnection>();
|
||||
|
||||
/**
|
||||
* allows messages to be processed in a blacklist mode for further analysis
|
||||
*/
|
||||
public blackListChannel = new UniverseChannel(this, 'blacklist', 'nada');
|
||||
public blackListChannel: UniverseChannel;
|
||||
|
||||
constructor(standardMessageExpiryArg: number) {
|
||||
public universeRef: Universe;
|
||||
|
||||
constructor(universeArg: Universe, standardMessageExpiryArg: number) {
|
||||
this.universeRef = universeArg;
|
||||
this.standardMessageExpiry = standardMessageExpiryArg;
|
||||
this.blackListChannel = new UniverseChannel(this.universeRef, 'blacklist', 'nada');
|
||||
}
|
||||
|
||||
/**
|
||||
@ -50,6 +58,9 @@ export class UniverseCache {
|
||||
messageArg.setUniverseCache(this);
|
||||
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
|
||||
this.messageMap.add(messageArg);
|
||||
messageArg.universeChannelList.forEach(universeChannel => {
|
||||
universeChannel.pushToClients(messageArg);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,7 +1,10 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
import * as interfaces from './interfaces';
|
||||
|
||||
import { UniverseCache } from './smartuniverse.classes.universecache';
|
||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||
import { UniverseConnection } from './smartuniverse.classes.universeconnection';
|
||||
import { Universe } from './smartuniverse.classes.universe';
|
||||
|
||||
/**
|
||||
* enables messages to stay within a certain scope.
|
||||
@ -17,12 +20,12 @@ export class UniverseChannel {
|
||||
* @param passphraseArg the secret thats used for a certain topic.
|
||||
*/
|
||||
public static createChannel(
|
||||
universeCacheArg: UniverseCache,
|
||||
universeArg: Universe,
|
||||
channelNameArg: string,
|
||||
passphraseArg: string
|
||||
) {
|
||||
const newChannel = new UniverseChannel(universeCacheArg, channelNameArg, passphraseArg);
|
||||
universeCacheArg.channelMap.add(newChannel);
|
||||
const newChannel = new UniverseChannel(universeArg, channelNameArg, passphraseArg);
|
||||
universeArg.universeCache.channelMap.add(newChannel);
|
||||
return newChannel;
|
||||
}
|
||||
|
||||
@ -68,6 +71,12 @@ export class UniverseChannel {
|
||||
}
|
||||
}
|
||||
|
||||
public static getUniverseChannelByName(universeRef: Universe, universeChannelName: string) {
|
||||
return universeRef.universeCache.channelMap.find(channelArg => {
|
||||
return channelArg.name === universeChannelName;
|
||||
});
|
||||
}
|
||||
|
||||
// ========
|
||||
// INSTANCE
|
||||
// ========
|
||||
@ -75,14 +84,15 @@ export class UniverseChannel {
|
||||
* the name of the channel
|
||||
*/
|
||||
public name: string;
|
||||
public universeCacheInstance: UniverseCache;
|
||||
public universeRef: Universe;
|
||||
|
||||
/**
|
||||
* the passphrase for the channel
|
||||
*/
|
||||
public passphrase: string;
|
||||
|
||||
constructor(universeCacheArg: UniverseCache, channelNameArg: string, passphraseArg: string) {
|
||||
constructor(universeArg: Universe, channelNameArg: string, passphraseArg: string) {
|
||||
this.universeRef = universeArg;
|
||||
this.name = channelNameArg;
|
||||
this.passphrase = passphraseArg;
|
||||
}
|
||||
@ -99,5 +109,29 @@ export class UniverseChannel {
|
||||
);
|
||||
}
|
||||
|
||||
public pushToClients(messageArg: UniverseMessage) {}
|
||||
/**
|
||||
* pushes a message to clients
|
||||
* @param messageArg
|
||||
*/
|
||||
public async pushToClients(messageArg: UniverseMessage) {
|
||||
const universeConnectionsWithChannelAccess: UniverseConnection[] = [];
|
||||
this.universeRef.universeCache.connectionMap.forEach(async socketConnection => {
|
||||
if (socketConnection.authenticatedChannels.includes(this)) {
|
||||
universeConnectionsWithChannelAccess.push(socketConnection);
|
||||
}
|
||||
});
|
||||
for (const universeConnection of universeConnectionsWithChannelAccess) {
|
||||
const smartsocket = universeConnection.socketConnection.smartsocketRef as plugins.smartsocket.Smartsocket;
|
||||
const universeMessageToSend: interfaces.IUniverseMessage = {
|
||||
id: messageArg.id,
|
||||
timestamp: messageArg.timestamp,
|
||||
passphrase: messageArg.passphrase,
|
||||
targetChannelName: this.name,
|
||||
messageText: messageArg.messageText,
|
||||
payload: messageArg.payload,
|
||||
payloadStringType: messageArg.payloadStringType
|
||||
};
|
||||
smartsocket.clientCall('processMessage', universeMessageToSend, universeConnection.socketConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ import * as plugins from './smartuniverse.plugins';
|
||||
import * as interfaces from './interfaces';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
import { UniverseCache } from './smartuniverse.classes.universecache';
|
||||
import { Universe } from './smartuniverse.classes.universe';
|
||||
|
||||
/**
|
||||
* represents a connection to the universe
|
||||
@ -12,18 +13,19 @@ export class UniverseConnection {
|
||||
* @param universeConnectionArg
|
||||
*/
|
||||
public static async addConnectionToCache(
|
||||
universeCache: UniverseCache,
|
||||
universeRef: Universe,
|
||||
universeConnectionArg: UniverseConnection
|
||||
) {
|
||||
let universeConnection = universeConnectionArg;
|
||||
universeConnection = await UniverseConnection.deduplicateUniverseConnection(
|
||||
universeCache,
|
||||
universeRef.universeCache,
|
||||
universeConnection
|
||||
);
|
||||
universeConnection = await UniverseConnection.authenticateAuthenticationRequests(
|
||||
universeRef,
|
||||
universeConnection
|
||||
);
|
||||
universeCache.connectionMap.add(universeConnection);
|
||||
universeRef.universeCache.connectionMap.add(universeConnection);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,10 +53,17 @@ export class UniverseConnection {
|
||||
/**
|
||||
* authenticate AuthenticationRequests
|
||||
*/
|
||||
public static authenticateAuthenticationRequests(
|
||||
universeConnectionArg
|
||||
public static async authenticateAuthenticationRequests(
|
||||
universeRef: Universe,
|
||||
universeConnectionArg: UniverseConnection
|
||||
): Promise<UniverseConnection> {
|
||||
// TODO: authenticate connections
|
||||
for (const authenticationRequest of universeConnectionArg.authenticationRequests) {
|
||||
// TODO: authenticate channel subscriptions
|
||||
const universeChannelToAuthenticateAgainst = UniverseChannel.getUniverseChannelByName(universeRef, authenticationRequest.name);
|
||||
if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) {
|
||||
universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst);
|
||||
}
|
||||
}
|
||||
return universeConnectionArg;
|
||||
}
|
||||
|
||||
@ -88,7 +97,7 @@ export class UniverseConnection {
|
||||
* the socketClient to ping
|
||||
*/
|
||||
public socketConnection: plugins.smartsocket.SocketConnection;
|
||||
public authenticationRequests = [];
|
||||
public authenticationRequests: interfaces.IServerCallSubscribeActionPayload[] = [];
|
||||
public subscribedChannels: UniverseChannel[] = [];
|
||||
public authenticatedChannels: UniverseChannel[] = [];
|
||||
public failedToJoinChannels: UniverseChannel[] = [];
|
||||
@ -105,7 +114,7 @@ export class UniverseConnection {
|
||||
socketConnection: plugins.smartsocket.SocketConnection;
|
||||
authenticationRequests: interfaces.IServerCallSubscribeActionPayload[];
|
||||
}) {
|
||||
// TODO: check if this is correct
|
||||
this.authenticationRequests = optionsArg.authenticationRequests;
|
||||
this.socketConnection = optionsArg.socketConnection;
|
||||
}
|
||||
}
|
||||
|
@ -7,27 +7,28 @@ import { Timer, TimeStamp } from '@pushrocks/smarttime';
|
||||
import { Universe } from './smartuniverse.classes.universe';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
import { UniverseCache } from './smartuniverse.classes.universecache';
|
||||
import { IUniverseMessage } from './interfaces';
|
||||
import { SocketConnection } from '@pushrocks/smartsocket';
|
||||
|
||||
/**
|
||||
* represents a message within a universe
|
||||
* acts as a container to save message states like authentication status
|
||||
*/
|
||||
export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
public static createMessageFromPayload(dataArg: interfaces.IUniverseMessage) {
|
||||
return new UniverseMessage(dataArg);
|
||||
public static createMessageFromPayload(socketConnectionArg: SocketConnection, dataArg: interfaces.IUniverseMessage) {
|
||||
const universeMessageInstance = new UniverseMessage(dataArg);
|
||||
universeMessageInstance.socketConnection = socketConnectionArg;
|
||||
return universeMessageInstance;
|
||||
}
|
||||
|
||||
public id: string;
|
||||
|
||||
public timestamp: number;
|
||||
public smartTimestamp: TimeStamp;
|
||||
|
||||
public messageText: string;
|
||||
public passphrase: string;
|
||||
public payload: any;
|
||||
public payloadStringType;
|
||||
public targetChannelName: string;
|
||||
public socketConnection: SocketConnection;
|
||||
|
||||
/**
|
||||
* the UniverseCache the message is attached to
|
||||
@ -54,7 +55,7 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
* @param messageArg
|
||||
* @param attachedPayloadArg
|
||||
*/
|
||||
constructor(messageDescriptor: IUniverseMessage) {
|
||||
constructor(messageDescriptor: interfaces.IUniverseMessage) {
|
||||
this.smartTimestamp = new TimeStamp(this.timestamp);
|
||||
this.messageText = messageDescriptor.messageText;
|
||||
this.targetChannelName = messageDescriptor.targetChannelName;
|
||||
@ -68,6 +69,10 @@ export class UniverseMessage implements interfaces.IUniverseMessage {
|
||||
this.universeCache = universeCacheArg;
|
||||
}
|
||||
|
||||
public setTargetChannel() {
|
||||
|
||||
}
|
||||
|
||||
public setDestructionTimer(selfdestructAfterArg: number) {
|
||||
if (selfdestructAfterArg) {
|
||||
this.destructionTimer = new Timer(selfdestructAfterArg);
|
||||
|
Loading…
Reference in New Issue
Block a user