fix(core): update to transparent universe
This commit is contained in:
@ -1,3 +1,10 @@
|
||||
// Client classes
|
||||
export * from './smartuniverse.classes.clientuniverse';
|
||||
export * from './smartuniverse.classes.clientuniversechannel';
|
||||
|
||||
// Server classes
|
||||
export * from './smartuniverse.classes.universe';
|
||||
export * from './smartuniverse.classes.universeclient';
|
||||
export * from './smartuniverse.classes.universecache';
|
||||
export * from './smartuniverse.classes.universechannel';
|
||||
export * from './smartuniverse.classes.universemessage';
|
||||
export * from './smartuniverse.interfaces';
|
||||
|
@ -1,14 +1,16 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { Objectmap } from 'lik';
|
||||
import { Observable } from 'rxjs';
|
||||
import { Smartsocket, SmartsocketClient } from 'smartsocket';
|
||||
import * as url from 'url';
|
||||
|
||||
import {
|
||||
ClientUniverseChannel,
|
||||
IServerGetMessagesRequestBody,
|
||||
IServerPutMessageRequestBody
|
||||
} from './smartuniverse.classes.universe';
|
||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||
IServerPutMessageRequestBody,
|
||||
UniverseMessage
|
||||
} from './';
|
||||
|
||||
export interface IClientOptions {
|
||||
serverAddress: string;
|
||||
@ -18,10 +20,12 @@ export interface IClientOptions {
|
||||
* this class is for client side only!!!
|
||||
* allows connecting to a universe server
|
||||
*/
|
||||
export class UniverseClient {
|
||||
export class ClientUniverse {
|
||||
public options;
|
||||
private socketClient: plugins.smartsocket.SmartsocketClient;
|
||||
private observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
|
||||
public socketClient: plugins.smartsocket.SmartsocketClient;
|
||||
public observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
|
||||
|
||||
public channelCache = new Objectmap<ClientUniverseChannel>();
|
||||
|
||||
constructor(optionsArg: IClientOptions) {
|
||||
this.options = optionsArg;
|
||||
@ -38,7 +42,21 @@ export class UniverseClient {
|
||||
});
|
||||
}
|
||||
|
||||
public getMessageObservable() {
|
||||
public async getChannel(channelName: string): Promise<ClientUniverseChannel> {
|
||||
await this.checkConnection();
|
||||
const clientUniverseChannel = await ClientUniverseChannel.createClientUniverseChannel(
|
||||
this,
|
||||
channelName
|
||||
);
|
||||
this.channelCache.add(clientUniverseChannel);
|
||||
return clientUniverseChannel;
|
||||
}
|
||||
|
||||
public close() {
|
||||
this.socketClient.disconnect();
|
||||
}
|
||||
|
||||
private async checkConnection() {
|
||||
if (!this.socketClient && !this.observableIntake) {
|
||||
const parsedURL = url.parse(this.options.serverAddress);
|
||||
this.socketClient = new SmartsocketClient({
|
||||
@ -51,10 +69,5 @@ export class UniverseClient {
|
||||
this.observableIntake = new plugins.smartrx.ObservableIntake();
|
||||
this.socketClient.connect();
|
||||
}
|
||||
return this.observableIntake.observable;
|
||||
}
|
||||
|
||||
public close() {
|
||||
this.socketClient.disconnect();
|
||||
}
|
||||
}
|
34
ts/smartuniverse.classes.clientuniversechannel.ts
Normal file
34
ts/smartuniverse.classes.clientuniversechannel.ts
Normal file
@ -0,0 +1,34 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { ClientUniverse, IUniverseChannel } from './';
|
||||
|
||||
export class ClientUniverseChannel implements IUniverseChannel {
|
||||
// ======
|
||||
// STATIC
|
||||
// ======
|
||||
public static async createClientUniverseChannel(
|
||||
clientUniverseArg: ClientUniverse,
|
||||
channelName: string
|
||||
): Promise<ClientUniverseChannel> {
|
||||
const clientChannel = new ClientUniverseChannel(clientUniverseArg);
|
||||
await clientChannel.transmitSubscription();
|
||||
return clientChannel;
|
||||
}
|
||||
|
||||
// ========
|
||||
// INSTANCE
|
||||
// ========
|
||||
|
||||
public clientUniverse: ClientUniverse;
|
||||
|
||||
constructor(clientUniverseArg: ClientUniverse) {
|
||||
this.clientUniverse = clientUniverseArg;
|
||||
}
|
||||
|
||||
/**
|
||||
* tells the universe about this instances interest into a channel
|
||||
*/
|
||||
public async transmitSubscription() {
|
||||
this.clientUniverse.socketClient;
|
||||
}
|
||||
}
|
15
ts/smartuniverse.classes.clientuniversemessage.ts
Normal file
15
ts/smartuniverse.classes.clientuniversemessage.ts
Normal file
@ -0,0 +1,15 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { IUniverseMessage } from './';
|
||||
|
||||
export class ClientUniverseMessage implements IUniverseMessage {
|
||||
// ======
|
||||
// STATIC
|
||||
// ======
|
||||
createMessage(messageArg: string, payloadArg: any) {}
|
||||
|
||||
// ========
|
||||
// INSTANCE
|
||||
// ========
|
||||
constructor(messageArg, payloadArg) {}
|
||||
}
|
@ -1,9 +1,7 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { Handler, Route, Server } from 'smartexpress';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
import { UniverseMessage } from './smartuniverse.classes.universemessage';
|
||||
import { UniverseCache } from './smartuniverse.classes.universecache';
|
||||
import { UniverseCache, UniverseChannel, UniverseMessage } from './';
|
||||
|
||||
import * as paths from './smartuniverse.paths';
|
||||
|
||||
@ -57,6 +55,14 @@ export class Universe {
|
||||
this.universeCache = new UniverseCache(this.options.messageExpiryInMilliseconds);
|
||||
}
|
||||
|
||||
/**
|
||||
* adds a channel to the Universe
|
||||
*/
|
||||
public async addChannel(nameArg: string, passphraseArg: string) {
|
||||
const newChannel = new UniverseChannel(this.universeCache, nameArg, passphraseArg);
|
||||
this.universeCache.channelMap.add(newChannel);
|
||||
}
|
||||
|
||||
/**
|
||||
* initiates a server
|
||||
*/
|
||||
@ -68,46 +74,13 @@ export class Universe {
|
||||
port: portArg
|
||||
});
|
||||
|
||||
// message handling
|
||||
// adds messages
|
||||
const addMessageHandler = new Handler('PUT', async request => {
|
||||
const requestBody: IServerPutMessageRequestBody = request.body;
|
||||
const message = new UniverseMessage(
|
||||
requestBody.message,
|
||||
requestBody.channel,
|
||||
requestBody.passphrase,
|
||||
requestBody.payload
|
||||
);
|
||||
this.universeCache.addMessage(message);
|
||||
console.log(requestBody);
|
||||
return true;
|
||||
});
|
||||
|
||||
// gets messages
|
||||
const readMessageHandler = new Handler('GET', request => {
|
||||
const done = plugins.smartq.defer<UniverseMessage[]>();
|
||||
const requestBody = request.body;
|
||||
const messageObservable = this.universeCache.readMessagesYoungerThan(requestBody.since);
|
||||
messageObservable.toArray().subscribe(universeMessageArrayArg => {
|
||||
done.resolve(universeMessageArrayArg);
|
||||
});
|
||||
return done.promise;
|
||||
});
|
||||
|
||||
// create new Route for messages
|
||||
const messageRoute = new Route(this.smartexpressServer, 'message');
|
||||
messageRoute.addHandler(addMessageHandler);
|
||||
messageRoute.addHandler(readMessageHandler);
|
||||
|
||||
const leaderElectionRoute = new Route(this.smartexpressServer, 'leadelection');
|
||||
// TODO: implement Handlers for leader election
|
||||
|
||||
// add websocket upgrade
|
||||
this.smartsocket = new plugins.smartsocket.Smartsocket({
|
||||
port: 12345 // fix this within smartsocket
|
||||
});
|
||||
|
||||
this.smartsocket.setExternalServer('express', this.smartexpressServer as any); // should work with express as well
|
||||
this.smartsocket.setExternalServer('express', this.smartexpressServer as any);
|
||||
// should work with express as well
|
||||
this.smartsocket.start();
|
||||
|
||||
await this.smartexpressServer.start();
|
||||
|
@ -12,26 +12,27 @@ import { rxjs } from 'smartrx';
|
||||
* universe store handles the creation, storage and retrieval of messages.
|
||||
*/
|
||||
export class UniverseCache {
|
||||
// ========
|
||||
// INSTANCE
|
||||
// ========
|
||||
public standardMessageExpiry: number;
|
||||
public destructionTime: number = 60000;
|
||||
|
||||
/**
|
||||
* stores messages for this instance
|
||||
*/
|
||||
public messageCache = new Objectmap<UniverseMessage>();
|
||||
public messageMap = new Objectmap<UniverseMessage>();
|
||||
|
||||
/**
|
||||
* stores the channels that are available within the universe
|
||||
*/
|
||||
public channelCache = new Objectmap<UniverseChannel>();
|
||||
public channelMap = new Objectmap<UniverseChannel>();
|
||||
|
||||
/**
|
||||
* allows messages to be processed in a blacklist mode for further analysis
|
||||
*/
|
||||
public blackListChannel = new UniverseChannel(this, 'blacklist', 'nada');
|
||||
|
||||
private lastId: number = 0; // stores the last id
|
||||
|
||||
constructor(standardMessageExpiryArg: number) {
|
||||
this.standardMessageExpiry = standardMessageExpiryArg;
|
||||
}
|
||||
@ -44,14 +45,14 @@ export class UniverseCache {
|
||||
public async addMessage(messageArg: UniverseMessage) {
|
||||
messageArg.setUniverseCache(this);
|
||||
UniverseChannel.authorizeAMessageForAChannel(this, messageArg);
|
||||
this.messageCache.add(messageArg);
|
||||
this.messageMap.add(messageArg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a message from the UniverseStore
|
||||
*/
|
||||
public readMessagesYoungerThan(unixTimeArg?: number): Observable<UniverseMessage> {
|
||||
const messageObservable = rxjs.Observable.from(this.messageCache.getArray()).filter(
|
||||
const messageObservable = rxjs.Observable.from(this.messageMap.getArray()).filter(
|
||||
messageArg => {
|
||||
return messageArg.timestamp.isYoungerThanMilliSeconds(this.destructionTime);
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ export class UniverseChannel {
|
||||
* returns boolean wether certain channel exists
|
||||
*/
|
||||
public static async doesChannelExists(universeCacheArg: UniverseCache, channelNameArg: string) {
|
||||
const channel = universeCacheArg.channelCache.find(channelArg => {
|
||||
const channel = universeCacheArg.channelMap.find(channelArg => {
|
||||
return channelArg.name === channelNameArg;
|
||||
});
|
||||
if (channel) {
|
||||
@ -44,7 +44,7 @@ export class UniverseChannel {
|
||||
universeCacheArg: UniverseCache,
|
||||
universeMessageArg: UniverseMessage
|
||||
) {
|
||||
const foundChannel = universeCacheArg.channelCache.find(universeChannel => {
|
||||
const foundChannel = universeCacheArg.channelMap.find(universeChannel => {
|
||||
const result = universeChannel.authenticate(universeMessageArg);
|
||||
return result;
|
||||
});
|
||||
@ -86,4 +86,6 @@ export class UniverseChannel {
|
||||
this.passphrase === universeMessageArg.requestedChannelPassphrase
|
||||
);
|
||||
}
|
||||
|
||||
public pushToClients(messageArg: UniverseMessage) {}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { Objectmap } from 'lik';
|
||||
|
||||
|
||||
import { Timer, TimeStamp } from 'smarttime';
|
||||
import { Universe } from './smartuniverse.classes.universe';
|
||||
import { UniverseChannel } from './smartuniverse.classes.universechannel';
|
||||
@ -89,7 +88,7 @@ export class UniverseMessage {
|
||||
|
||||
// set up self destruction by removing this from the parent messageCache
|
||||
this.destructionTimer.completed.then(async () => {
|
||||
this.universeCache.messageCache.remove(this);
|
||||
this.universeCache.messageMap.remove(this);
|
||||
});
|
||||
} else {
|
||||
this.fallBackDestruction();
|
||||
|
@ -1,14 +0,0 @@
|
||||
import * as plugins from './smartuniverse.plugins';
|
||||
|
||||
import { Universe } from './index';
|
||||
|
||||
process.env.CLI = 'true';
|
||||
|
||||
const universeCli = new plugins.smartcli.Smartcli();
|
||||
|
||||
universeCli.standardTask().subscribe(async argvArg => {
|
||||
const standardUniverse = new Universe({
|
||||
messageExpiryInMilliseconds: 60000
|
||||
});
|
||||
await standardUniverse.initServer(8765);
|
||||
});
|
3
ts/smartuniverse.interfaces.ts
Normal file
3
ts/smartuniverse.interfaces.ts
Normal file
@ -0,0 +1,3 @@
|
||||
export interface IUniverseChannel {}
|
||||
|
||||
export interface IUniverseMessage {}
|
@ -1,4 +1,3 @@
|
||||
import * as smartcli from '@pushrocks/smartcli';
|
||||
import * as lik from 'lik';
|
||||
import * as nodehash from 'nodehash';
|
||||
import * as path from 'path';
|
||||
@ -15,7 +14,6 @@ export {
|
||||
lik,
|
||||
nodehash,
|
||||
path,
|
||||
smartcli,
|
||||
smartdelay,
|
||||
smartexpress,
|
||||
smartfile,
|
||||
|
Reference in New Issue
Block a user