Compare commits

..

8 Commits

Author SHA1 Message Date
e43ed3951c 1.0.87 2019-09-17 15:40:55 +02:00
23df304535 fix(core): update 2019-09-17 15:40:54 +02:00
9a142175aa 1.0.86 2019-09-17 14:01:24 +02:00
09b593e192 fix(core): update 2019-09-17 14:01:24 +02:00
c27fc147b5 1.0.85 2019-09-17 13:57:35 +02:00
ddde21925a fix(core): update 2019-09-17 13:57:34 +02:00
bd849d347d 1.0.84 2019-09-17 12:46:35 +02:00
f2a85d4719 fix(core): update 2019-09-17 12:46:35 +02:00
9 changed files with 75 additions and 30 deletions

2
package-lock.json generated
View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.83", "version": "1.0.87",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@pushrocks/smartuniverse", "name": "@pushrocks/smartuniverse",
"version": "1.0.83", "version": "1.0.87",
"private": false, "private": false,
"description": "messaging service for your micro services", "description": "messaging service for your micro services",
"main": "dist/index.js", "main": "dist/index.js",

View File

@ -77,8 +77,10 @@ tap.test('should receive a message correctly', async (tools) => {
const testChannel = testClientUniverse.getChannel(testChannelData.channelName); const testChannel = testClientUniverse.getChannel(testChannelData.channelName);
const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName); const testChannel2 = testClientUniverse2.getChannel(testChannelData.channelName);
const subscription = testChannel2.subscribe(messageArg => { const subscription = testChannel2.subscribe(messageArg => {
if (messageArg.messageText === 'hellothere') {
console.log('Yay##########'); console.log('Yay##########');
done.resolve(); done.resolve();
}
}); });
await testChannel.sendMessage({ await testChannel.sendMessage({
messageText: 'hellothere' messageText: 'hellothere'
@ -86,11 +88,43 @@ tap.test('should receive a message correctly', async (tools) => {
await done.promise; await done.promise;
}); });
tap.test('should disconnect the client correctly', async () => { interface IDemoReqRes {
await testClientUniverse.stop(); method: 'demo',
request: {
wowso: string;
};
response: {
hereso: string;
};
}
tap.test('ReactionRequest and ReactionResponse should work', async () => {
const reactionResponse = new smartuniverse.ReactionResponse<IDemoReqRes>({
channels: [testUniverse.getChannel(testChannelData.channelName)],
funcDef: async reqData => {
console.log(reqData);
return {
hereso: 'Hello there'
};
},
method: 'demo'
});
const reactionRequest = new smartuniverse.ReactionRequest<IDemoReqRes>({
method: 'demo'
});
const reactionResult = await reactionRequest.fire([testClientUniverse2.getChannel(testChannelData.channelName)], {
wowso: 'wowza'
});
const result = await reactionResult.getFirstResult();
console.log(result);
}); });
tap.test('should end the server correctly', async tools => { tap.test('should disconnect the client correctly', async (tools) => {
await testClientUniverse.stop();
await testClientUniverse2.stop();
});
tap.test('should end the server correctly', async (tools) => {
await testUniverse.stopServer(); await testUniverse.stopServer();
}); });

View File

@ -28,7 +28,7 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
this.method = optionsArg.method; this.method = optionsArg.method;
} }
public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=60000) { public async fire(channelsArg: Array<UniverseChannel | ClientUniverseChannel>, requestDataArg: T['request'], timeoutMillisArg=5000) {
const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>(); const subscriptionMap = new plugins.lik.Objectmap<plugins.smartrx.rxjs.Subscription>();
const reactionResult = new ReactionResult<T>(); const reactionResult = new ReactionResult<T>();
const requestId = plugins.smartunique.shortId(); const requestId = plugins.smartunique.shortId();
@ -49,7 +49,7 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
request: requestDataArg, request: requestDataArg,
response: null response: null
} }
} };
channel.sendMessage({ channel.sendMessage({
messageText: 'reactionRequest', messageText: 'reactionRequest',
payload payload

View File

@ -2,7 +2,7 @@ import * as plugins from './smartuniverse.plugins';
import { ReactionResponse } from './smartuniverse.classes.reactionresponse'; import { ReactionResponse } from './smartuniverse.classes.reactionresponse';
export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedRequest> { export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedRequest> {
private resultSubject = new plugins.smartrx.rxjs.Subject<T['response']>(); private resultReplaySubject = new plugins.smartrx.rxjs.ReplaySubject<T['response']>();
private endResult: Array<T['response']> = []; private endResult: Array<T['response']> = [];
private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>(); private completeDeferred = plugins.smartpromise.defer<Array<T['response']>>();
@ -13,19 +13,34 @@ export class ReactionResult<T extends plugins.typedrequestInterfaces.ITypedReque
} }
public resultSubscribe(observerArg: (responseArg: T['response']) => void) { public resultSubscribe(observerArg: (responseArg: T['response']) => void) {
return this.resultSubject.subscribe(observerArg); return this.resultReplaySubject.subscribe(observerArg);
} }
/**
* gets the end result as an array of all results
*/
public async getEndResult() { public async getEndResult() {
const result = await this.completeDeferred.promise; const result = await this.completeDeferred.promise;
return result; return result;
} }
/**
* if there is a single respondant, or you are only interested in the first result
*/
public async getFirstResult() {
const done = plugins.smartpromise.defer<T['response']>();
const subscription = this.resultReplaySubject.subscribe(result => {
done.resolve(result);
subscription.unsubscribe();
});
return await done.promise;
}
/** /**
* push a reactionResponse * push a reactionResponse
*/ */
public async pushReactionResponse(responseArg: T['response']) { public async pushReactionResponse(responseArg: T['response']) {
this.resultSubject.next(responseArg); this.resultReplaySubject.next(responseArg);
} }
/** /**

View File

@ -68,7 +68,7 @@ export class Universe {
/** /**
* returns a channel * returns a channel
*/ */
public getChannelByName(channelNameArg: string) { public getChannel(channelNameArg: string) {
return this.universeCache.channelMap.find(channelArg => { return this.universeCache.channelMap.find(channelArg => {
return channelArg.name === channelNameArg; return channelArg.name === channelNameArg;
}); });
@ -178,6 +178,7 @@ export class Universe {
* stop everything * stop everything
*/ */
public async stopServer() { public async stopServer() {
console.log('hi');
await this.smartsocket.stop(); await this.smartsocket.stop();
if (!this.options.externalServer) { if (!this.options.externalServer) {
await this.smartexpressServer.stop(); await this.smartexpressServer.stop();

View File

@ -19,7 +19,7 @@ export class UniverseCache {
// INSTANCE // INSTANCE
// ======== // ========
public standardMessageExpiry: number; public standardMessageExpiry: number;
public destructionTime: number = 60000; public destructionTime: number = 10000;
/** /**
* stores messages for this instance * stores messages for this instance

View File

@ -163,6 +163,6 @@ export class UniverseChannel {
passphrase: this.passphrase, passphrase: this.passphrase,
timestamp: Date.now() timestamp: Date.now()
}); });
this.push(messageToSend); this.universeRef.universeCache.addMessage(messageToSend);
} }
} }

View File

@ -64,7 +64,7 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
this.passphrase = messageDescriptor.passphrase; this.passphrase = messageDescriptor.passphrase;
this.payload = messageDescriptor.payload; this.payload = messageDescriptor.payload;
// prevent memory issues // prevent memory issues
this.fallBackDestruction(); this.setDestructionTimer();
} }
public setUniverseCache(universeCacheArg: UniverseCache) { public setUniverseCache(universeCacheArg: UniverseCache) {
@ -73,17 +73,23 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public setTargetChannel() {} public setTargetChannel() {}
public setDestructionTimer(selfdestructAfterArg: number) { public setDestructionTimer(selfdestructAfterArg?: number) {
if (selfdestructAfterArg) { if (selfdestructAfterArg) {
this.destructionTimer = new Timer(selfdestructAfterArg); this.destructionTimer = new Timer(selfdestructAfterArg);
this.destructionTimer.start(); this.destructionTimer.start();
// set up self destruction by removing this from the parent messageCache // set up self destruction by removing this from the parent messageCache
this.destructionTimer.completed.then(async () => { this.destructionTimer.completed.then(async () => {
this.universeCache.messageMap.remove(this); this.universeCache.messageMap.remove(this);
}).catch(err => {
console.log(err);
console.log(this);
}); });
} else { } else {
this.fallBackDestruction(); plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
} }
} }
@ -93,15 +99,4 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public handleAsBadMessage() { public handleAsBadMessage() {
plugins.smartlog.defaultLogger.log('warn', 'received a bad message'); plugins.smartlog.defaultLogger.log('warn', 'received a bad message');
} }
/**
* prevents memory leaks if channels have no default
*/
private fallBackDestruction() {
plugins.smartdelay.delayFor(1000).then(() => {
if (!this.destructionTimer) {
this.setDestructionTimer(6000);
}
});
}
} }