This commit is contained in:
2018-03-20 08:16:54 +01:00
parent a28c96356f
commit bafa94c6ac
23 changed files with 151 additions and 67 deletions

View File

@ -1,2 +1,3 @@
export * from './smartuniverse.classes.universe';
export * from './smartuniverse.classes.universeclient';
export * from './smartuniverse.classes.universemessage';

View File

@ -3,8 +3,5 @@ import * as plugins from './smartuniverse.plugins';
import { Objectmap } from 'lik';
export class UniverseManager {
public async registerMember () {
}
public async registerMember() {}
}

View File

@ -16,8 +16,8 @@ export interface IServerGetMessagesRequestBody {
}
export interface IServerPutMessageRequestBody {
message: string,
payload: any
message: string;
payload: any;
}
export class Universe {
@ -27,7 +27,7 @@ export class Universe {
// options
private options: ISmartUniverseConstructorOptions;
// Store version handling
private universeVersionStore: string;
private get universeVersion() {
@ -41,12 +41,12 @@ export class Universe {
}
private smartexpressServer: plugins.smartexpress.Server;
private smartsocket: plugins.smartsocket.Smartsocket;
constructor(optionsArg: ISmartUniverseConstructorOptions) {
this.options = optionsArg;
this.universeStore = new UniverseStore(this.options.messageExpiryInMilliseconds);
this.universeManager = new UniverseManager();
}
/**
@ -60,6 +60,13 @@ export class Universe {
port: portArg
});
this.smartsocket = new plugins.smartsocket.Smartsocket({
port: 12345 // fix this within smartsocket
});
this.smartsocket.setServer(this.smartexpressServer as any); // should work with express as well
this.smartsocket.startServer();
// route handling
// adds messages
const addMessageHandler = new Handler('PUT', request => {
@ -73,7 +80,7 @@ export class Universe {
const readMessageHandler = new Handler('GET', request => {
const requestBody = request.body;
this.universeStore.readMessagesYoungerThan(requestBody.since);
})
});
const messageRoute = new Route(this.smartexpressServer, 'message');
messageRoute.addHandler(addMessageHandler);
@ -82,7 +89,7 @@ export class Universe {
await this.smartexpressServer.start();
}
public async stopServer () {
public async stopServer() {
await this.smartexpressServer.stop();
}
}

View File

@ -1,30 +1,52 @@
import * as plugins from './smartuniverse.plugins';
import { Observable } from 'rxjs';
import { IServerGetMessagesRequestBody, IServerPutMessageRequestBody } from './smartuniverse.classes.universe'
import { Smartsocket, SmartsocketClient } from 'smartsocket';
import * as url from 'url';
import {
IServerGetMessagesRequestBody,
IServerPutMessageRequestBody
} from './smartuniverse.classes.universe';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
export interface IClientOptions {
serverAddress: string
serverAddress: string;
}
export class UniverseClient {
public options;
private socketClient: plugins.smartsocket.SmartsocketClient;
private observableIntake: plugins.smartrx.ObservableIntake<UniverseMessage>;
constructor(optionsArg: IClientOptions) {
this.options = optionsArg;
}
public async sendMessage(messageArg, payloadArg) {
const requestBody = {
message: messageArg,
payload: payloadArg
}
};
await plugins.smartrequest.post(this.options.serverAddress, {
requestBody: requestBody
})
requestBody
});
}
public getMessageObservable () {
public getMessageObservable() {
if (!this.socketClient && !this.observableIntake) {
const parsedURL = url.parse(this.options.serverAddress);
this.socketClient = new SmartsocketClient({
alias: process.env.SOCKET_ALIAS || 'someclient',
password: 'UniverseClient',
port: parseInt(parsedURL.port, 10),
role: 'UniverseClient',
url: parsedURL.hostname,
});
this.observableIntake = new plugins.smartrx.ObservableIntake();
this.socketClient.connect();
}
return this.observableIntake.observable;
}
}

View File

@ -17,17 +17,22 @@ export class UniverseMessage {
public attachedPayload: any;
public destructionTimer: Timer;
constructor(parentUniverseStore: UniverseStore, messageArg: string, attachedPayloadArg: any, selfdestructAfterArg: number) {
constructor(
parentUniverseStore: UniverseStore,
messageArg: string,
attachedPayloadArg: any,
selfdestructAfterArg: number
) {
this.universeStore = parentUniverseStore;
this.timestamp = new TimeStamp();
this.message = messageArg;
this.attachedPayload = attachedPayloadArg;
this.destructionTimer = new Timer(selfdestructAfterArg)
this.destructionTimer.start()
this.destructionTimer = new Timer(selfdestructAfterArg);
this.destructionTimer.start();
// set up self destruction by removing this from the parent messageStore
this.destructionTimer.completed.then(async () => {
this.universeStore.messageStore.remove(this);
})
});
}
}

View File

@ -2,10 +2,10 @@ import * as plugins from './smartuniverse.plugins';
import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { Objectmap } from 'lik'
import { Objectmap } from 'lik';
import { Observable } from 'rxjs';
import { rxjs } from 'smartrx'
import { rxjs } from 'smartrx';
export class UniverseStore {
public standardMessageExpiry: number;
@ -23,19 +23,20 @@ export class UniverseStore {
* @param attachedPayloadArg
*/
public addMessage(messageArg, attachedPayloadArg) {
this.messageStore.add(new UniverseMessage(this, messageArg, attachedPayloadArg, this.destructionTime));
this.messageStore.add(
new UniverseMessage(this, messageArg, attachedPayloadArg, this.destructionTime)
);
}
/**
* Read a message from the UniverseStore
*/
public readMessagesYoungerThan(unixTimeArg?: number): Observable<UniverseMessage> {
const messageObservable = rxjs.Observable
.from(this.messageStore.getArray())
.filter(messageArg => {
const messageObservable = rxjs.Observable.from(this.messageStore.getArray()).filter(
messageArg => {
return messageArg.timestamp.isYoungerThanMilliSeconds(this.destructionTime);
});
}
);
return messageObservable;
}
}

View File

@ -11,5 +11,4 @@ universeCli.standardTask().then(async argvArg => {
messageExpiryInMilliseconds: 60000
});
await standardUniverse.initServer(8765);
});

View File

@ -8,4 +8,14 @@ import * as smartrx from 'smartrx';
import * as smartsocket from 'smartsocket';
import * as smarttime from 'smarttime';
export { lik, path, smartcli, smartexpress, smartfile, smartrx, smartrequest, smartsocket, smarttime };
export {
lik,
path,
smartcli,
smartexpress,
smartfile,
smartrx,
smartrequest,
smartsocket,
smarttime
};