feat(smartmigration): add initial smartmigration package with MongoDB and S3 migration runner
This commit is contained in:
72
test/test.run.checkpoint.ts
Normal file
72
test/test.run.checkpoint.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import { makeTestDb } from './helpers/services.js';
|
||||
import type * as smartdata from '@push.rocks/smartdata';
|
||||
import { SmartMigration, SmartMigrationError } from '../ts/index.js';
|
||||
|
||||
let db: smartdata.SmartdataDb;
|
||||
let cleanup: () => Promise<void>;
|
||||
|
||||
tap.test('setup: connect shared db', async () => {
|
||||
const r = await makeTestDb('checkpoint');
|
||||
db = r.db;
|
||||
cleanup = r.cleanup;
|
||||
});
|
||||
|
||||
tap.test('checkpoint: resumable step writes and re-reads progress across runs', async () => {
|
||||
// First runner: simulate a crash after writing the first half of the work.
|
||||
const m1 = new SmartMigration({ targetVersion: '1.1.0', db, ledgerName: 'resume' });
|
||||
let crashed = false;
|
||||
m1.step('big-job').from('1.0.0').to('1.1.0').resumable().up(async (ctx) => {
|
||||
const seen = (await ctx.checkpoint!.read<number>('processed')) ?? 0;
|
||||
// Pretend we processed items 0..4 and then crash.
|
||||
for (let i = seen; i < 5; i++) {
|
||||
await ctx.checkpoint!.write('processed', i + 1);
|
||||
}
|
||||
crashed = true;
|
||||
throw new Error('simulated crash mid-step');
|
||||
});
|
||||
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
await m1.run();
|
||||
} catch (err) {
|
||||
caught = err as SmartMigrationError;
|
||||
}
|
||||
expect(caught).toBeInstanceOf(SmartMigrationError);
|
||||
expect(crashed).toBeTrue();
|
||||
|
||||
// Second runner: should see the checkpoint and resume from item 5.
|
||||
const m2 = new SmartMigration({ targetVersion: '1.1.0', db, ledgerName: 'resume' });
|
||||
let resumedFrom: number | undefined;
|
||||
let finalCount: number | undefined;
|
||||
m2.step('big-job').from('1.0.0').to('1.1.0').resumable().up(async (ctx) => {
|
||||
resumedFrom = (await ctx.checkpoint!.read<number>('processed')) ?? 0;
|
||||
// Process items 5..9.
|
||||
let n = resumedFrom;
|
||||
while (n < 10) {
|
||||
n++;
|
||||
await ctx.checkpoint!.write('processed', n);
|
||||
}
|
||||
finalCount = n;
|
||||
});
|
||||
const r = await m2.run();
|
||||
expect(resumedFrom).toEqual(5);
|
||||
expect(finalCount).toEqual(10);
|
||||
expect(r.currentVersionAfter).toEqual('1.1.0');
|
||||
});
|
||||
|
||||
tap.test('checkpoint: ctx.checkpoint is undefined for non-resumable steps', async () => {
|
||||
const m = new SmartMigration({ targetVersion: '1.1.0', db, ledgerName: 'noresume' });
|
||||
let observed: boolean | undefined;
|
||||
m.step('plain').from('1.0.0').to('1.1.0').up(async (ctx) => {
|
||||
observed = ctx.checkpoint === undefined;
|
||||
});
|
||||
await m.run();
|
||||
expect(observed).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('cleanup: close shared db', async () => {
|
||||
await cleanup();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
Reference in New Issue
Block a user