Refactor smartsocket implementation for improved WebSocket handling and message protocol
- Updated test files to use new testing library and reduced test cycles for efficiency. - Removed dependency on smartexpress and integrated direct WebSocket handling. - Enhanced Smartsocket and SmartsocketClient classes to support new message types and authentication flow. - Implemented a new message interface for structured communication between client and server. - Added external server support for smartserve with appropriate WebSocket hooks. - Improved connection management and error handling in SocketConnection and SocketRequest classes. - Cleaned up code and removed deprecated socket.io references in favor of native WebSocket.
This commit is contained in:
@@ -16,7 +16,7 @@ import { logger } from './smartsocket.logging.js';
|
||||
export interface ISmartsocketClientOptions {
|
||||
port: number;
|
||||
url: string;
|
||||
alias: string; // an alias makes it easier to identify this client in a multo client environment
|
||||
alias: string; // an alias makes it easier to identify this client in a multi client environment
|
||||
autoReconnect?: boolean;
|
||||
maxRetries?: number; // maximum number of reconnection attempts
|
||||
initialBackoffDelay?: number; // initial backoff delay in ms
|
||||
@@ -97,115 +97,162 @@ export class SmartsocketClient {
|
||||
this.socketFunctions.add(socketFunction);
|
||||
}
|
||||
|
||||
private isReconnecting = false;
|
||||
|
||||
/**
|
||||
* connect the client to the server
|
||||
*/
|
||||
public async connect() {
|
||||
// Reset retry counters on new connection attempt
|
||||
this.currentRetryCount = 0;
|
||||
this.currentBackoffDelay = this.initialBackoffDelay;
|
||||
|
||||
// Only reset retry counters on fresh connection (not during auto-reconnect)
|
||||
if (!this.isReconnecting) {
|
||||
this.currentRetryCount = 0;
|
||||
this.currentBackoffDelay = this.initialBackoffDelay;
|
||||
}
|
||||
this.isReconnecting = false;
|
||||
|
||||
const done = plugins.smartpromise.defer();
|
||||
const smartenvInstance = new plugins.smartenv.Smartenv();
|
||||
const socketIoClient: any = await smartenvInstance.getEnvAwareModule({
|
||||
nodeModuleName: 'socket.io-client',
|
||||
webUrlArg: 'https://cdn.jsdelivr.net/npm/socket.io-client@4/dist/socket.io.js',
|
||||
getFunction: () => {
|
||||
const socketIoBrowserModule = (globalThis as any).io;
|
||||
// console.log('loaded socket.io for browser');
|
||||
return socketIoBrowserModule;
|
||||
},
|
||||
});
|
||||
// console.log(socketIoClient);
|
||||
|
||||
logger.log('info', 'trying to connect...');
|
||||
const socketUrl = `${this.serverUrl}:${this.serverPort}`;
|
||||
|
||||
// Construct WebSocket URL
|
||||
const protocol = this.serverUrl.startsWith('https') ? 'wss' : 'ws';
|
||||
const host = this.serverUrl.replace(/^https?:\/\//, '');
|
||||
const socketUrl = `${protocol}://${host}:${this.serverPort}`;
|
||||
|
||||
// Get WebSocket implementation (native in browser, ws in Node)
|
||||
let WebSocketClass: typeof WebSocket;
|
||||
if (typeof WebSocket !== 'undefined') {
|
||||
// Browser environment
|
||||
WebSocketClass = WebSocket;
|
||||
} else {
|
||||
// Node.js environment
|
||||
const wsModule = await smartenvInstance.getSafeNodeModule('ws');
|
||||
WebSocketClass = wsModule.default || wsModule;
|
||||
}
|
||||
|
||||
const socket = new WebSocketClass(socketUrl);
|
||||
this.currentSocket = socket;
|
||||
|
||||
this.socketConnection = new SocketConnection({
|
||||
alias: this.alias,
|
||||
authenticated: false,
|
||||
side: 'client',
|
||||
smartsocketHost: this,
|
||||
socket: await socketIoClient
|
||||
.connect(socketUrl, {
|
||||
multiplex: true,
|
||||
rememberUpgrade: true,
|
||||
autoConnect: false,
|
||||
reconnectionAttempts: 0,
|
||||
rejectUnauthorized: socketUrl.startsWith('https://localhost') ? false : true,
|
||||
})
|
||||
.open(),
|
||||
socket: socket as any,
|
||||
});
|
||||
|
||||
// Increment attempt ID to invalidate any pending timers from previous attempts
|
||||
this.connectionAttemptId++;
|
||||
const currentAttemptId = this.connectionAttemptId;
|
||||
|
||||
const timer = new plugins.smarttime.Timer(5000);
|
||||
timer.start();
|
||||
timer.completed.then(() => {
|
||||
this.updateStatus('timedOut');
|
||||
logger.log('warn', 'connection to server timed out.');
|
||||
this.disconnect(true);
|
||||
// Only fire timeout if this is still the current connection attempt
|
||||
if (currentAttemptId === this.connectionAttemptId && this.eventStatus !== 'connected') {
|
||||
this.updateStatus('timedOut');
|
||||
logger.log('warn', 'connection to server timed out.');
|
||||
this.disconnect(true);
|
||||
}
|
||||
});
|
||||
|
||||
// authentication flow
|
||||
this.socketConnection.socket.on('requestAuth', (dataArg: interfaces.IRequestAuthPayload) => {
|
||||
// Handle connection open
|
||||
socket.addEventListener('open', () => {
|
||||
timer.reset();
|
||||
logger.log('info', `server ${dataArg.serverAlias} requested authentication`);
|
||||
});
|
||||
|
||||
// lets register the authenticated event
|
||||
this.socketConnection.socket.on('authenticated', async () => {
|
||||
this.remoteShortId = dataArg.serverAlias;
|
||||
logger.log('info', 'client is authenticated');
|
||||
this.socketConnection.authenticated = true;
|
||||
await this.socketConnection.listenToFunctionRequests();
|
||||
});
|
||||
// Handle messages
|
||||
socket.addEventListener('message', async (event: MessageEvent | { data: string }) => {
|
||||
try {
|
||||
const data = typeof event.data === 'string' ? event.data : event.data.toString();
|
||||
const message: interfaces.ISocketMessage = JSON.parse(data);
|
||||
|
||||
this.socketConnection.socket.on('serverFullyReactive', async () => {
|
||||
// lets take care of retagging
|
||||
const oldTagStore = this.tagStore;
|
||||
this.tagStoreSubscription?.unsubscribe();
|
||||
for (const keyArg of Object.keys(this.tagStore)) {
|
||||
this.socketConnection.addTag(this.tagStore[keyArg]);
|
||||
switch (message.type) {
|
||||
case 'authRequest':
|
||||
timer.reset();
|
||||
const authRequestPayload = message.payload as interfaces.IAuthRequestPayload;
|
||||
logger.log('info', `server ${authRequestPayload.serverAlias} requested authentication`);
|
||||
this.remoteShortId = authRequestPayload.serverAlias;
|
||||
|
||||
// Send authentication data
|
||||
this.socketConnection.sendMessage({
|
||||
type: 'auth',
|
||||
payload: { alias: this.alias },
|
||||
});
|
||||
break;
|
||||
|
||||
case 'authResponse':
|
||||
const authResponse = message.payload as interfaces.IAuthResponsePayload;
|
||||
if (authResponse.success) {
|
||||
logger.log('info', 'client is authenticated');
|
||||
this.socketConnection.authenticated = true;
|
||||
} else {
|
||||
logger.log('warn', `authentication failed: ${authResponse.error}`);
|
||||
await this.disconnect();
|
||||
}
|
||||
break;
|
||||
|
||||
case 'serverReady':
|
||||
// Set up function request listening
|
||||
await this.socketConnection.listenToFunctionRequests();
|
||||
|
||||
// Handle retagging
|
||||
const oldTagStore = this.tagStore;
|
||||
this.tagStoreSubscription?.unsubscribe();
|
||||
for (const keyArg of Object.keys(this.tagStore)) {
|
||||
this.socketConnection.addTag(this.tagStore[keyArg]);
|
||||
}
|
||||
this.tagStoreSubscription = this.socketConnection.tagStoreObservable.subscribe(
|
||||
(tagStoreArg) => {
|
||||
this.tagStore = tagStoreArg;
|
||||
}
|
||||
);
|
||||
|
||||
for (const tag of Object.keys(oldTagStore)) {
|
||||
await this.addTag(oldTagStore[tag]);
|
||||
}
|
||||
this.updateStatus('connected');
|
||||
done.resolve();
|
||||
break;
|
||||
|
||||
default:
|
||||
// Other messages are handled by SocketConnection
|
||||
this.socketConnection.handleMessage(message);
|
||||
break;
|
||||
}
|
||||
this.tagStoreSubscription = this.socketConnection.tagStoreObservable.subscribe(
|
||||
(tagStoreArg) => {
|
||||
this.tagStore = tagStoreArg;
|
||||
}
|
||||
} catch (err) {
|
||||
// Not a valid JSON message, ignore
|
||||
}
|
||||
});
|
||||
|
||||
// Handle disconnection and errors
|
||||
const closeHandler = async () => {
|
||||
// Only handle close if this is still the current socket and we're not already disconnecting
|
||||
if (this.currentSocket === socket && !this.disconnectRunning) {
|
||||
logger.log(
|
||||
'info',
|
||||
`SocketConnection with >alias ${this.alias} on >side client disconnected`
|
||||
);
|
||||
await this.disconnect(true);
|
||||
}
|
||||
};
|
||||
|
||||
for (const tag of Object.keys(oldTagStore)) {
|
||||
await this.addTag(oldTagStore[tag]);
|
||||
}
|
||||
this.updateStatus('connected');
|
||||
done.resolve();
|
||||
});
|
||||
const errorHandler = async () => {
|
||||
if (this.currentSocket === socket && !this.disconnectRunning) {
|
||||
await this.disconnect(true);
|
||||
}
|
||||
};
|
||||
|
||||
// lets register the forbidden event
|
||||
this.socketConnection.socket.on('forbidden', async () => {
|
||||
logger.log('warn', `disconnecting due to being forbidden to use the ressource`);
|
||||
await this.disconnect();
|
||||
});
|
||||
socket.addEventListener('close', closeHandler);
|
||||
socket.addEventListener('error', errorHandler);
|
||||
|
||||
// lets provide the actual auth data
|
||||
this.socketConnection.socket.emit('dataAuth', {
|
||||
alias: this.alias,
|
||||
});
|
||||
});
|
||||
|
||||
// handle connection
|
||||
this.socketConnection.socket.on('connect', async () => {});
|
||||
|
||||
// handle disconnection and errors
|
||||
this.socketConnection.socket.on('disconnect', async () => {
|
||||
await this.disconnect(true);
|
||||
});
|
||||
|
||||
this.socketConnection.socket.on('reconnect_failed', async () => {
|
||||
await this.disconnect(true);
|
||||
});
|
||||
this.socketConnection.socket.on('connect_error', async () => {
|
||||
await this.disconnect(true);
|
||||
});
|
||||
return done.promise;
|
||||
}
|
||||
|
||||
private disconnectRunning = false;
|
||||
private currentSocket: WebSocket | null = null;
|
||||
private connectionAttemptId = 0; // Increment on each connect attempt to invalidate old timers
|
||||
|
||||
/**
|
||||
* disconnect from the server
|
||||
@@ -217,11 +264,16 @@ export class SmartsocketClient {
|
||||
this.disconnectRunning = true;
|
||||
this.updateStatus('disconnecting');
|
||||
this.tagStoreSubscription?.unsubscribe();
|
||||
|
||||
// Store reference to current socket before cleanup
|
||||
const socketToClose = this.currentSocket;
|
||||
this.currentSocket = null;
|
||||
|
||||
if (this.socketConnection) {
|
||||
await this.socketConnection.disconnect();
|
||||
this.socketConnection = undefined;
|
||||
logger.log('ok', 'disconnected socket!');
|
||||
} else {
|
||||
} else if (!socketToClose) {
|
||||
this.disconnectRunning = false;
|
||||
logger.log('warn', 'tried to disconnect, without a SocketConnection');
|
||||
return;
|
||||
@@ -232,28 +284,29 @@ export class SmartsocketClient {
|
||||
|
||||
if (this.autoReconnect && useAutoReconnectSetting && this.eventStatus !== 'connecting') {
|
||||
this.updateStatus('connecting');
|
||||
|
||||
|
||||
// Check if we've exceeded the maximum number of retries
|
||||
if (this.currentRetryCount >= this.maxRetries) {
|
||||
logger.log('warn', `Maximum reconnection attempts (${this.maxRetries}) reached. Giving up.`);
|
||||
this.disconnectRunning = false;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Increment retry counter
|
||||
this.currentRetryCount++;
|
||||
|
||||
|
||||
// Calculate backoff with jitter (±20% randomness)
|
||||
const jitter = this.currentBackoffDelay * 0.2 * (Math.random() * 2 - 1);
|
||||
const delay = Math.min(this.currentBackoffDelay + jitter, this.maxBackoffDelay);
|
||||
|
||||
|
||||
logger.log('info', `Reconnect attempt ${this.currentRetryCount}/${this.maxRetries} in ${Math.round(delay)}ms`);
|
||||
|
||||
|
||||
// Apply exponential backoff for next time (doubling with each attempt)
|
||||
this.currentBackoffDelay = Math.min(this.currentBackoffDelay * 2, this.maxBackoffDelay);
|
||||
|
||||
|
||||
await plugins.smartdelay.delayFor(delay);
|
||||
this.disconnectRunning = false;
|
||||
this.isReconnecting = true;
|
||||
await this.connect();
|
||||
} else {
|
||||
this.disconnectRunning = false;
|
||||
@@ -279,7 +332,6 @@ export class SmartsocketClient {
|
||||
functionNameArg: T['method'],
|
||||
dataArg: T['request']
|
||||
): Promise<T['response']> {
|
||||
const done = plugins.smartpromise.defer();
|
||||
const socketRequest = new SocketRequest<T>(this, {
|
||||
side: 'requesting',
|
||||
originSocketConnection: this.socketConnection,
|
||||
@@ -299,14 +351,14 @@ export class SmartsocketClient {
|
||||
this.eventSubject.next(statusArg);
|
||||
}
|
||||
this.eventStatus = statusArg;
|
||||
|
||||
|
||||
// Reset reconnection state when connection is successful
|
||||
if (statusArg === 'connected') {
|
||||
this.currentRetryCount = 0;
|
||||
this.currentBackoffDelay = this.initialBackoffDelay;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Resets the reconnection state
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user