fix(core): Improve error handling and logging; enhance search query sanitization; update dependency versions and documentation
This commit is contained in:
@@ -3,6 +3,7 @@ import { SmartdataDb } from './classes.db.js';
|
||||
import { managed, setDefaultManagerForDoc } from './classes.collection.js';
|
||||
import { SmartDataDbDoc, svDb, unI } from './classes.doc.js';
|
||||
import { SmartdataDbWatcher } from './classes.watcher.js';
|
||||
import { logger } from './logging.js';
|
||||
|
||||
@managed()
|
||||
export class DistributedClass extends SmartDataDbDoc<DistributedClass, DistributedClass> {
|
||||
@@ -63,11 +64,11 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
this.ownInstance.data.elected = false;
|
||||
}
|
||||
if (this.ownInstance?.data.status === 'stopped') {
|
||||
console.log(`stopping a distributed instance that has not been started yet.`);
|
||||
logger.log('warn', `stopping a distributed instance that has not been started yet.`);
|
||||
}
|
||||
this.ownInstance.data.status = 'stopped';
|
||||
await this.ownInstance.save();
|
||||
console.log(`stopped ${this.ownInstance.id}`);
|
||||
logger.log('info', `stopped ${this.ownInstance.id}`);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -83,17 +84,17 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
public async sendHeartbeat() {
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
if (this.ownInstance.data.status === 'stopped') {
|
||||
console.log(`aborted sending heartbeat because status is stopped`);
|
||||
logger.log('debug', `aborted sending heartbeat because status is stopped`);
|
||||
return;
|
||||
}
|
||||
await this.ownInstance.updateFromDb();
|
||||
this.ownInstance.data.lastUpdated = Date.now();
|
||||
await this.ownInstance.save();
|
||||
console.log(`sent heartbeat for ${this.ownInstance.id}`);
|
||||
logger.log('debug', `sent heartbeat for ${this.ownInstance.id}`);
|
||||
const allInstances = DistributedClass.getInstances({});
|
||||
});
|
||||
if (this.ownInstance.data.status === 'stopped') {
|
||||
console.log(`aborted sending heartbeat because status is stopped`);
|
||||
logger.log('info', `aborted sending heartbeat because status is stopped`);
|
||||
return;
|
||||
}
|
||||
const eligibleLeader = await this.getEligibleLeader();
|
||||
@@ -120,7 +121,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
await this.ownInstance.save();
|
||||
});
|
||||
} else {
|
||||
console.warn(`distributed instance already initialized`);
|
||||
logger.log('warn', `distributed instance already initialized`);
|
||||
}
|
||||
|
||||
// lets enable the heartbeat
|
||||
@@ -149,24 +150,24 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
public async checkAndMaybeLead() {
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
this.ownInstance.data.status = 'initializing';
|
||||
this.ownInstance.save();
|
||||
await this.ownInstance.save();
|
||||
});
|
||||
if (await this.getEligibleLeader()) {
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
await this.ownInstance.updateFromDb();
|
||||
this.ownInstance.data.status = 'settled';
|
||||
await this.ownInstance.save();
|
||||
console.log(`${this.ownInstance.id} settled as follower`);
|
||||
logger.log('info', `${this.ownInstance.id} settled as follower`);
|
||||
});
|
||||
return;
|
||||
} else if (
|
||||
(await DistributedClass.getInstances({})).find((instanceArg) => {
|
||||
instanceArg.data.status === 'bidding' &&
|
||||
return instanceArg.data.status === 'bidding' &&
|
||||
instanceArg.data.biddingStartTime <= Date.now() - 4000 &&
|
||||
instanceArg.data.biddingStartTime >= Date.now() - 30000;
|
||||
})
|
||||
) {
|
||||
console.log('too late to the bidding party... waiting for next round.');
|
||||
logger.log('info', 'too late to the bidding party... waiting for next round.');
|
||||
return;
|
||||
} else {
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
@@ -175,9 +176,9 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
this.ownInstance.data.biddingStartTime = Date.now();
|
||||
this.ownInstance.data.biddingShortcode = plugins.smartunique.shortId();
|
||||
await this.ownInstance.save();
|
||||
console.log('bidding code stored.');
|
||||
logger.log('info', 'bidding code stored.');
|
||||
});
|
||||
console.log(`bidding for leadership...`);
|
||||
logger.log('info', `bidding for leadership...`);
|
||||
await plugins.smartdelay.delayFor(plugins.smarttime.getMilliSecondsFromUnits({ seconds: 5 }));
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
let biddingInstances = await DistributedClass.getInstances({});
|
||||
@@ -187,7 +188,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
instanceArg.data.lastUpdated >=
|
||||
Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ seconds: 10 }),
|
||||
);
|
||||
console.log(`found ${biddingInstances.length} bidding instances...`);
|
||||
logger.log('info', `found ${biddingInstances.length} bidding instances...`);
|
||||
this.ownInstance.data.elected = true;
|
||||
for (const biddingInstance of biddingInstances) {
|
||||
if (biddingInstance.data.biddingShortcode < this.ownInstance.data.biddingShortcode) {
|
||||
@@ -195,7 +196,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
}
|
||||
}
|
||||
await plugins.smartdelay.delayFor(5000);
|
||||
console.log(`settling with status elected = ${this.ownInstance.data.elected}`);
|
||||
logger.log('info', `settling with status elected = ${this.ownInstance.data.elected}`);
|
||||
this.ownInstance.data.status = 'settled';
|
||||
await this.ownInstance.save();
|
||||
});
|
||||
@@ -226,11 +227,11 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
this.distributedWatcher.changeSubject.subscribe({
|
||||
next: async (distributedDoc) => {
|
||||
if (!distributedDoc) {
|
||||
console.log(`registered deletion of instance...`);
|
||||
logger.log('info', `registered deletion of instance...`);
|
||||
return;
|
||||
}
|
||||
console.log(distributedDoc);
|
||||
console.log(`registered change for ${distributedDoc.id}`);
|
||||
logger.log('info', distributedDoc);
|
||||
logger.log('info', `registered change for ${distributedDoc.id}`);
|
||||
distributedDoc;
|
||||
},
|
||||
});
|
||||
@@ -252,7 +253,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
): Promise<plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult> {
|
||||
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
if (!this.ownInstance) {
|
||||
console.error('instance need to be started first...');
|
||||
logger.log('error', 'instance need to be started first...');
|
||||
return;
|
||||
}
|
||||
await this.ownInstance.updateFromDb();
|
||||
@@ -268,7 +269,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
return taskRequestResult;
|
||||
});
|
||||
if (!result) {
|
||||
console.warn('no result found for task request...');
|
||||
logger.log('warn', 'no result found for task request...');
|
||||
return null;
|
||||
}
|
||||
return result;
|
||||
@@ -285,7 +286,7 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
);
|
||||
});
|
||||
if (!existingInfoBasis) {
|
||||
console.warn('trying to update a non existing task request... aborting!');
|
||||
logger.log('warn', 'trying to update a non existing task request... aborting!');
|
||||
return;
|
||||
}
|
||||
Object.assign(existingInfoBasis, infoBasisArg);
|
||||
@@ -293,8 +294,10 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
|
||||
plugins.smartdelay.delayFor(60000).then(() => {
|
||||
this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
|
||||
const indexToRemove = this.ownInstance.data.taskRequests.indexOf(existingInfoBasis);
|
||||
this.ownInstance.data.taskRequests.splice(indexToRemove, indexToRemove);
|
||||
await this.ownInstance.save();
|
||||
if (indexToRemove >= 0) {
|
||||
this.ownInstance.data.taskRequests.splice(indexToRemove, 1);
|
||||
await this.ownInstance.save();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Reference in New Issue
Block a user