fix(types,client,server): improve type safety and harden client/server message handling

This commit is contained in:
2026-05-01 11:22:06 +00:00
parent 496fd9a81a
commit 3ac4c2e708
22 changed files with 9276 additions and 3895 deletions
+3 -3
View File
@@ -1,8 +1,8 @@
/**
* autocreated commitinfo by @pushrocks/commitinfo
* autocreated commitinfo by @push.rocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartuniverse',
version: '1.0.108',
description: 'messaging service for your micro services'
version: '1.0.109',
description: 'A messaging service enabling secure, reactive communication between microservices.'
}
+7 -1
View File
@@ -15,6 +15,12 @@ export interface ISocketRequest_ProcessMessage {
method: 'processMessage';
request: interfaces.IUniverseMessage;
response: {
messageStatus: 'ok' | 'channel not found';
messageStatus: 'ok' | 'channel not found' | 'authentication required';
};
}
export interface ISocketRequest_Unsubscribe {
method: 'unsubscribe';
request: interfaces.IServerUnsubscribeActionPayload;
response: {};
}
+3 -3
View File
@@ -1,12 +1,12 @@
export interface IMessageCreator {
export interface IMessageCreator<T = any> {
messageText: string;
payload?: string | number | any;
payload?: T;
}
/**
* A universe
*/
export interface IUniverseMessage extends IMessageCreator {
export interface IUniverseMessage<T = any> extends IMessageCreator<T> {
id: string;
/**
* time of creation
+17 -16
View File
@@ -1,5 +1,5 @@
import * as plugins from './smartuniverse.plugins.js';
import { Smartsocket, SmartsocketClient } from '@push.rocks/smartsocket';
import { SmartsocketClient } from '@push.rocks/smartsocket';
import * as interfaces from './interfaces/index.js';
@@ -18,7 +18,7 @@ export interface IClientOptions {
*/
export class ClientUniverse {
public options: IClientOptions;
public smartsocketClient: plugins.smartsocket.SmartsocketClient;
public smartsocketClient?: plugins.smartsocket.SmartsocketClient;
public messageRxjsSubject = new plugins.smartrx.rxjs.Subject<ClientUniverseMessage<any>>();
public clientUniverseCache = new ClientUniverseCache();
@@ -64,7 +64,7 @@ export class ClientUniverse {
* remove a a achannel
* @param messageArg
*/
public removeChannel(channelNameArg, notifyServer = true) {
public removeChannel(channelNameArg: string, notifyServer = true) {
const clientUniverseChannel = this.clientUniverseCache.channelMap.findOneAndRemoveSync(
(channelItemArg) => {
return channelItemArg.name === channelNameArg;
@@ -110,18 +110,19 @@ export class ClientUniverse {
/**
* should handle a forced unsubscription by the server
*/
const socketFunctionUnsubscribe = new plugins.smartsocket.SocketFunction({
funcName: 'unsubscribe',
funcDef: async (dataArg: interfaces.IServerUnsubscribeActionPayload) => {
const channel = this.clientUniverseCache.channelMap.findSync((channelArg) => {
return channelArg.name === dataArg.name;
});
if (channel) {
channel.unsubscribe();
}
return {};
},
});
const socketFunctionUnsubscribe =
new plugins.smartsocket.SocketFunction<interfaces.ISocketRequest_Unsubscribe>({
funcName: 'unsubscribe',
funcDef: async (dataArg) => {
const channel = this.clientUniverseCache.channelMap.findSync((channelArg) => {
return channelArg.name === dataArg.name;
});
if (channel) {
channel.unsubscribe();
}
return {};
},
});
/**
* handles message reception
@@ -169,7 +170,7 @@ export class ClientUniverse {
const instructDisconnect = async () => {
if (this.smartsocketClient) {
const smartsocketToDisconnect = this.smartsocketClient;
this.smartsocketClient = null; // making sure the upstreamEvent does not interfere
this.smartsocketClient = undefined; // making sure the upstreamEvent does not interfere
await smartsocketToDisconnect.disconnect();
}
};
@@ -69,8 +69,12 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
public async populateSubscriptionToServer() {
// lets make sure the channel is connected
if (this.status === 'unsubscribed') {
const smartsocketClient = this.clientUniverseRef.smartsocketClient;
if (!smartsocketClient) {
throw new Error('Cannot subscribe channel before the smartuniverse client is connected.');
}
const response =
await this.clientUniverseRef.smartsocketClient.serverCall<interfaces.ISocketRequest_SubscribeChannel>(
await smartsocketClient.serverCall<interfaces.ISocketRequest_SubscribeChannel>(
'subscribeChannel',
{
name: this.name,
@@ -91,6 +95,10 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
*/
public async postMessage(messageArg: interfaces.IMessageCreator) {
await this.clientUniverseRef.start(); // its ok to call this multiple times
const smartsocketClient = this.clientUniverseRef.smartsocketClient;
if (!smartsocketClient) {
throw new Error('Cannot post message before the smartuniverse client is connected.');
}
const universeMessageToSend: interfaces.IUniverseMessage = {
id: plugins.isounique.uni(),
timestamp: Date.now(),
@@ -99,7 +107,7 @@ export class ClientUniverseChannel implements interfaces.IUniverseChannel {
messageText: messageArg.messageText,
payload: messageArg.payload,
};
await this.clientUniverseRef.smartsocketClient.serverCall(
await smartsocketClient.serverCall<interfaces.ISocketRequest_ProcessMessage>(
'processMessage',
universeMessageToSend
);
@@ -2,12 +2,12 @@ import * as plugins from './smartuniverse.plugins.js';
import * as interfaces from './interfaces/index.js';
export class ClientUniverseMessage<T> implements interfaces.IUniverseMessage {
export class ClientUniverseMessage<T = any> implements interfaces.IUniverseMessage<T> {
// ======
// STATIC
// ======
public static createMessageFromMessageDescriptor(messageDescriptor: interfaces.IUniverseMessage) {
const clientuniverseMessage = new ClientUniverseMessage(messageDescriptor);
public static createMessageFromMessageDescriptor<T = any>(messageDescriptor: interfaces.IUniverseMessage<T>) {
const clientuniverseMessage = new ClientUniverseMessage<T>(messageDescriptor);
return clientuniverseMessage;
}
@@ -25,10 +25,14 @@ export class ClientUniverseMessage<T> implements interfaces.IUniverseMessage {
public payload: T;
public targetChannelName: string;
constructor(messageArg: interfaces.IUniverseMessage) {
for (const key of Object.keys(messageArg)) {
this[key] = messageArg[key];
}
constructor(messageArg: interfaces.IUniverseMessage<T>) {
this.id = messageArg.id;
this.timestamp = messageArg.timestamp;
this.smartTimestamp = new plugins.smarttime.TimeStamp(this.timestamp);
this.messageText = messageArg.messageText;
this.passphrase = messageArg.passphrase;
this.payload = messageArg.payload as T;
this.targetChannelName = messageArg.targetChannelName;
}
/**
@@ -19,7 +19,7 @@ export interface ICombinatorPayload<T extends plugins.typedrequestInterfaces.ITy
typedRequestPayload: {
method: T['method'];
request: T['request'];
response: T['response'];
response: T['response'] | null;
};
}
@@ -54,7 +54,9 @@ export class ReactionRequest<T extends plugins.typedrequestInterfaces.ITypedRequ
if (payload.id !== requestId) {
return;
}
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
if (payload.typedRequestPayload.response !== null) {
reactionResult.pushReactionResponse(payload.typedRequestPayload.response);
}
}
}
)
+39 -49
View File
@@ -1,5 +1,4 @@
import * as plugins from './smartuniverse.plugins.js';
import * as pluginsTyped from './smartuniverse.pluginstyped.js';
import { UniverseCache, UniverseChannel, UniverseMessage } from './index.js';
@@ -9,7 +8,6 @@ import { logger } from './smartuniverse.logging.js';
export interface ISmartUniverseConstructorOptions {
messageExpiryInMilliseconds: number;
externalServer?: pluginsTyped.typedserver.servertools.Server;
}
/**
@@ -22,15 +20,10 @@ export class Universe {
// options
private options: ISmartUniverseConstructorOptions;
/**
* the smartexpress server used
*/
private server: pluginsTyped.typedserver.servertools.Server;
/**
* the smartsocket used
*/
private smartsocket: plugins.smartsocket.Smartsocket;
private smartsocket?: plugins.smartsocket.Smartsocket;
constructor(optionsArg: ISmartUniverseConstructorOptions) {
this.options = optionsArg;
@@ -41,7 +34,7 @@ export class Universe {
* stores the version of the universe server running
* this is done since the version is exposed through the api and multiple fs actions are avoided this way.
*/
private universeVersionStore: string;
private universeVersionStore = '';
/**
* get the currently running version of smartuniverse
@@ -77,25 +70,16 @@ export class Universe {
* initiates a server
*/
public async start(portArg?: number) {
if (!this.options.externalServer && !portArg) {
throw new Error(`You supplied an external error. You need to specify a portArg to start on.`);
if (!portArg) {
throw new Error(`You need to specify a portArg to start on.`);
}
portArg = portArg || 3000; // TODO: remove
// add websocket upgrade
this.smartsocket = new plugins.smartsocket.Smartsocket({
alias: 'smartuniverse',
port: portArg,
});
// lets create the base smartexpress server
if (this.options.externalServer) {
console.log('Universe is using externally supplied server');
this.smartsocket.setExternalServer('smartexpress', this.options.externalServer);
}
const socketFunctionSubscription =
new plugins.smartsocket.SocketFunction<interfaces.ISocketRequest_SubscribeChannel>({
funcName: 'subscribeChannel',
@@ -112,36 +96,42 @@ export class Universe {
},
});
const socketFunctionProcessMessage = new plugins.smartsocket.SocketFunction<any>({
// TODO proper ITypedRequest here instead of any
funcName: 'processMessage',
funcDef: async (messageDataArg: interfaces.IUniverseMessage, socketConnectionArg) => {
const universeConnection = UniverseConnection.findUniverseConnectionBySocketConnection(
this.universeCache,
socketConnectionArg
);
if (universeConnection) {
logger.log('ok', 'found UniverseConnection for socket for incoming message');
} else {
logger.log('warn', 'found no Authorized channel for incoming message');
const socketFunctionProcessMessage =
new plugins.smartsocket.SocketFunction<interfaces.ISocketRequest_ProcessMessage>({
funcName: 'processMessage',
funcDef: async (messageDataArg: interfaces.IUniverseMessage, socketConnectionArg) => {
const universeConnection = UniverseConnection.findUniverseConnectionBySocketConnection(
this.universeCache,
socketConnectionArg
);
if (universeConnection) {
logger.log('ok', 'found UniverseConnection for socket for incoming message');
} else {
logger.log('warn', 'found no Authorized channel for incoming message');
return {
messageStatus: 'authentication required',
};
}
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(
socketConnectionArg,
messageDataArg
);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
);
if (foundChannel && unauthenticatedMessage.authenticated) {
const authenticatedMessage = unauthenticatedMessage;
await this.universeCache.addMessage(authenticatedMessage);
return {
messageStatus: 'ok',
};
}
return {
error: 'You need to authenticate for a channel',
messageStatus: 'channel not found',
};
}
const unauthenticatedMessage = UniverseMessage.createMessageFromPayload(
socketConnectionArg,
messageDataArg
);
const foundChannel = await UniverseChannel.authorizeAMessageForAChannel(
this.universeCache,
unauthenticatedMessage
);
if (foundChannel && unauthenticatedMessage.authenticated) {
const authenticatedMessage = unauthenticatedMessage;
await this.universeCache.addMessage(authenticatedMessage);
}
},
});
},
});
// add socket functions
this.smartsocket.addSocketFunction(socketFunctionSubscription);
@@ -156,6 +146,6 @@ export class Universe {
* stop everything
*/
public async stopServer() {
await this.smartsocket.stop();
await this.smartsocket?.stop();
}
}
+1 -1
View File
@@ -54,7 +54,7 @@ export class UniverseChannel {
public static authorizeAMessageForAChannel(
universeCacheArg: UniverseCache,
universeMessageArg: UniverseMessage<any>
): UniverseChannel {
): UniverseChannel | null {
const foundChannel = universeCacheArg.channelMap.findSync((universeChannel) => {
const result = universeChannel.authenticate(universeMessageArg);
return result;
@@ -36,7 +36,7 @@ export class UniverseConnection {
universeCache: UniverseCache,
universeConnectionArg: UniverseConnection
): Promise<UniverseConnection> {
let connectionToReturn: UniverseConnection;
let connectionToReturn: UniverseConnection | undefined;
universeCache.connectionMap.forEach(async (existingConnection) => {
if (existingConnection.socketConnection === universeConnectionArg.socketConnection) {
connectionToReturn = await this.mergeUniverseConnections(
@@ -63,7 +63,7 @@ export class UniverseConnection {
universeRef,
authenticationRequest.name
);
if (universeChannelToAuthenticateAgainst.passphrase === authenticationRequest.passphrase) {
if (universeChannelToAuthenticateAgainst?.passphrase === authenticationRequest.passphrase) {
universeConnectionArg.authenticatedChannels.push(universeChannelToAuthenticateAgainst);
}
}
@@ -86,7 +86,7 @@ export class UniverseConnection {
public static findUniverseConnectionBySocketConnection(
universeCache: UniverseCache,
socketConnectionArg: plugins.smartsocket.SocketConnection
): UniverseConnection {
): UniverseConnection | undefined {
const universeConnection = universeCache.connectionMap.findSync((universeConnectionArg) => {
return universeConnectionArg.socketConnection === socketConnectionArg;
});
+12 -11
View File
@@ -1,6 +1,5 @@
import * as plugins from './smartuniverse.plugins.js';
import * as interfaces from './interfaces/index.js';
import { Universe } from './smartuniverse.classes.universe.js';
import { UniverseChannel } from './smartuniverse.classes.universechannel.js';
import { UniverseCache } from './smartuniverse.classes.universecache.js';
import { SocketConnection } from '@push.rocks/smartsocket';
@@ -10,12 +9,12 @@ import { logger } from './smartuniverse.logging.js';
* represents a message within a universe
* acts as a container to save message states like authentication status
*/
export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public static createMessageFromPayload(
export class UniverseMessage<T = any> implements interfaces.IUniverseMessage<T> {
public static createMessageFromPayload<T = any>(
socketConnectionArg: SocketConnection,
dataArg: interfaces.IUniverseMessage
dataArg: interfaces.IUniverseMessage<T>
) {
const universeMessageInstance = new UniverseMessage(dataArg);
const universeMessageInstance = new UniverseMessage<T>(dataArg);
universeMessageInstance.socketConnection = socketConnectionArg;
return universeMessageInstance;
}
@@ -27,12 +26,12 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
public passphrase: string;
public payload: T;
public targetChannelName: string;
public socketConnection: SocketConnection;
public socketConnection?: SocketConnection;
/**
* the UniverseCache the message is attached to
*/
public universeCache: UniverseCache;
public universeCache?: UniverseCache;
/**
* enables unprotected grouping of messages for efficiency purposes.
@@ -47,19 +46,21 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
/**
* a destruction timer for this message
*/
public destructionTimer: plugins.smarttime.Timer; // a timer to take care of message destruction
public destructionTimer?: plugins.smarttime.Timer; // a timer to take care of message destruction
/**
* the constructor to create a universe message
* @param messageArg
* @param attachedPayloadArg
*/
constructor(messageDescriptor: interfaces.IUniverseMessage) {
constructor(messageDescriptor: interfaces.IUniverseMessage<T>) {
this.id = messageDescriptor.id;
this.timestamp = messageDescriptor.timestamp;
this.smartTimestamp = new plugins.smarttime.TimeStamp(this.timestamp);
this.messageText = messageDescriptor.messageText;
this.targetChannelName = messageDescriptor.targetChannelName;
this.passphrase = messageDescriptor.passphrase;
this.payload = messageDescriptor.payload;
this.payload = messageDescriptor.payload as T;
// prevent memory issues
this.setDestructionTimer();
}
@@ -77,7 +78,7 @@ export class UniverseMessage<T> implements interfaces.IUniverseMessage {
// set up self destruction by removing this from the parent messageCache
this.destructionTimer.completed
.then(async () => {
this.universeCache.messageMap.remove(this);
this.universeCache?.messageMap.remove(this);
})
.catch((err) => {
console.log(err);
+1 -1
View File
@@ -1,5 +1,5 @@
// apiglobal scope
import * as typedrequestInterfaces from '@apiglobal/typedrequest-interfaces';
import * as typedrequestInterfaces from '@api.global/typedrequest-interfaces';
export { typedrequestInterfaces };
+2 -2
View File
@@ -1,5 +1,5 @@
import type * as typedserver from '@apiglobal/typedserver';
import type * as typedserver from '@api.global/typedserver';
export type {
typedserver
};
};