Compare commits

..

17 Commits

Author SHA1 Message Date
30053fe441 1.0.95 2019-11-09 13:00:30 +01:00
afb4e3339a fix(core): update 2019-11-09 13:00:30 +01:00
e413a8116d 1.0.94 2019-11-09 12:59:51 +01:00
ffeed0565c fix(core): update 2019-11-09 12:59:51 +01:00
736240b978 1.0.93 2019-11-09 12:23:34 +01:00
73f4600c2a fix(core): update 2019-11-09 12:23:33 +01:00
40beec1166 1.0.92 2019-11-07 01:02:03 +01:00
f8690fef50 1.0.91 2019-11-07 00:59:46 +01:00
972ddbf327 fix(core): update 2019-11-07 00:59:45 +01:00
80aacd17a6 1.0.90 2019-11-03 20:23:23 +01:00
e67b3e50cc fix(core): update 2019-11-03 20:23:22 +01:00
a4a8959b74 1.0.89 2019-09-25 18:46:18 +02:00
bab0f062f7 fix(core): update 2019-09-25 18:46:18 +02:00
3bdfe4dcb4 1.0.88 2019-09-25 18:26:40 +02:00
fca960ad0d fix(core): update 2019-09-25 18:26:39 +02:00
e43ed3951c 1.0.87 2019-09-17 15:40:55 +02:00
23df304535 fix(core): update 2019-09-17 15:40:54 +02:00
18 changed files with 542 additions and 523 deletions

View File

@ -3,14 +3,14 @@ image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
cache: cache:
paths: paths:
- .npmci_cache/ - .npmci_cache/
key: "$CI_BUILD_STAGE" key: '$CI_BUILD_STAGE'
stages: stages:
- security - security
- test - test
- release - release
- metadata - metadata
# ==================== # ====================
# security stage # security stage
@ -18,10 +18,11 @@ stages:
mirror: mirror:
stage: security stage: security
script: script:
- npmci git mirror - npmci git mirror
tags: tags:
- docker - lossless
- notpriv - docker
- notpriv
snyk: snyk:
stage: security stage: security
@ -31,8 +32,9 @@ snyk:
- npmci command npm install --ignore-scripts - npmci command npm install --ignore-scripts
- npmci command snyk test - npmci command snyk test
tags: tags:
- docker - lossless
- notpriv - docker
- notpriv
# ==================== # ====================
# test stage # test stage
@ -41,37 +43,40 @@ snyk:
testStable: testStable:
stage: test stage: test
script: script:
- npmci npm prepare - npmci npm prepare
- npmci node install stable - npmci node install stable
- npmci npm install - npmci npm install
- npmci npm test - npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/ coverage: /\d+.?\d+?\%\s*coverage/
tags: tags:
- docker - lossless
- priv - docker
- priv
testBuild: testBuild:
stage: test stage: test
script: script:
- npmci npm prepare - npmci npm prepare
- npmci node install lts - npmci node install stable
- npmci npm install - npmci npm install
- npmci command npm run build - npmci command npm run build
coverage: /\d+.?\d+?\%\s*coverage/ coverage: /\d+.?\d+?\%\s*coverage/
tags: tags:
- docker - lossless
- notpriv - docker
- notpriv
release: release:
stage: release stage: release
script: script:
- npmci node install lts - npmci node install stable
- npmci npm publish - npmci npm publish
only: only:
- tags - tags
tags: tags:
- docker - lossless
- notpriv - docker
- notpriv
# ==================== # ====================
# metadata stage # metadata stage
@ -81,33 +86,35 @@ codequality:
allow_failure: true allow_failure: true
script: script:
- npmci command npm install -g tslint typescript - npmci command npm install -g tslint typescript
- npmci npm prepare
- npmci npm install - npmci npm install
- npmci command "tslint -c tslint.json ./ts/**/*.ts" - npmci command "tslint -c tslint.json ./ts/**/*.ts"
tags: tags:
- docker - lossless
- priv - docker
- priv
trigger: trigger:
stage: metadata stage: metadata
script: script:
- npmci trigger - npmci trigger
only: only:
- tags - tags
tags: tags:
- docker - lossless
- notpriv - docker
- notpriv
pages: pages:
image: hosttoday/ht-docker-dbase:npmci
services:
- docker:stable-dind
stage: metadata stage: metadata
script: script:
- npmci node install lts
- npmci command npm install -g @gitzone/tsdoc - npmci command npm install -g @gitzone/tsdoc
- npmci npm prepare - npmci npm prepare
- npmci npm install - npmci npm install
- npmci command tsdoc - npmci command tsdoc
tags: tags:
- lossless
- docker - docker
- notpriv - notpriv
only: only:
@ -115,5 +122,5 @@ pages:
artifacts: artifacts:
expire_in: 1 week expire_in: 1 week
paths: paths:
- public - public
allow_failure: true allow_failure: true

732
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.86", "version": "1.0.95",
"private": false, "private": false,
"description": "messaging service for your micro services", "description": "messaging service for your micro services",
"main": "dist/index.js", "main": "dist/index.js",
@ -15,10 +15,10 @@
}, },
"devDependencies": { "devDependencies": {
"@gitzone/tsbuild": "^2.1.17", "@gitzone/tsbuild": "^2.1.17",
"@gitzone/tstest": "^1.0.24", "@gitzone/tstest": "^1.0.28",
"@pushrocks/tapbundle": "^3.0.13", "@pushrocks/tapbundle": "^3.0.13",
"@types/node": "^12.7.4", "@types/node": "^12.12.7",
"tslint": "^5.20.0", "tslint": "^5.20.1",
"tslint-config-prettier": "^1.18.0" "tslint-config-prettier": "^1.18.0"
}, },
"peerDependencies": { "peerDependencies": {
@ -27,25 +27,25 @@
"dependencies": { "dependencies": {
"@apiglobal/typedrequest-interfaces": "^1.0.7", "@apiglobal/typedrequest-interfaces": "^1.0.7",
"@pushrocks/lik": "^3.0.11", "@pushrocks/lik": "^3.0.11",
"@pushrocks/smartdelay": "^2.0.3", "@pushrocks/smartdelay": "^2.0.6",
"@pushrocks/smartexpress": "^3.0.40", "@pushrocks/smartexpress": "^3.0.52",
"@pushrocks/smartfile": "^7.0.4", "@pushrocks/smartfile": "^7.0.6",
"@pushrocks/smarthash": "^2.0.6", "@pushrocks/smarthash": "^2.0.6",
"@pushrocks/smartlog": "^2.0.19", "@pushrocks/smartlog": "^2.0.21",
"@pushrocks/smartpromise": "^3.0.2", "@pushrocks/smartpromise": "^3.0.6",
"@pushrocks/smartrequest": "^1.1.27", "@pushrocks/smartrequest": "^1.1.42",
"@pushrocks/smartrx": "^2.0.5", "@pushrocks/smartrx": "^2.0.5",
"@pushrocks/smartsocket": "^1.1.49", "@pushrocks/smartsocket": "^1.1.58",
"@pushrocks/smarttime": "^3.0.12", "@pushrocks/smarttime": "^3.0.12",
"@pushrocks/smartunique": "^3.0.1" "@pushrocks/smartunique": "^3.0.1"
}, },
"files": [ "files": [
"ts/*", "ts/**/*",
"ts_web/*", "ts_web/**/*",
"dist/*", "dist/**/*",
"dist_web/*", "dist_web/**/*",
"dist_ts_web/*", "dist_ts_web/**/*",
"assets/*", "assets/**/*",
"cli.js", "cli.js",
"npmextra.json", "npmextra.json",
"readme.md" "readme.md"

View File

@ -47,6 +47,11 @@ myUniverse.start(8765); // start the server and provide the port on which to lis
All your microservices represents clients in the universe that may talk to each other using the universe server. All your microservices represents clients in the universe that may talk to each other using the universe server.
## Contribution
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
For further information read the linked docs at the top of this readme. For further information read the linked docs at the top of this readme.
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh) > MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)

View File

@ -31,7 +31,8 @@ tap.test('add a message to the SmartUniverse', async () => {
// testing message handling // testing message handling
tap.test('create smartuniverse client', async () => { tap.test('create smartuniverse client', async () => {
testClientUniverse = new smartuniverse.ClientUniverse({ testClientUniverse = new smartuniverse.ClientUniverse({
serverAddress: testServerData.serverAddress serverAddress: testServerData.serverAddress,
autoReconnect: true
}); });
expect(testClientUniverse).to.be.instanceof(smartuniverse.ClientUniverse); expect(testClientUniverse).to.be.instanceof(smartuniverse.ClientUniverse);
}); });
@ -65,20 +66,23 @@ tap.test('universe should contain the sent message', async () => {
tap.test('a second client should be able to subscibe', async () => { tap.test('a second client should be able to subscibe', async () => {
testClientUniverse2 = new smartuniverse.ClientUniverse({ testClientUniverse2 = new smartuniverse.ClientUniverse({
serverAddress: testServerData.serverAddress serverAddress: testServerData.serverAddress,
autoReconnect: true
}); });
testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass); testClientUniverse2.addChannel(testChannelData.channelName, testChannelData.channelPass);
await testClientUniverse2.start(); await testClientUniverse2.start();
}); });
tap.test('should receive a message correctly', async (tools) => { tap.test('should receive a message correctly', async tools => {
const done = tools.defer(); const done = tools.defer();
const testChannel = testClientUniverse.getChannel(testChannelData.channelName); const testChannel = testClientUniverse.getChannel(testChannelData.channelName);
const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName); const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName);
const subscription = testChannel2.subscribe(messageArg => { const subscription = testChannel2.subscribe(messageArg => {
console.log('Yay##########'); if (messageArg.messageText === 'hellothere') {
done.resolve(); console.log('Yay##########');
done.resolve();
}
}); });
await testChannel.sendMessage({ await testChannel.sendMessage({
messageText: 'hellothere' messageText: 'hellothere'
@ -87,7 +91,7 @@ tap.test('should receive a message correctly', async (tools) => {
}); });
interface IDemoReqRes { interface IDemoReqRes {
method: 'demo', method: 'demo';
request: { request: {
wowso: string; wowso: string;
}; };
@ -110,15 +114,19 @@ tap.test('ReactionRequest and ReactionResponse should work', async () => {
const reactionRequest = new smartuniverse.ReactionRequest<IDemoReqRes>({ const reactionRequest = new smartuniverse.ReactionRequest<IDemoReqRes>({
method: 'demo' method: 'demo'
}); });
const reactionResult = await reactionRequest.fire([testClientUniverse2.getChannel(testChannelData.channelName)], { const reactionResult = await reactionRequest.fire(
wowso: 'wowza' [testClientUniverse2.getChannel(testChannelData.channelName)],
}); {
wowso: 'wowza'
}
);
const result = await reactionResult.getFirstResult(); const result = await reactionResult.getFirstResult();
console.log(result); console.log(result);
}); });
tap.test('should disconnect the client correctly', async () => { tap.test('should disconnect the client correctly', async tools => {
await testClientUniverse.stop(); await testClientUniverse.stop();
await testClientUniverse2.stop();
}); });
tap.test('should end the server correctly', async tools => { tap.test('should end the server correctly', async tools => {

View File

@ -0,0 +1,8 @@
import * as plugins from './smartuniverse.plugins';
/**
* broadcasts an event to multiple channels
*/
export class BroadcastEvent<T> {
fire() {}
}

View File

@ -0,0 +1,3 @@
import * as plugins from './smartuniverse.plugins';
export class BroadcastSUbscription {}

View File

@ -12,6 +12,7 @@ import { ClientUniverseCache } from './smartuniverse.classes.clientuniversecache
export interface IClientOptions { export interface IClientOptions {
serverAddress: string; serverAddress: string;
autoReconnect: boolean;
} }
/** /**
@ -19,9 +20,9 @@ export interface IClientOptions {
* allows connecting to a universe server * allows connecting to a universe server
*/ */
export class ClientUniverse { export class ClientUniverse {
public options; public options: IClientOptions;
public smartsocketClient: plugins.smartsocket.SmartsocketClient; public smartsocketClient: plugins.smartsocket.SmartsocketClient;
public observableIntake: plugins.smartrx.ObservableIntake<ClientUniverseMessage<any>>; public messageRxjsSubject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
public clientUniverseCache = new ClientUniverseCache(); public clientUniverseCache = new ClientUniverseCache();
constructor(optionsArg: IClientOptions) { constructor(optionsArg: IClientOptions) {
@ -77,7 +78,7 @@ export class ClientUniverse {
} }
public async stop() { public async stop() {
await this.smartsocketClient.disconnect(); await this.disconnect('triggered');
} }
/** /**
@ -85,7 +86,7 @@ export class ClientUniverse {
* since password validation is done through other means, a connection should always be possible * since password validation is done through other means, a connection should always be possible
*/ */
public async checkConnection(): Promise<void> { public async checkConnection(): Promise<void> {
if (!this.smartsocketClient && !this.observableIntake) { if (!this.smartsocketClient) {
const parsedURL = url.parse(this.options.serverAddress); const parsedURL = url.parse(this.options.serverAddress);
const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = { const socketConfig: plugins.smartsocket.ISmartsocketClientOptions = {
alias: 'universeclient', alias: 'universeclient',
@ -95,7 +96,13 @@ export class ClientUniverse {
url: parsedURL.protocol + '//' + parsedURL.hostname url: parsedURL.protocol + '//' + parsedURL.hostname
}; };
this.smartsocketClient = new SmartsocketClient(socketConfig); this.smartsocketClient = new SmartsocketClient(socketConfig);
this.observableIntake = new plugins.smartrx.ObservableIntake();
this.smartsocketClient.eventSubject.subscribe(async eventArg => {
switch (eventArg) {
case 'disconnected':
this.disconnect('upstreamEvent');
}
});
// lets define some basic actions // lets define some basic actions
@ -105,8 +112,14 @@ export class ClientUniverse {
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({ const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
funcName: 'unsubscribe', funcName: 'unsubscribe',
allowedRoles: [], allowedRoles: [],
funcDef: async (data: interfaces.IServerUnsubscribeActionPayload) => { funcDef: async (dataArg: interfaces.IServerUnsubscribeActionPayload) => {
throw new Error('TODO'); const channel = this.clientUniverseCache.channelMap.find(channelArg => {
return channelArg.name === dataArg.name;
});
if (channel) {
channel.unsubscribe();
}
return {};
} }
}); });
@ -123,7 +136,7 @@ export class ClientUniverse {
const clientUniverseMessage = ClientUniverseMessage.createMessageFromMessageDescriptor( const clientUniverseMessage = ClientUniverseMessage.createMessageFromMessageDescriptor(
messageDescriptorArg messageDescriptorArg
); );
this.observableIntake.push(clientUniverseMessage); this.messageRxjsSubject.next(clientUniverseMessage);
// lets find the corresponding channel // lets find the corresponding channel
const targetChannel = this.getChannel(clientUniverseMessage.targetChannelName); const targetChannel = this.getChannel(clientUniverseMessage.targetChannelName);
@ -151,4 +164,20 @@ export class ClientUniverse {
}); });
} }
} }
public async disconnect(
reason: 'upstreamEvent' | 'triggered' = 'triggered',
tryReconnect = false
) {
if (reason === 'triggered') {
const smartsocketToDisconnect = this.smartsocketClient;
this.smartsocketClient = null; // making sure the upstreamEvent does not interfere
await smartsocketToDisconnect.disconnect();
}
if (this.options.autoReconnect && reason === 'upstreamEvent' && this.smartsocketClient) {
await plugins.smartdelay.delayForRandom(5000, 20000);
this.smartsocketClient = null;
this.checkConnection();
}
}
} }

View File

@ -54,7 +54,6 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
* tells the universe about this instances interest into a channel * tells the universe about this instances interest into a channel
*/ */
public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage<any>) => void) { public subscribe(observingFunctionArg: (messageArg: ClientUniverseMessage<any>) => void) {
return this.subject.subscribe( return this.subject.subscribe(
messageArg => { messageArg => {
observingFunctionArg(messageArg); observingFunctionArg(messageArg);
@ -63,6 +62,10 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
); );
} }
public unsubscribe() {
// TODO: unsubscribe all users
}
public async populateSubscriptionToServer() { public async populateSubscriptionToServer() {
// lets make sure the channel is connected // lets make sure the channel is connected
if (this.status === 'unsubscribed') { if (this.status === 'unsubscribed') {

View File

@ -5,7 +5,9 @@ import { ReactionResult } from './smartuniverse.classes.reactionresult';
import { UniverseMessage } from './smartuniverse.classes.universemessage'; import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage'; import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
export interface IReactionRequestConstructorOptions<T extends plugins.typedrequestInterfaces.ITypedRequest> { export interface IReactionRequestConstructorOptions<
T extends plugins.typedrequestInterfaces.ITypedRequest
> {
method: T['method']; method: T['method'];
} }
@ -15,9 +17,9 @@ export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITy
*/ */
id: string; id: string;
typedRequestPayload: { typedRequestPayload: {
method: T['method'], method: T['method'];
request : T['request'], request: T['request'];
response: T['response'] response: T['response'];
}; };
} }
@ -28,20 +30,35 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
this.method = optionsArg.method; this.method = optionsArg.method;
} }
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=5000) { public async fire(
channelsArg: Array<UniverseChannel | ClientUniverseChannel>,
requestDataArg: T['request'],
timeoutMillisArg = 5000
) {
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>(); const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
const reactionResult = new ReactionResult<T>(); const reactionResult = new ReactionResult<T>();
const requestId = plugins.smartunique.shortId(); const requestId = plugins.smartunique.shortId();
for (const channel of channelsArg) { for (const channel of channelsArg) {
subscriptionMap.add(channel.subscribe((messageArg: UniverseMessage<ICombinatorPayload<T>> | ClientUniverseMessage<ICombinatorPayload<T>>) => { subscriptionMap.add(
if (messageArg.messageText === 'reactionResponse' && messageArg.payload.typedRequestPayload.method === this.method) { channel.subscribe(
const payload: ICombinatorPayload<T> = messageArg.payload; (
if (payload.id !== requestId) { messageArg:
return; | UniverseMessage<ICombinatorPayload<T>>
| ClientUniverseMessage<ICombinatorPayload<T>>
) => {
if (
messageArg.messageText === 'reactionResponse' &&
messageArg.payload.typedRequestPayload.method === this.method
) {
const payload: ICombinatorPayload<T> = messageArg.payload;
if (payload.id !== requestId) {
return;
}
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
}
} }
reactionResult.pushReactionResponse(payload.typedRequestPayload.response); )
} );
}));
const payload: ICombinatorPayload<T> = { const payload: ICombinatorPayload<T> = {
id: requestId, id: requestId,
typedRequestPayload: { typedRequestPayload: {

View File

@ -6,7 +6,9 @@ import { ClientUniverseChannel } from './smartuniverse.classes.clientuniversecha
import { UniverseMessage } from './smartuniverse.classes.universemessage'; import { UniverseMessage } from './smartuniverse.classes.universemessage';
import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage'; import { ClientUniverseMessage } from './smartuniverse.classes.clientuniversemessage';
export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (dataArg: T['request']) => Promise<T['response']>; export type TReactionResponseFuncDef<T extends plugins.typedrequestInterfaces.ITypedRequest> = (
dataArg: T['request']
) => Promise<T['response']>;
export interface IReactionResponseConstructorOptions< export interface IReactionResponseConstructorOptions<
T extends plugins.typedrequestInterfaces.ITypedRequest T extends plugins.typedrequestInterfaces.ITypedRequest
@ -42,7 +44,9 @@ export class ReactionResponse<T extends plugins.typedrequestInterfaces.ITypedReq
messageArg.messageText === 'reactionRequest' && messageArg.messageText === 'reactionRequest' &&
messageArg.payload.typedRequestPayload.method === this.method messageArg.payload.typedRequestPayload.method === this.method
) { ) {
const response: T['response'] = await this.funcDef(messageArg.payload.typedRequestPayload.request); const response: T['response'] = await this.funcDef(
messageArg.payload.typedRequestPayload.request
);
const payload: ICombinatorPayload<T> = { const payload: ICombinatorPayload<T> = {
...messageArg.payload, ...messageArg.payload,
typedRequestPayload: { typedRequestPayload: {

View File

@ -6,7 +6,7 @@ export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedReque
private endResult: Array<T['response']> = []; private endResult: Array<T['response']> = [];
private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>(); private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>();
constructor () { constructor() {
this.resultSubscribe(responseArg => { this.resultSubscribe(responseArg => {
this.endResult.push(responseArg); this.endResult.push(responseArg);
}); });

View File

@ -112,6 +112,7 @@ export class Universe {
funcName: 'subscribeChannel', funcName: 'subscribeChannel',
funcDef: async (dataArg, socketConnectionArg) => { funcDef: async (dataArg, socketConnectionArg) => {
const universeConnection = new UniverseConnection({ const universeConnection = new UniverseConnection({
universe: this,
socketConnection: socketConnectionArg, socketConnection: socketConnectionArg,
authenticationRequests: [dataArg] authenticationRequests: [dataArg]
}); });

View File

@ -19,7 +19,7 @@ export class UniverseCache {
// INSTANCE // INSTANCE
// ======== // ========
public standardMessageExpiry: number; public standardMessageExpiry: number;
public destructionTime: number = 60000; public destructionTime: number = 10000;
/** /**
* stores messages for this instance * stores messages for this instance

View File

@ -163,6 +163,6 @@ export class UniverseChannel {
passphrase: this.passphrase, passphrase: this.passphrase,
timestamp: Date.now() timestamp: Date.now()
}); });
this.push(messageToSend); this.universeRef.universeCache.addMessage(messageToSend);
} }
} }

View File

@ -26,7 +26,7 @@ export class UniverseConnection {
universeConnection universeConnection
); );
universeRef.universeCache.connectionMap.add(universeConnection); universeRef.universeCache.connectionMap.add(universeConnection);
console.log('hi') console.log('hi');
} }
/** /**
@ -93,6 +93,8 @@ export class UniverseConnection {
return universeConnection; return universeConnection;
} }
// INSTANCE
public universeRef: Universe;
public terminatedDeferred = plugins.smartpromise.defer(); public terminatedDeferred = plugins.smartpromise.defer();
/** /**
@ -100,23 +102,34 @@ export class UniverseConnection {
*/ */
public socketConnection: plugins.smartsocket.SocketConnection; public socketConnection: plugins.smartsocket.SocketConnection;
public authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']> = []; public authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']> = [];
public subscribedChannels: UniverseChannel[] = [];
public authenticatedChannels: UniverseChannel[] = []; public authenticatedChannels: UniverseChannel[] = [];
public failedToJoinChannels: UniverseChannel[] = []; public failedToJoinChannels: UniverseChannel[] = [];
/** /**
* terminates the connection * disconnect the connection
*/ */
public terminateConnection() { public async disconnect(reason: 'upstreamevent' | 'triggered' = 'triggered') {
this.socketConnection.socket.disconnect(); if (reason === 'triggered') {
await this.socketConnection.disconnect();
}
this.universeRef.universeCache.connectionMap.remove(this);
this.terminatedDeferred.resolve(); this.terminatedDeferred.resolve();
} }
constructor(optionsArg: { constructor(optionsArg: {
universe: Universe;
socketConnection: plugins.smartsocket.SocketConnection; socketConnection: plugins.smartsocket.SocketConnection;
authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']>; authenticationRequests: Array<interfaces.ISocketRequest_SubscribeChannel['request']>;
}) { }) {
this.universeRef = optionsArg.universe;
this.authenticationRequests = optionsArg.authenticationRequests; this.authenticationRequests = optionsArg.authenticationRequests;
this.socketConnection = optionsArg.socketConnection; this.socketConnection = optionsArg.socketConnection;
this.socketConnection.eventSubject.subscribe(async eventArg => {
switch (eventArg) {
case 'disconnected':
await this.disconnect('upstreamevent');
break;
}
});
} }
} }

View File

@ -64,7 +64,7 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
this.passphrase = messageDescriptor.passphrase; this.passphrase = messageDescriptor.passphrase;
this.payload = messageDescriptor.payload; this.payload = messageDescriptor.payload;
// prevent memory issues // prevent memory issues
this.fallBackDestruction(); this.setDestructionTimer();
} }
public setUniverseCache(universeCacheArg: UniverseCache) { public setUniverseCache(universeCacheArg: UniverseCache) {
@ -73,17 +73,25 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public setTargetChannel() {} public setTargetChannel() {}
public setDestructionTimer(selfdestructAfterArg: number) { public setDestructionTimer(selfdestructAfterArg?: number) {
if (selfdestructAfterArg) { if (selfdestructAfterArg) {
this.destructionTimer = new Timer(selfdestructAfterArg); this.destructionTimer = new Timer(selfdestructAfterArg);
this.destructionTimer.start(); this.destructionTimer.start();
// set up self destruction by removing this from the parent messageCache // set up self destruction by removing this from the parent messageCache
this.destructionTimer.completed.then(async () => { this.destructionTimer.completed
this.universeCache.messageMap.remove(this); .then(async () => {
}); this.universeCache.messageMap.remove(this);
})
.catch(err => {
console.log(err);
console.log(this);
});
} else { } else {
this.fallBackDestruction(); plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
} }
} }
@ -93,15 +101,4 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public handleAsBadMessage() { public handleAsBadMessage() {
plugins.smartlog.defaultLogger.log('warn', 'received a bad message'); plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
} }
/**
* prevents memory leaks if channels have no default
*/
private fallBackDestruction() {
plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
}
} }

View File

@ -6,9 +6,7 @@ export { path };
// apiglobal scope // apiglobal scope
import * as typedrequestInterfaces from '@apiglobal/typedrequest-interfaces'; import * as typedrequestInterfaces from '@apiglobal/typedrequest-interfaces';
export { export { typedrequestInterfaces };
typedrequestInterfaces
};
// pushrocks scope // pushrocks scope
import * as lik from '@pushrocks/lik'; import * as lik from '@pushrocks/lik';