Compare commits

..

41 Commits

Author SHA1 Message Date
203a284c88 5.0.43 2024-04-14 01:24:22 +02:00
30ae641a9c fix(core): update 2024-04-14 01:24:21 +02:00
cfe733621f update npmextra.json: githost 2024-04-01 21:34:27 +02:00
1f76e2478e update npmextra.json: githost 2024-04-01 19:57:58 +02:00
7d668bee05 update npmextra.json: githost 2024-03-30 21:46:55 +01:00
bef7f68360 5.0.42 2024-03-30 12:26:43 +01:00
56e9754725 fix(TypeScript): improve tsconfig.json for ES Module use 2024-03-30 12:26:42 +01:00
30d81581cf 5.0.41 2024-03-27 17:32:02 +01:00
5e9db12955 fix(core): update 2024-03-27 17:32:01 +01:00
ad2f422c86 5.0.40 2024-03-27 17:30:51 +01:00
17ce14bcb9 fix(core): update 2024-03-27 17:30:50 +01:00
32319e6e77 5.0.39 2024-03-27 17:30:15 +01:00
4cd284eaa9 fix(core): update 2024-03-27 17:30:14 +01:00
00ec2e57c2 5.0.38 2024-03-27 16:24:58 +01:00
765356ce3d fix(core): update 2024-03-27 16:24:57 +01:00
56b8581d2b 5.0.37 2024-03-26 13:22:34 +01:00
37a9df9086 fix(core): update 2024-03-26 13:22:33 +01:00
090fb668cd 5.0.36 2024-03-26 13:21:37 +01:00
a1c807261c fix(core): update 2024-03-26 13:21:36 +01:00
a2ccf15f69 5.0.35 2024-03-26 00:25:06 +01:00
84d48f1914 fix(core): update 2024-03-26 00:25:06 +01:00
1e258e5ffb 5.0.34 2024-03-22 18:36:35 +01:00
19d5f553b9 fix(core): update 2024-03-22 18:36:34 +01:00
7a257ea925 5.0.33 2023-08-21 12:39:49 +02:00
2fa1e89f34 fix(core): update 2023-08-21 12:39:48 +02:00
d6b3896dd3 5.0.32 2023-08-16 13:16:40 +02:00
49b11b17ce fix(core): update 2023-08-16 13:16:39 +02:00
4ac8a4c0cd 5.0.31 2023-08-16 12:08:28 +02:00
7f9983382a fix(core): update 2023-08-16 12:08:27 +02:00
54f529b0a7 5.0.30 2023-08-15 19:55:53 +02:00
f542463bf6 fix(core): update 2023-08-15 19:55:52 +02:00
1235ae2eb3 5.0.29 2023-08-15 19:55:23 +02:00
8166d2f7c2 fix(core): update 2023-08-15 19:55:22 +02:00
7c9f27e02f 5.0.28 2023-08-15 01:24:30 +02:00
842e4b280b fix(core): update 2023-08-15 01:24:29 +02:00
009f3297b2 5.0.27 2023-08-15 01:01:16 +02:00
2ff3a4e0b7 fix(core): update 2023-08-15 01:01:16 +02:00
0e55cd8876 5.0.26 2023-08-12 23:32:40 +02:00
eccdf3f00a fix(core): update 2023-08-12 23:32:39 +02:00
c7544133d9 5.0.25 2023-08-12 23:32:02 +02:00
c7c9acf5bd fix(core): update 2023-08-12 23:32:02 +02:00
16 changed files with 1966 additions and 1049 deletions

View File

@ -12,12 +12,15 @@
"gitzone": { "gitzone": {
"projectType": "npm", "projectType": "npm",
"module": { "module": {
"githost": "gitlab.com", "githost": "code.foss.global",
"gitscope": "push.rocks", "gitscope": "push.rocks",
"gitrepo": "smartdata", "gitrepo": "smartdata",
"description": "do more with data", "description": "do more with data",
"npmPackagename": "@push.rocks/smartdata", "npmPackagename": "@push.rocks/smartdata",
"license": "MIT" "license": "MIT"
} }
},
"tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
} }
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdata", "name": "@push.rocks/smartdata",
"version": "5.0.24", "version": "5.0.43",
"private": false, "private": false,
"description": "do more with data", "description": "do more with data",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",
@ -22,27 +22,26 @@
}, },
"homepage": "https://gitlab.com/pushrocks/smartdata#README", "homepage": "https://gitlab.com/pushrocks/smartdata#README",
"dependencies": { "dependencies": {
"@push.rocks/lik": "^6.0.2", "@push.rocks/lik": "^6.0.14",
"@push.rocks/smartdelay": "^3.0.1", "@push.rocks/smartdelay": "^3.0.1",
"@push.rocks/smartlog": "^3.0.2", "@push.rocks/smartlog": "^3.0.2",
"@push.rocks/smartmongo": "^2.0.10", "@push.rocks/smartmongo": "^2.0.10",
"@push.rocks/smartpromise": "^4.0.2", "@push.rocks/smartpromise": "^4.0.2",
"@push.rocks/smartrx": "^3.0.6", "@push.rocks/smartrx": "^3.0.7",
"@push.rocks/smartstring": "^4.0.7", "@push.rocks/smartstring": "^4.0.15",
"@push.rocks/smarttime": "^4.0.1", "@push.rocks/smarttime": "^4.0.6",
"@push.rocks/smartunique": "^3.0.3", "@push.rocks/smartunique": "^3.0.8",
"@push.rocks/taskbuffer": "^3.1.0", "@push.rocks/taskbuffer": "^3.1.7",
"@tsclass/tsclass": "^4.0.42", "@tsclass/tsclass": "^4.0.52",
"mongodb": "^5.7.0" "mongodb": "^6.5.0"
}, },
"devDependencies": { "devDependencies": {
"@gitzone/tsbuild": "^2.1.66", "@gitzone/tsbuild": "^2.1.66",
"@gitzone/tsrun": "^1.2.44", "@gitzone/tsrun": "^1.2.44",
"@gitzone/tstest": "^1.0.77", "@gitzone/tstest": "^1.0.77",
"@push.rocks/qenv": "^6.0.2", "@push.rocks/qenv": "^6.0.5",
"@push.rocks/tapbundle": "^5.0.8", "@push.rocks/tapbundle": "^5.0.22",
"@types/node": "^20.4.9", "@types/node": "^20.11.30"
"@types/shortid": "0.0.29"
}, },
"files": [ "files": [
"ts/**/*", "ts/**/*",

2589
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,112 @@
import { tap, expect } from '@push.rocks/tapbundle';
import * as smartmongo from '@push.rocks/smartmongo';
import type * as taskbuffer from '@push.rocks/taskbuffer';
import * as smartdata from '../ts/index.js';
import { SmartdataDistributedCoordinator, DistributedClass } from '../ts/smartdata.classes.distributedcoordinator.js'; // path might need adjusting
const totalInstances = 10;
// =======================================
// Connecting to the database server
// =======================================
let smartmongoInstance: smartmongo.SmartMongo;
let testDb: smartdata.SmartdataDb;
tap.test('should create a testinstance as database', async () => {
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
testDb = new smartdata.SmartdataDb(await smartmongoInstance.getMongoDescriptor());
await testDb.init();
});
tap.test('should instantiate DistributedClass', async (tools) => {
const instance = new DistributedClass();
expect(instance).toBeInstanceOf(DistributedClass);
});
tap.test('DistributedClass should update the time', async (tools) => {
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
await distributedCoordinator.start();
const initialTime = distributedCoordinator.ownInstance.data.lastUpdated;
await distributedCoordinator.sendHeartbeat();
const updatedTime = distributedCoordinator.ownInstance.data.lastUpdated;
expect(updatedTime).toBeGreaterThan(initialTime);
await distributedCoordinator.stop();
});
tap.test('should instantiate SmartdataDistributedCoordinator', async (tools) => {
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
await distributedCoordinator.start();
expect(distributedCoordinator).toBeInstanceOf(SmartdataDistributedCoordinator);
await distributedCoordinator.stop();
});
tap.test('SmartdataDistributedCoordinator should update leader status', async (tools) => {
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
await distributedCoordinator.start();
await distributedCoordinator.checkAndMaybeLead();
expect(distributedCoordinator.ownInstance.data.elected).toBeOneOf([true, false]);
await distributedCoordinator.stop();
});
tap.test('SmartdataDistributedCoordinator should handle distributed task requests', async (tools) => {
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
await distributedCoordinator.start();
const mockTaskRequest: taskbuffer.distributedCoordination.IDistributedTaskRequest = {
submitterId: "mockSubmitter12345", // Some unique mock submitter ID
requestResponseId: 'uni879873462hjhfkjhsdf', // Some unique ID for the request-response
taskName: "SampleTask",
taskVersion: "1.0.0", // Assuming it's a version string
taskExecutionTime: Date.now(),
taskExecutionTimeout: 60000, // Let's say the timeout is 1 minute (60000 ms)
taskExecutionParallel: 5, // Let's assume max 5 parallel executions
status: 'requesting'
};
const response = await distributedCoordinator.fireDistributedTaskRequest(mockTaskRequest);
console.log(response) // based on your expected structure for the response
await distributedCoordinator.stop();
});
tap.test('SmartdataDistributedCoordinator should update distributed task requests', async (tools) => {
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
await distributedCoordinator.start();
const mockTaskRequest: taskbuffer.distributedCoordination.IDistributedTaskRequest = {
submitterId: "mockSubmitter12345", // Some unique mock submitter ID
requestResponseId: 'uni879873462hjhfkjhsdf', // Some unique ID for the request-response
taskName: "SampleTask",
taskVersion: "1.0.0", // Assuming it's a version string
taskExecutionTime: Date.now(),
taskExecutionTimeout: 60000, // Let's say the timeout is 1 minute (60000 ms)
taskExecutionParallel: 5, // Let's assume max 5 parallel executions
status: 'requesting'
};
await distributedCoordinator.updateDistributedTaskRequest(mockTaskRequest);
// Here, we can potentially check if a DB entry got updated or some other side-effect of the update method.
await distributedCoordinator.stop();
});
tap.test('should elect only one leader amongst multiple instances', async (tools) => {
const coordinators = Array.from({ length: totalInstances }).map(() => new SmartdataDistributedCoordinator(testDb));
await Promise.all(coordinators.map(coordinator => coordinator.start()));
const leaders = coordinators.filter(coordinator => coordinator.ownInstance.data.elected);
for (const leader of leaders) {
console.log(leader.ownInstance);
}
expect(leaders.length).toEqual(1);
// stopping clears a coordinator from being elected.
await Promise.all(coordinators.map(coordinator => coordinator.stop()));
});
tap.test('should clean up', async () => {
await smartmongoInstance.stopAndDumpToDir(`.nogit/dbdump/test.distributedcoordinator.ts`);
setTimeout(() => process.exit(), 2000);
})
tap.start({ throwOnError: true });

View File

@ -26,7 +26,7 @@ tap.test('should create a testinstance as database', async () => {
tap.skip.test('should connect to atlas', async (tools) => { tap.skip.test('should connect to atlas', async (tools) => {
const databaseName = `test-smartdata-${smartunique.shortId()}`; const databaseName = `test-smartdata-${smartunique.shortId()}`;
testDb = new smartdata.SmartdataDb({ testDb = new smartdata.SmartdataDb({
mongoDbUrl: testQenv.getEnvVarOnDemand('MONGO_URL'), mongoDbUrl: await testQenv.getEnvVarOnDemand('MONGO_URL'),
mongoDbName: databaseName, mongoDbName: databaseName,
}); });
await testDb.init(); await testDb.init();

View File

@ -30,7 +30,7 @@ tap.test('should create a testinstance as database', async () => {
tap.skip.test('should connect to atlas', async (tools) => { tap.skip.test('should connect to atlas', async (tools) => {
const databaseName = `test-smartdata-${smartunique.shortId()}`; const databaseName = `test-smartdata-${smartunique.shortId()}`;
testDb = new smartdata.SmartdataDb({ testDb = new smartdata.SmartdataDb({
mongoDbUrl: testQenv.getEnvVarOnDemand('MONGO_URL'), mongoDbUrl: await testQenv.getEnvVarOnDemand('MONGO_URL'),
mongoDbName: databaseName, mongoDbName: databaseName,
}); });
await testDb.init(); await testDb.init();
@ -72,6 +72,11 @@ class Car extends smartdata.SmartDataDbDoc<Car, Car> {
} }
} }
tap.test('should create a new id', async () => {
const newid = await Car.getNewId();
console.log(newid);
});
tap.test('should save the car to the db', async (toolsArg) => { tap.test('should save the car to the db', async (toolsArg) => {
const myCar = new Car('red', 'Volvo'); const myCar = new Car('red', 'Volvo');
await myCar.save(); await myCar.save();
@ -213,11 +218,13 @@ tap.test('should use a cursor', async () => {
// close the database connection // close the database connection
// ======================================= // =======================================
tap.test('close', async () => { tap.test('close', async () => {
if (smartmongoInstance) {
await smartmongoInstance.stopAndDumpToDir('./.nogit/dbdump/test.ts');
} else {
await testDb.mongoDb.dropDatabase(); await testDb.mongoDb.dropDatabase();
await testDb.close(); await testDb.close();
if (smartmongoInstance) {
await smartmongoInstance.stop();
} }
setTimeout(() => process.exit(), 2000);
}); });
tap.start({ throwOnError: true }); tap.start({ throwOnError: true });

View File

@ -28,7 +28,7 @@ tap.test('should create a testinstance as database', async () => {
tap.skip.test('should connect to atlas', async (tools) => { tap.skip.test('should connect to atlas', async (tools) => {
const databaseName = `test-smartdata-${smartunique.shortId()}`; const databaseName = `test-smartdata-${smartunique.shortId()}`;
testDb = new smartdata.SmartdataDb({ testDb = new smartdata.SmartdataDb({
mongoDbUrl: testQenv.getEnvVarOnDemand('MONGO_URL'), mongoDbUrl: await testQenv.getEnvVarOnDemand('MONGO_URL'),
mongoDbName: databaseName, mongoDbName: databaseName,
}); });
await testDb.init(); await testDb.init();

View File

@ -28,7 +28,7 @@ tap.test('should create a testinstance as database', async () => {
tap.skip.test('should connect to atlas', async (tools) => { tap.skip.test('should connect to atlas', async (tools) => {
const databaseName = `test-smartdata-${smartunique.shortId()}`; const databaseName = `test-smartdata-${smartunique.shortId()}`;
testDb = new smartdata.SmartdataDb({ testDb = new smartdata.SmartdataDb({
mongoDbUrl: testQenv.getEnvVarOnDemand('MONGO_URL'), mongoDbUrl: await testQenv.getEnvVarOnDemand('MONGO_URL'),
mongoDbName: databaseName, mongoDbName: databaseName,
}); });
await testDb.init(); await testDb.init();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdata', name: '@push.rocks/smartdata',
version: '5.0.24', version: '5.0.43',
description: 'do more with data' description: 'do more with data'
} }

View File

@ -26,7 +26,8 @@ const collectionFactory = new CollectionFactory();
*/ */
export function Collection(dbArg: SmartdataDb | TDelayed<SmartdataDb>) { export function Collection(dbArg: SmartdataDb | TDelayed<SmartdataDb>) {
return function classDecorator<T extends { new (...args: any[]): {} }>(constructor: T) { return function classDecorator<T extends { new (...args: any[]): {} }>(constructor: T) {
return class extends constructor { const decoratedClass = class extends constructor {
public static className = constructor.name;
public static get collection() { public static get collection() {
if (!(dbArg instanceof SmartdataDb)) { if (!(dbArg instanceof SmartdataDb)) {
dbArg = dbArg(); dbArg = dbArg();
@ -40,6 +41,7 @@ export function Collection(dbArg: SmartdataDb | TDelayed<SmartdataDb>) {
return collectionFactory.getCollection(constructor.name, dbArg); return collectionFactory.getCollection(constructor.name, dbArg);
} }
}; };
return decoratedClass;
}; };
} }
@ -56,9 +58,10 @@ export const setDefaultManagerForDoc = <T>(managerArg: IManager, dbDocArg: T): T
* This is a decorator that will tell the decorated class what dbTable to use * This is a decorator that will tell the decorated class what dbTable to use
* @param dbArg * @param dbArg
*/ */
export function Manager<TManager extends IManager>(managerArg?: TManager | TDelayed<TManager>) { export function managed<TManager extends IManager>(managerArg?: TManager | TDelayed<TManager>) {
return function classDecorator<T extends { new (...args: any[]): any }>(constructor: T) { return function classDecorator<T extends { new (...args: any[]): any }>(constructor: T) {
return class extends constructor { const decoratedClass = class extends constructor {
public static className = constructor.name;
public static get collection() { public static get collection() {
let dbArg: SmartdataDb; let dbArg: SmartdataDb;
if (!managerArg) { if (!managerArg) {
@ -106,9 +109,15 @@ export function Manager<TManager extends IManager>(managerArg?: TManager | TDela
return manager; return manager;
} }
}; };
return decoratedClass;
}; };
} }
/**
* @dpecrecated use @managed instead
*/
export const Manager = managed;
export class SmartdataCollection<T> { export class SmartdataCollection<T> {
/** /**
* the collection that is used * the collection that is used
@ -269,7 +278,7 @@ export class SmartdataCollection<T> {
* if this.objectValidation is not set it passes. * if this.objectValidation is not set it passes.
*/ */
private checkDoc(docArg: T): Promise<void> { private checkDoc(docArg: T): Promise<void> {
const done = plugins.smartq.defer<void>(); const done = plugins.smartpromise.defer<void>();
let validationResult = true; let validationResult = true;
if (this.objectValidation) { if (this.objectValidation) {
validationResult = this.objectValidation(docArg); validationResult = this.objectValidation(docArg);

View File

@ -1,18 +1,20 @@
import * as plugins from './smartdata.plugins.js'; import * as plugins from './smartdata.plugins.js';
import { SmartdataDb } from './smartdata.classes.db.js'; import { SmartdataDb } from './smartdata.classes.db.js';
import { Manager, setDefaultManagerForDoc } from './smartdata.classes.collection.js'; import { managed, setDefaultManagerForDoc } from './smartdata.classes.collection.js';
import { SmartDataDbDoc, svDb, unI } from './smartdata.classes.doc.js'; import { SmartDataDbDoc, svDb, unI } from './smartdata.classes.doc.js';
import { SmartdataDbWatcher } from './smartdata.classes.watcher.js';
@Manager() @managed()
class DistributedClass extends SmartDataDbDoc<DistributedClass, DistributedClass> { export class DistributedClass extends SmartDataDbDoc<DistributedClass, DistributedClass> {
// INSTANCE // INSTANCE
@unI() @unI()
public id: string; public id: string;
@svDb() @svDb()
public data: { public data: {
status: 'bidding' | 'settled' | 'initializing' | 'stopped'; status: 'initializing' | 'bidding' | 'settled' | 'stopped';
biddingShortcode?: string; biddingShortcode?: string;
biddingStartTime?: number;
lastUpdated: number; lastUpdated: number;
elected: boolean; elected: boolean;
/** /**
@ -37,14 +39,14 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
public readyPromise: Promise<any>; public readyPromise: Promise<any>;
public db: SmartdataDb; public db: SmartdataDb;
private asyncExecutionStack = new plugins.lik.AsyncExecutionStack(); private asyncExecutionStack = new plugins.lik.AsyncExecutionStack();
private ownInstance: DistributedClass; public ownInstance: DistributedClass;
public distributedWatcher: SmartdataDbWatcher<DistributedClass>;
constructor(dbArg?: SmartdataDb) { constructor(dbArg: SmartdataDb) {
super(); super();
this.db = dbArg; this.db = dbArg;
setDefaultManagerForDoc(this, DistributedClass); setDefaultManagerForDoc(this, DistributedClass);
this.readyPromise = this.db.statusConnectedDeferred.promise; this.readyPromise = this.db.statusConnectedDeferred.promise;
this.init();
} }
// smartdata specific stuff // smartdata specific stuff
@ -53,26 +55,56 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
} }
public async stop() { public async stop() {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
if (this.distributedWatcher) {
await this.distributedWatcher.close();
}
if (this.ownInstance?.data.elected) { if (this.ownInstance?.data.elected) {
this.ownInstance.data.elected = false; this.ownInstance.data.elected = false;
} else {
console.log(`can't stop a distributed instance that has not been started yet.`);
} }
if (this.ownInstance?.data.status === 'stopped') {
console.log(`stopping a distributed instance that has not been started yet.`);
}
this.ownInstance.data.status = 'stopped';
await this.ownInstance.save();
console.log(`stopped ${this.ownInstance.id}`);
});
} }
public id = plugins.smartunique.uni('distributedInstance'); public id = plugins.smartunique.uni('distributedInstance');
private startHeartbeat = async () => { private startHeartbeat = async () => {
while (this.ownInstance.data.status !== 'stopped') { while (this.ownInstance.data.status !== 'stopped') {
await this.sendHeartbeat();
await plugins.smartdelay.delayForRandom(5000, 10000);
}
};
public async sendHeartbeat() {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
if (this.ownInstance.data.status === 'stopped') {
console.log(`aborted sending heartbeat because status is stopped`);
return;
}
await this.ownInstance.updateFromDb(); await this.ownInstance.updateFromDb();
this.ownInstance.data.lastUpdated = Date.now(); this.ownInstance.data.lastUpdated = Date.now();
await this.ownInstance.save(); await this.ownInstance.save();
console.log(`sent heartbeat for ${this.ownInstance.id}`);
const allInstances = DistributedClass.getInstances({});
}); });
await plugins.smartdelay.delayFor(10000); if (this.ownInstance.data.status === 'stopped') {
console.log(`aborted sending heartbeat because status is stopped`);
return;
} }
}; const eligibleLeader = await this.getEligibleLeader();
public async init() { // not awaiting here because we don't want to block the heartbeat
this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
if (!eligibleLeader && this.ownInstance.data.status === 'settled') {
this.checkAndMaybeLead();
}
});
}
private async init() {
await this.readyPromise; await this.readyPromise;
if (!this.ownInstance) { if (!this.ownInstance) {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
@ -87,6 +119,8 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
}; };
await this.ownInstance.save(); await this.ownInstance.save();
}); });
} else {
console.warn(`distributed instance already initialized`);
} }
// lets enable the heartbeat // lets enable the heartbeat
@ -98,42 +132,73 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
return this.ownInstance; return this.ownInstance;
} }
public async getEligibleLeader() {
return this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
const allInstances = await DistributedClass.getInstances({});
let leaders = allInstances.filter((instanceArg) => instanceArg.data.elected === true);
const eligibleLeader = leaders.find(
(leader) =>
leader.data.lastUpdated >=
Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ seconds: 20 })
);
return eligibleLeader;
});
}
// --> leader election // --> leader election
public async checkAndMaybeLead() { public async checkAndMaybeLead() {
const allInstances = await DistributedClass.getInstances({}); await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
let leader = allInstances.find((instanceArg) => instanceArg.data.elected === true); this.ownInstance.data.status = 'initializing';
if ( this.ownInstance.save();
leader && });
leader.data.lastUpdated >= if (await this.getEligibleLeader()) {
Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ minutes: 1 })
) {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
await this.ownInstance.updateFromDb(); await this.ownInstance.updateFromDb();
this.ownInstance.data.status = 'settled'; this.ownInstance.data.status = 'settled';
await this.ownInstance.save(); await this.ownInstance.save();
console.log(`${this.ownInstance.id} settled as follower`);
}); });
return; return;
} else if (
(await DistributedClass.getInstances({})).find((instanceArg) => {
instanceArg.data.status === 'bidding' &&
instanceArg.data.biddingStartTime <= Date.now() - 4000 &&
instanceArg.data.biddingStartTime >= Date.now() - 30000;
})
) {
console.log('too late to the bidding party... waiting for next round.');
return;
} else { } else {
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
await this.ownInstance.updateFromDb();
this.ownInstance.data.status = 'bidding'; this.ownInstance.data.status = 'bidding';
this.ownInstance.data.biddingStartTime = Date.now();
this.ownInstance.data.biddingShortcode = plugins.smartunique.shortId(); this.ownInstance.data.biddingShortcode = plugins.smartunique.shortId();
await this.ownInstance.save(); await this.ownInstance.save();
console.log('bidding code stored.');
}); });
await plugins.smartdelay.delayFor(plugins.smarttime.getMilliSecondsFromUnits({ minutes: 2 })); console.log(`bidding for leadership...`);
await plugins.smartdelay.delayFor(
plugins.smarttime.getMilliSecondsFromUnits({ seconds: 5 })
);
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
let biddingInstances = await DistributedClass.getInstances({}); let biddingInstances = await DistributedClass.getInstances({});
biddingInstances = biddingInstances.filter( biddingInstances = biddingInstances.filter(
(instanceArg) => (instanceArg) =>
!instanceArg.data.elected && instanceArg.data.status === 'bidding' &&
instanceArg.data.lastUpdated >= instanceArg.data.lastUpdated >=
Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ minutes: 1 }) Date.now() - plugins.smarttime.getMilliSecondsFromUnits({ seconds: 10 })
); );
console.log(`found ${biddingInstances.length} bidding instances...`);
this.ownInstance.data.elected = true; this.ownInstance.data.elected = true;
for (const biddingInstance of biddingInstances) { for (const biddingInstance of biddingInstances) {
if (biddingInstance.data.biddingShortcode < this.ownInstance.data.biddingShortcode) { if (biddingInstance.data.biddingShortcode < this.ownInstance.data.biddingShortcode) {
this.ownInstance.data.elected = false; this.ownInstance.data.elected = false;
} }
} }
await plugins.smartdelay.delayFor(5000);
console.log(`settling with status elected = ${this.ownInstance.data.elected}`);
this.ownInstance.data.status = 'settled';
await this.ownInstance.save(); await this.ownInstance.save();
}); });
if (this.ownInstance.data.elected) { if (this.ownInstance.data.elected) {
@ -148,20 +213,38 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
* the leading is implemented here * the leading is implemented here
*/ */
public async leadFunction() { public async leadFunction() {
const ownInstance = await this.init(); this.distributedWatcher = await DistributedClass.watch({});
const watcher = await DistributedClass.watch({});
/**
* this function is started once per unique job request
*/
const startResultTimer = async () => {};
watcher.changeSubject.subscribe({ const currentTaskRequests: Array<{
taskName: string;
taskExecutionTime: number;
/**
* all instances that requested this task
*/
requestingDistibutedInstanceIds: string[];
responseTimeout: plugins.smartdelay.Timeout<any>;
}> = [];
this.distributedWatcher.changeSubject.subscribe({
next: async (distributedDoc) => { next: async (distributedDoc) => {
if (!distributedDoc) {
console.log(`registered deletion of instance...`);
return;
}
console.log(distributedDoc);
console.log(`registered change for ${distributedDoc.id}`);
distributedDoc; distributedDoc;
}, },
}); });
while (this.ownInstance.data.status !== 'stopped') {
await plugins.smartdelay.delayFor(1000); while (this.ownInstance.data.status !== 'stopped' && this.ownInstance.data.elected) {
const allInstances = await DistributedClass.getInstances({});
for (const instance of allInstances) {
if (instance.data.status === 'stopped') {
await instance.delete();
};
}
await plugins.smartdelay.delayFor(10000);
} }
} }
@ -169,13 +252,29 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
public async fireDistributedTaskRequest( public async fireDistributedTaskRequest(
taskRequestArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest taskRequestArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest
): Promise<plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult> { ): Promise<plugins.taskbuffer.distributedCoordination.IDistributedTaskRequestResult> {
const ownInstance = await this.init();
await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => { await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
if (!this.ownInstance) {
console.error('instance need to be started first...');
return;
}
await this.ownInstance.updateFromDb();
this.ownInstance.data.taskRequests.push(taskRequestArg); this.ownInstance.data.taskRequests.push(taskRequestArg);
await this.ownInstance.save(); await this.ownInstance.save();
}); });
await plugins.smartdelay.delayFor(10000);
const result = await this.asyncExecutionStack.getExclusiveExecutionSlot(async () => {
await this.ownInstance.updateFromDb();
const taskRequestResult = this.ownInstance.data.taskRequestResults.find((resultItem) => {
return resultItem.requestResponseId === taskRequestArg.requestResponseId;
});
return taskRequestResult;
});
if (!result) {
console.warn('no result found for task request...');
return null; return null;
} }
return result;
}
public async updateDistributedTaskRequest( public async updateDistributedTaskRequest(
infoBasisArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest infoBasisArg: plugins.taskbuffer.distributedCoordination.IDistributedTaskRequest
@ -187,6 +286,10 @@ export class SmartdataDistributedCoordinator extends plugins.taskbuffer.distribu
infoBasisItem.taskExecutionTime === infoBasisArg.taskExecutionTime infoBasisItem.taskExecutionTime === infoBasisArg.taskExecutionTime
); );
}); });
if (!existingInfoBasis) {
console.warn('trying to update a non existing task request... aborting!');
return;
}
Object.assign(existingInfoBasis, infoBasisArg); Object.assign(existingInfoBasis, infoBasisArg);
await this.ownInstance.save(); await this.ownInstance.save();
plugins.smartdelay.delayFor(60000).then(() => { plugins.smartdelay.delayFor(60000).then(() => {

View File

@ -127,6 +127,13 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
} }
} }
/**
* get a unique id prefixed with the class name
*/
public static async getNewId<T = any>(this: plugins.tsclass.typeFest.Class<T>, lengthArg: number = 20) {
return `${(this as any).className}:${plugins.smartunique.shortId(lengthArg)}`;
}
/** /**
* get cursor * get cursor
* @returns * @returns
@ -181,6 +188,18 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
*/ */
public creationStatus: TDocCreation = 'new'; public creationStatus: TDocCreation = 'new';
/**
* updated from db in any case where doc comes from db
*/
@svDb()
_createdAt: number = Date.now();
/**
* will be updated everytime the doc is saved
*/
@svDb()
_updatedAt: number = Date.now();
/** /**
* unique indexes * unique indexes
*/ */
@ -214,6 +233,9 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
// tslint:disable-next-line: no-this-assignment // tslint:disable-next-line: no-this-assignment
const self: any = this; const self: any = this;
let dbResult: any; let dbResult: any;
this._updatedAt = Date.now();
switch (this.creationStatus) { switch (this.creationStatus) {
case 'db': case 'db':
dbResult = await this.collection.update(self); dbResult = await this.collection.update(self);
@ -257,7 +279,7 @@ export class SmartDataDbDoc<T extends TImplements, TImplements, TManager extends
* updates an object from db * updates an object from db
*/ */
public async updateFromDb() { public async updateFromDb() {
const mongoDbNativeDoc = await this.collection.findOne(this.createIdentifiableObject()); const mongoDbNativeDoc = await this.collection.findOne(await this.createIdentifiableObject());
for (const key of Object.keys(mongoDbNativeDoc)) { for (const key of Object.keys(mongoDbNativeDoc)) {
this[key] = mongoDbNativeDoc[key]; this[key] = mongoDbNativeDoc[key];
} }

View File

@ -37,7 +37,16 @@ export class EasyStore<T> {
this.nameId = nameIdArg; this.nameId = nameIdArg;
} }
private async getEasyStore() { private easyStorePromise: Promise<InstanceType<typeof this.easyStoreClass>>;
private async getEasyStore(): Promise<InstanceType<typeof this.easyStoreClass>> {
if (this.easyStorePromise) {
return this.easyStorePromise;
};
// first run from here
const deferred = plugins.smartpromise.defer<InstanceType<typeof this.easyStoreClass>>();
this.easyStorePromise = deferred.promise;
let easyStore = await this.easyStoreClass.getInstance({ let easyStore = await this.easyStoreClass.getInstance({
nameId: this.nameId, nameId: this.nameId,
}); });
@ -48,7 +57,8 @@ export class EasyStore<T> {
easyStore.data = {}; easyStore.data = {};
await easyStore.save(); await easyStore.save();
} }
return easyStore; deferred.resolve(easyStore);
return this.easyStorePromise;
} }
/** /**
@ -70,7 +80,7 @@ export class EasyStore<T> {
/** /**
* writes a specific key to the keyValueStore * writes a specific key to the keyValueStore
*/ */
public async writeKey(keyArg: keyof T, valueArg: any) { public async writeKey<TKey extends keyof T>(keyArg: TKey, valueArg: T[TKey]) {
const easyStore = await this.getEasyStore(); const easyStore = await this.getEasyStore();
easyStore.data[keyArg] = valueArg; easyStore.data[keyArg] = valueArg;
await easyStore.save(); await easyStore.save();

View File

@ -17,9 +17,13 @@ export class SmartdataDbWatcher<T = any> {
smartdataDbDocArg: typeof SmartDataDbDoc smartdataDbDocArg: typeof SmartDataDbDoc
) { ) {
this.changeStream = changeStreamArg; this.changeStream = changeStreamArg;
this.changeStream.on('change', async (item: T) => { this.changeStream.on('change', async (item: any) => {
if (!item.fullDocument) {
this.changeSubject.next(null);
return;
}
this.changeSubject.next( this.changeSubject.next(
smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item) as any as T smartdataDbDocArg.createInstanceFromMongoDbNativeDoc(item.fullDocument) as any as T
); );
}); });
plugins.smartdelay.delayFor(0).then(() => { plugins.smartdelay.delayFor(0).then(() => {

View File

@ -8,7 +8,6 @@ import * as lik from '@push.rocks/lik';
import * as smartdelay from '@push.rocks/smartdelay'; import * as smartdelay from '@push.rocks/smartdelay';
import * as smartlog from '@push.rocks/smartlog'; import * as smartlog from '@push.rocks/smartlog';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as smartq from '@push.rocks/smartpromise';
import * as smartrx from '@push.rocks/smartrx'; import * as smartrx from '@push.rocks/smartrx';
import * as smartstring from '@push.rocks/smartstring'; import * as smartstring from '@push.rocks/smartstring';
import * as smarttime from '@push.rocks/smarttime'; import * as smarttime from '@push.rocks/smarttime';
@ -21,7 +20,6 @@ export {
smartdelay, smartdelay,
smartpromise, smartpromise,
smartlog, smartlog,
smartq,
smartrx, smartrx,
mongodb, mongodb,
smartstring, smartstring,

View File

@ -3,7 +3,12 @@
"experimentalDecorators": true, "experimentalDecorators": true,
"useDefineForClassFields": false, "useDefineForClassFields": false,
"target": "ES2022", "target": "ES2022",
"module": "ES2022", "module": "NodeNext",
"moduleResolution": "nodenext" "moduleResolution": "NodeNext",
} "esModuleInterop": true,
"verbatimModuleSyntax": true
},
"exclude": [
"dist_*/**/*.d.ts"
]
} }