113 lines
5.2 KiB
TypeScript
113 lines
5.2 KiB
TypeScript
import { tap, expect } from '@push.rocks/tapbundle';
|
|
import * as smartmongo from '@push.rocks/smartmongo';
|
|
import type * as taskbuffer from '@push.rocks/taskbuffer';
|
|
|
|
import * as smartdata from '../ts/index.js';
|
|
import { SmartdataDistributedCoordinator, DistributedClass } from '../ts/smartdata.classes.distributedcoordinator.js'; // path might need adjusting
|
|
const totalInstances = 10;
|
|
|
|
// =======================================
|
|
// Connecting to the database server
|
|
// =======================================
|
|
|
|
let smartmongoInstance: smartmongo.SmartMongo;
|
|
let testDb: smartdata.SmartdataDb;
|
|
|
|
tap.test('should create a testinstance as database', async () => {
|
|
smartmongoInstance = await smartmongo.SmartMongo.createAndStart();
|
|
testDb = new smartdata.SmartdataDb(await smartmongoInstance.getMongoDescriptor());
|
|
await testDb.init();
|
|
});
|
|
|
|
tap.test('should instantiate DistributedClass', async (tools) => {
|
|
const instance = new DistributedClass();
|
|
expect(instance).toBeInstanceOf(DistributedClass);
|
|
});
|
|
|
|
tap.test('DistributedClass should update the time', async (tools) => {
|
|
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
|
|
await distributedCoordinator.start();
|
|
const initialTime = distributedCoordinator.ownInstance.data.lastUpdated;
|
|
await distributedCoordinator.sendHeartbeat();
|
|
const updatedTime = distributedCoordinator.ownInstance.data.lastUpdated;
|
|
expect(updatedTime).toBeGreaterThan(initialTime);
|
|
await distributedCoordinator.stop();
|
|
});
|
|
|
|
tap.test('should instantiate SmartdataDistributedCoordinator', async (tools) => {
|
|
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
|
|
await distributedCoordinator.start();
|
|
expect(distributedCoordinator).toBeInstanceOf(SmartdataDistributedCoordinator);
|
|
await distributedCoordinator.stop();
|
|
});
|
|
|
|
tap.test('SmartdataDistributedCoordinator should update leader status', async (tools) => {
|
|
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
|
|
await distributedCoordinator.start();
|
|
await distributedCoordinator.checkAndMaybeLead();
|
|
expect(distributedCoordinator.ownInstance.data.elected).toBeOneOf([true, false]);
|
|
await distributedCoordinator.stop();
|
|
});
|
|
|
|
tap.test('SmartdataDistributedCoordinator should handle distributed task requests', async (tools) => {
|
|
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
|
|
await distributedCoordinator.start();
|
|
|
|
const mockTaskRequest: taskbuffer.distributedCoordination.IDistributedTaskRequest = {
|
|
submitterId: "mockSubmitter12345", // Some unique mock submitter ID
|
|
requestResponseId: 'uni879873462hjhfkjhsdf', // Some unique ID for the request-response
|
|
taskName: "SampleTask",
|
|
taskVersion: "1.0.0", // Assuming it's a version string
|
|
taskExecutionTime: Date.now(),
|
|
taskExecutionTimeout: 60000, // Let's say the timeout is 1 minute (60000 ms)
|
|
taskExecutionParallel: 5, // Let's assume max 5 parallel executions
|
|
status: 'requesting'
|
|
};
|
|
|
|
const response = await distributedCoordinator.fireDistributedTaskRequest(mockTaskRequest);
|
|
console.log(response) // based on your expected structure for the response
|
|
await distributedCoordinator.stop();
|
|
});
|
|
|
|
tap.test('SmartdataDistributedCoordinator should update distributed task requests', async (tools) => {
|
|
const distributedCoordinator = new SmartdataDistributedCoordinator(testDb);
|
|
|
|
await distributedCoordinator.start();
|
|
|
|
const mockTaskRequest: taskbuffer.distributedCoordination.IDistributedTaskRequest = {
|
|
submitterId: "mockSubmitter12345", // Some unique mock submitter ID
|
|
requestResponseId: 'uni879873462hjhfkjhsdf', // Some unique ID for the request-response
|
|
taskName: "SampleTask",
|
|
taskVersion: "1.0.0", // Assuming it's a version string
|
|
taskExecutionTime: Date.now(),
|
|
taskExecutionTimeout: 60000, // Let's say the timeout is 1 minute (60000 ms)
|
|
taskExecutionParallel: 5, // Let's assume max 5 parallel executions
|
|
status: 'requesting'
|
|
};
|
|
|
|
|
|
await distributedCoordinator.updateDistributedTaskRequest(mockTaskRequest);
|
|
// Here, we can potentially check if a DB entry got updated or some other side-effect of the update method.
|
|
await distributedCoordinator.stop();
|
|
});
|
|
|
|
tap.test('should elect only one leader amongst multiple instances', async (tools) => {
|
|
const coordinators = Array.from({ length: totalInstances }).map(() => new SmartdataDistributedCoordinator(testDb));
|
|
await Promise.all(coordinators.map(coordinator => coordinator.start()));
|
|
const leaders = coordinators.filter(coordinator => coordinator.ownInstance.data.elected);
|
|
for (const leader of leaders) {
|
|
console.log(leader.ownInstance);
|
|
}
|
|
expect(leaders.length).toEqual(1);
|
|
|
|
// stopping clears a coordinator from being elected.
|
|
await Promise.all(coordinators.map(coordinator => coordinator.stop()));
|
|
});
|
|
|
|
tap.test('should clean up', async () => {
|
|
await smartmongoInstance.stopAndDumpToDir(`.nogit/dbdump/test.distributedcoordinator.ts`);
|
|
setTimeout(() => process.exit(), 2000);
|
|
})
|
|
|
|
tap.start({ throwOnError: true });
|