feat(migration): add lock heartbeats, predictive dry-run planning, and stricter ledger option validation
This commit is contained in:
@@ -1,5 +1,13 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-14 - 1.3.0 - feat(migration)
|
||||
add lock heartbeats, predictive dry-run planning, and stricter ledger option validation
|
||||
|
||||
- switch the mongo ledger to atomic collection updates with lock renewal support and surface LOCK_LOST when a running migration loses its lock
|
||||
- make plan() and dryRun predict currentVersionAfter using the same fresh-install resolution as run() without creating S3 ledger sidecar files
|
||||
- validate freshInstallVersion, ledgerName, lockWaitMs, and lockTtlMs at construction time
|
||||
- clear resumable step checkpoints automatically after successful step completion
|
||||
|
||||
## 2026-04-08 - 1.2.0 - feat(versionresolver)
|
||||
support skip-forward resume for orphan ledger versions
|
||||
|
||||
|
||||
+12
-6
@@ -4,20 +4,26 @@ Internal development notes for `@push.rocks/smartmigration`. This file is not pu
|
||||
|
||||
## Architecture
|
||||
|
||||
- The ledger is the source of truth. Two backends: mongo (via smartdata `EasyStore`) and S3 (via `bucket.fastPut`/`fastGet`).
|
||||
- The ledger is the source of truth. Two backends: mongo (a single document in smartdata's `SmartdataEasyStore` collection) and S3 (via `bucket.fastPut`/`fastGet`).
|
||||
- Step execution is **sequential and registration-ordered**. The `from`/`to` fields exist for chain validation, not for DAG resolution.
|
||||
- Drivers (`mongodb.Db`, `S3Client`) are exposed via `ctx`, not wrapped. smartmigration writes no SQL or S3 wrappers of its own.
|
||||
|
||||
## Locking caveats
|
||||
|
||||
- The mongo lock uses a read-modify-write pattern over `EasyStore` (which writes the entire document each time). This is a last-writer-wins CAS, not a true atomic CAS. It's adequate for v1; v2 may move to `findOneAndUpdate` against the underlying mongo collection.
|
||||
- The mongo lock uses atomic `findOneAndUpdate` / `updateOne` operations against the `SmartdataEasyStore` collection and a background heartbeat that extends `lock.expiresAt` while steps are running.
|
||||
- The S3 lock is best-effort (S3 has no conditional writes without versioning). S3-only deployments should not run multiple instances against the same ledger without external coordination.
|
||||
|
||||
## Why EasyStore (and not a custom collection)?
|
||||
## Why reuse SmartdataEasyStore?
|
||||
|
||||
- Already exists, lazy collection setup, single-document storage that fits the entire ledger naturally.
|
||||
- Avoids polluting the user's DB with smartmigration-internal classes.
|
||||
- See `/mnt/data/lossless/push.rocks/smartdata/ts/classes.easystore.ts` for the implementation.
|
||||
- The collection already exists in the smartdata ecosystem and is a natural fit for singleton blobs keyed by `nameId`.
|
||||
- Reusing it avoids introducing a second smartmigration-specific collection just to store one document per ledger.
|
||||
- The lock path talks directly to the collection for atomic updates; the persisted document shape still mirrors EasyStore's `{ nameId, data }` layout.
|
||||
- See `/mnt/data/lossless/push.rocks/smartdata/ts/classes.easystore.ts` for the reference document shape.
|
||||
|
||||
## Runtime notes
|
||||
|
||||
- `plan()` / `dryRun` resolve the same fresh-install shortcut as `run()`, so their `currentVersionAfter` value is predictive rather than just echoing the current ledger version.
|
||||
- Successful resumable steps clear `checkpoints[stepId]`; failed steps leave checkpoints intact so the next run can resume.
|
||||
|
||||
## Testing notes
|
||||
|
||||
|
||||
@@ -28,8 +28,8 @@ Report bugs and security issues at [community.foss.global](https://community.fos
|
||||
| **Unified mongo + S3** | A single semver represents your combined data version. One step can touch both. |
|
||||
| **Drivers exposed via context** | `ctx.db`, `ctx.mongo`, `ctx.bucket`, `ctx.s3` — write any operation you can write directly |
|
||||
| **Idempotent** | `run()` is a no-op when data is at `targetVersion`. Costs one ledger read on the happy path. |
|
||||
| **Resumable** | Mark a step `.resumable()` and it gets `ctx.checkpoint.read/write/clear` for restartable bulk operations |
|
||||
| **Lockable** | Mongo-backed lock with TTL serializes concurrent SaaS instances — safe for rolling deploys |
|
||||
| **Resumable** | Mark a step `.resumable()` and it gets `ctx.checkpoint.read/write/clear` for restartable bulk operations; successful runs clear the step checkpoint automatically |
|
||||
| **Lockable** | Mongo-backed lock uses atomic updates plus TTL heartbeats to serialize concurrent SaaS instances — safe for rolling deploys |
|
||||
| **Fresh-install fast path** | Configure `freshInstallVersion` to skip migrations on a brand-new database |
|
||||
| **Dry-run** | `dryRun: true` or `.plan()` returns the execution plan without writing anything |
|
||||
| **Structured errors** | All failures throw `SmartMigrationError` with a stable `code` field for branching |
|
||||
@@ -83,8 +83,9 @@ migration
|
||||
const cursor = ctx.bucket!.createCursor('uploads/');
|
||||
const startToken = await ctx.checkpoint!.read<string>('cursorToken');
|
||||
if (startToken) cursor.setToken(startToken);
|
||||
while (await cursor.hasMore()) {
|
||||
for (const key of (await cursor.next()) ?? []) {
|
||||
while (cursor.hasMore()) {
|
||||
const { keys } = await cursor.next();
|
||||
for (const key of keys) {
|
||||
await ctx.bucket!.fastMove({
|
||||
sourcePath: key,
|
||||
destinationPath: 'media/' + key.slice('uploads/'.length),
|
||||
@@ -134,7 +135,7 @@ If no step's `toVersion` is greater than `currentVersion` (the ledger is past th
|
||||
|
||||
The ledger is the source of truth for "what data version are we at, what steps have been applied, who holds the lock right now." It is persisted in one of two backends:
|
||||
|
||||
- **`mongo` (default when `db` is provided)** — backed by smartdata's `EasyStore`, stored as a single document. Lock semantics work safely across multiple SaaS instances. **Recommended.**
|
||||
- **`mongo` (default when `db` is provided)** — stored as a single document in smartdata's `SmartdataEasyStore` collection. Lock acquisition / renewal uses atomic mongo updates and works safely across multiple SaaS instances. **Recommended.**
|
||||
- **`s3` (default when only `bucket` is provided)** — a single JSON object at `<bucket>/.smartmigration/<ledgerName>.json`. Lock is best-effort because S3 has no atomic CAS without additional infrastructure; do not use for multi-instance deployments without external coordination.
|
||||
|
||||
If you pass both `db` and `bucket`, mongo is used.
|
||||
@@ -217,9 +218,9 @@ migration
|
||||
const cursor = ctx.bucket!.createCursor('attachments/legacy/', { pageSize: 200 });
|
||||
const start = await ctx.checkpoint!.read<string>('cursor');
|
||||
if (start) cursor.setToken(start);
|
||||
while (await cursor.hasMore()) {
|
||||
const batch = (await cursor.next()) ?? [];
|
||||
for (const key of batch) {
|
||||
while (cursor.hasMore()) {
|
||||
const { keys } = await cursor.next();
|
||||
for (const key of keys) {
|
||||
await ctx.bucket!.fastMove({
|
||||
sourcePath: key,
|
||||
destinationPath: 'attachments/' + key.slice('attachments/legacy/'.length),
|
||||
@@ -232,6 +233,7 @@ migration
|
||||
```
|
||||
|
||||
If the process crashes mid-migration, the next call to `run()` will resume from the last persisted cursor token.
|
||||
If the step completes successfully, smartmigration clears that step's checkpoint bag automatically.
|
||||
|
||||
### Mongo + S3 in lockstep
|
||||
|
||||
@@ -276,6 +278,7 @@ await migration.run(); // for a fresh DB, runs zero steps and stamps version 5.0
|
||||
const planned = await migration.plan();
|
||||
console.log(`would apply ${planned.stepsSkipped.length} steps:`,
|
||||
planned.stepsSkipped.map((s) => `${s.id}(${s.fromVersion}→${s.toVersion})`).join(' → '));
|
||||
console.log(`predicted version after run: ${planned.currentVersionAfter}`);
|
||||
|
||||
// or — same thing, via dryRun option
|
||||
const m = new SmartMigration({ targetVersion: '2.0.0', db, dryRun: true });
|
||||
@@ -283,6 +286,8 @@ const m = new SmartMigration({ targetVersion: '2.0.0', db, dryRun: true });
|
||||
const result = await m.run(); // returns plan, doesn't write
|
||||
```
|
||||
|
||||
`plan()` / `dryRun` resolve the same fresh-install shortcut that `run()` would use, so the returned `currentVersionAfter` is the predicted post-run version. For S3-backed ledgers, planning does not create the `.smartmigration/<ledger>.json` sidecar.
|
||||
|
||||
## API reference
|
||||
|
||||
### `new SmartMigration(options: ISmartMigrationOptions)`
|
||||
@@ -306,6 +311,9 @@ The constructor throws `SmartMigrationError` with one of these `code`s on bad in
|
||||
- `INVALID_VERSION` — `targetVersion` is not a valid semver
|
||||
- `NO_RESOURCES` — neither `db` nor `bucket` provided
|
||||
- `LEDGER_BACKEND_MISMATCH` — explicit `ledgerBackend` doesn't match the resources you provided
|
||||
- `INVALID_LEDGER_NAME` — `ledgerName` is blank or not a string
|
||||
- `INVALID_LOCK_WAIT_MS` — `lockWaitMs` is not an integer `>= 0`
|
||||
- `INVALID_LOCK_TTL_MS` — `lockTtlMs` is not an integer `>= 1`
|
||||
|
||||
### `migration.step(id: string).from(v).to(v).[description(t)].[resumable()].up(handler)`
|
||||
|
||||
@@ -321,7 +329,7 @@ interface IMigrationRunResult {
|
||||
currentVersionAfter: string;
|
||||
targetVersion: string;
|
||||
wasUpToDate: boolean; // true if no steps ran
|
||||
wasFreshInstall: boolean; // true if freshInstallVersion was used
|
||||
wasFreshInstall: boolean; // true if startup took the fresh-install shortcut
|
||||
stepsApplied: IMigrationStepResult[];
|
||||
stepsSkipped: IMigrationStepResult[];
|
||||
totalDurationMs: number;
|
||||
@@ -330,12 +338,13 @@ interface IMigrationRunResult {
|
||||
|
||||
Throws `SmartMigrationError` with these `code`s:
|
||||
- `LOCK_TIMEOUT` — could not acquire lock within `lockWaitMs`
|
||||
- `LOCK_LOST` — the runner lost its lock while a migration was still in flight
|
||||
- `STEP_FAILED` — a step's handler threw; the failure is persisted to the ledger
|
||||
- `CHAIN_*`, `DUPLICATE_STEP_ID`, `NON_INCREASING_STEP`, `TARGET_NOT_REACHABLE`, `DOWNGRADE_NOT_SUPPORTED` — chain validation / planning errors
|
||||
|
||||
### `migration.plan(): Promise<IMigrationRunResult>`
|
||||
|
||||
Same as `run()` but does not acquire the lock or execute anything. Useful for `--dry-run` style probes in CI.
|
||||
Same as `run()` but does not acquire the lock or execute anything. Useful for `--dry-run` style probes in CI. `stepsSkipped` are the steps that would run, and `currentVersionAfter` is the predicted post-run version after fresh-install resolution.
|
||||
|
||||
### `migration.getCurrentVersion(): Promise<string | null>`
|
||||
|
||||
@@ -347,6 +356,10 @@ Returns the current data version from the ledger, or `null` if the ledger has ne
|
||||
|
||||
Another instance crashed while holding the lock. Wait for `lockTtlMs` (default 10 minutes) for the lock to expire, or manually clear the `lock` field on the ledger document.
|
||||
|
||||
### `LOCK_LOST` during a migration
|
||||
|
||||
The runner stopped renewing its lock or another instance replaced the lock document while a step was still running. Check for long-running steps, make sure `lockTtlMs` is comfortably larger than the expected renewal cadence, and investigate other processes touching the same ledger.
|
||||
|
||||
### `CHAIN_GAP` at startup
|
||||
|
||||
Two adjacent steps have mismatched versions: `step[N].to !== step[N+1].from`. Steps must form a contiguous chain in registration order. Fix the version on the offending step.
|
||||
|
||||
@@ -4,6 +4,24 @@ import * as smartbucket from '@push.rocks/smartbucket';
|
||||
|
||||
const qenv = new Qenv(process.cwd(), process.cwd() + '/.nogit/');
|
||||
|
||||
async function createSmartBucket(): Promise<smartbucket.SmartBucket> {
|
||||
return new smartbucket.SmartBucket({
|
||||
accessKey: await qenv.getEnvVarOnDemandStrict('S3_ACCESSKEY'),
|
||||
accessSecret: await qenv.getEnvVarOnDemandStrict('S3_SECRETKEY'),
|
||||
endpoint: await qenv.getEnvVarOnDemandStrict('S3_ENDPOINT'),
|
||||
port: parseInt(await qenv.getEnvVarOnDemandStrict('S3_PORT'), 10),
|
||||
useSsl: false,
|
||||
});
|
||||
}
|
||||
|
||||
function buildUniqueBucketName(baseName: string, suffix: string): string {
|
||||
const safeBase = baseName.toLowerCase().replace(/[^a-z0-9-]/g, '-');
|
||||
const safeSuffix = suffix.toLowerCase().replace(/[^a-z0-9-]/g, '-');
|
||||
const uniquePart = `${Date.now().toString(36)}-${Math.floor(Math.random() * 1e6).toString(36)}`;
|
||||
const combined = `${safeBase}-${safeSuffix}-${uniquePart}`.replace(/-+/g, '-');
|
||||
return combined.slice(0, 63).replace(/^-+|-+$/g, '');
|
||||
}
|
||||
|
||||
/**
|
||||
* Spin up a fresh `SmartdataDb` connected to the local mongo from .nogit/env.json,
|
||||
* scoped to a unique database name so tests cannot collide. Returns the db plus
|
||||
@@ -45,13 +63,7 @@ export async function makeTestDb(suffix: string) {
|
||||
* unique prefix and clean them up afterwards.
|
||||
*/
|
||||
export async function makeTestBucket() {
|
||||
const sb = new smartbucket.SmartBucket({
|
||||
accessKey: await qenv.getEnvVarOnDemandStrict('S3_ACCESSKEY'),
|
||||
accessSecret: await qenv.getEnvVarOnDemandStrict('S3_SECRETKEY'),
|
||||
endpoint: await qenv.getEnvVarOnDemandStrict('S3_ENDPOINT'),
|
||||
port: parseInt(await qenv.getEnvVarOnDemandStrict('S3_PORT'), 10),
|
||||
useSsl: false,
|
||||
});
|
||||
const sb = await createSmartBucket();
|
||||
const bucketName = await qenv.getEnvVarOnDemandStrict('S3_BUCKET');
|
||||
|
||||
// The bucket may not exist yet — try to fetch it; if missing, create it.
|
||||
@@ -64,3 +76,29 @@ export async function makeTestBucket() {
|
||||
}
|
||||
return { sb, bucket, bucketName };
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a unique bucket for tests that need a truly empty object store.
|
||||
*/
|
||||
export async function makeIsolatedTestBucket(suffix: string) {
|
||||
const sb = await createSmartBucket();
|
||||
const baseBucketName = await qenv.getEnvVarOnDemandStrict('S3_BUCKET');
|
||||
const bucketName = buildUniqueBucketName(baseBucketName, suffix);
|
||||
|
||||
const bucket = await sb.createBucket(bucketName);
|
||||
|
||||
const cleanup = async () => {
|
||||
try {
|
||||
await bucket.cleanAllContents();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
try {
|
||||
await sb.removeBucket(bucketName);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
};
|
||||
|
||||
return { sb, bucket, bucketName, cleanup };
|
||||
}
|
||||
|
||||
@@ -24,6 +24,21 @@ tap.test('constructor: rejects invalid targetVersion', async () => {
|
||||
expect(caught!.code).toEqual('INVALID_VERSION');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects invalid freshInstallVersion', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
new SmartMigration({
|
||||
targetVersion: '1.0.0',
|
||||
freshInstallVersion: 'nope',
|
||||
db: {} as any,
|
||||
});
|
||||
} catch (err) {
|
||||
caught = err as SmartMigrationError;
|
||||
}
|
||||
expect(caught).toBeInstanceOf(SmartMigrationError);
|
||||
expect(caught!.code).toEqual('INVALID_VERSION');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects when neither db nor bucket given', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
@@ -70,6 +85,51 @@ tap.test('constructor: rejects mongo backend without db', async () => {
|
||||
expect(caught!.code).toEqual('LEDGER_BACKEND_MISMATCH');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects blank ledgerName', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
new SmartMigration({
|
||||
targetVersion: '1.0.0',
|
||||
db: {} as any,
|
||||
ledgerName: ' ',
|
||||
});
|
||||
} catch (err) {
|
||||
caught = err as SmartMigrationError;
|
||||
}
|
||||
expect(caught).toBeInstanceOf(SmartMigrationError);
|
||||
expect(caught!.code).toEqual('INVALID_LEDGER_NAME');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects negative lockWaitMs', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
new SmartMigration({
|
||||
targetVersion: '1.0.0',
|
||||
db: {} as any,
|
||||
lockWaitMs: -1,
|
||||
});
|
||||
} catch (err) {
|
||||
caught = err as SmartMigrationError;
|
||||
}
|
||||
expect(caught).toBeInstanceOf(SmartMigrationError);
|
||||
expect(caught!.code).toEqual('INVALID_LOCK_WAIT_MS');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects non-positive lockTtlMs', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
new SmartMigration({
|
||||
targetVersion: '1.0.0',
|
||||
db: {} as any,
|
||||
lockTtlMs: 0,
|
||||
});
|
||||
} catch (err) {
|
||||
caught = err as SmartMigrationError;
|
||||
}
|
||||
expect(caught).toBeInstanceOf(SmartMigrationError);
|
||||
expect(caught!.code).toEqual('INVALID_LOCK_TTL_MS');
|
||||
});
|
||||
|
||||
tap.test('constructor: rejects s3 backend without bucket', async () => {
|
||||
let caught: SmartMigrationError | undefined;
|
||||
try {
|
||||
|
||||
@@ -29,6 +29,7 @@ tap.test('dryRun: returns plan without invoking handlers', async () => {
|
||||
expect(r.stepsApplied).toHaveLength(0);
|
||||
expect(r.stepsSkipped).toHaveLength(2);
|
||||
expect(r.stepsSkipped.map((s) => s.id)).toEqual(['a', 'b']);
|
||||
expect(r.currentVersionAfter).toEqual('2.0.0');
|
||||
|
||||
// The ledger should still be in its initial state (currentVersion = null).
|
||||
const current = await m.getCurrentVersion();
|
||||
@@ -45,12 +46,54 @@ tap.test('plan(): returns plan without writing or running', async () => {
|
||||
expect(stepCalled).toBeFalse();
|
||||
expect(planResult.stepsSkipped).toHaveLength(1);
|
||||
expect(planResult.stepsApplied).toHaveLength(0);
|
||||
expect(planResult.currentVersionAfter).toEqual('2.0.0');
|
||||
|
||||
// Plan does not modify the ledger.
|
||||
const current = await m.getCurrentVersion();
|
||||
expect(current).toBeNull();
|
||||
});
|
||||
|
||||
tap.test('dryRun: models freshInstallVersion on an empty database', async () => {
|
||||
const m = new SmartMigration({
|
||||
targetVersion: '2.0.0',
|
||||
db,
|
||||
ledgerName: 'dryrun_fresh',
|
||||
freshInstallVersion: '2.0.0',
|
||||
dryRun: true,
|
||||
});
|
||||
let stepCalled = false;
|
||||
m.step('a').from('1.0.0').to('2.0.0').up(async () => { stepCalled = true; });
|
||||
|
||||
const r = await m.run();
|
||||
expect(stepCalled).toBeFalse();
|
||||
expect(r.wasFreshInstall).toBeTrue();
|
||||
expect(r.stepsSkipped).toHaveLength(0);
|
||||
expect(r.currentVersionBefore).toBeNull();
|
||||
expect(r.currentVersionAfter).toEqual('2.0.0');
|
||||
expect(await m.getCurrentVersion()).toBeNull();
|
||||
});
|
||||
|
||||
tap.test('plan(): does not use freshInstallVersion when user data already exists', async () => {
|
||||
await db.mongoDb.collection('dryrun_preexisting_users').insertOne({ id: 'user-1' });
|
||||
|
||||
const m = new SmartMigration({
|
||||
targetVersion: '2.0.0',
|
||||
db,
|
||||
ledgerName: 'plan_preexisting',
|
||||
freshInstallVersion: '2.0.0',
|
||||
});
|
||||
let stepCalled = false;
|
||||
m.step('a').from('1.0.0').to('2.0.0').up(async () => { stepCalled = true; });
|
||||
|
||||
const r = await m.plan();
|
||||
expect(stepCalled).toBeFalse();
|
||||
expect(r.wasFreshInstall).toBeFalse();
|
||||
expect(r.stepsSkipped).toHaveLength(1);
|
||||
expect(r.stepsSkipped[0].id).toEqual('a');
|
||||
expect(r.currentVersionAfter).toEqual('2.0.0');
|
||||
expect(await m.getCurrentVersion()).toBeNull();
|
||||
});
|
||||
|
||||
tap.test('cleanup: close shared db', async () => {
|
||||
await cleanup();
|
||||
});
|
||||
|
||||
@@ -74,6 +74,26 @@ tap.test('MongoLedger: expired lock can be stolen', async () => {
|
||||
expect(got2).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('MongoLedger: holder can renew lock before expiry', async () => {
|
||||
const ledger = new MongoLedger(db, 'renew-test');
|
||||
await ledger.init();
|
||||
|
||||
const got = await ledger.acquireLock('renew-holder', 20);
|
||||
expect(got).toBeTrue();
|
||||
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
const renewed = await ledger.renewLock('renew-holder', 40);
|
||||
expect(renewed).toBeTrue();
|
||||
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
const stolenEarly = await ledger.acquireLock('other-holder', 40);
|
||||
expect(stolenEarly).toBeFalse();
|
||||
|
||||
await new Promise((r) => setTimeout(r, 25));
|
||||
const stolenLate = await ledger.acquireLock('other-holder', 40);
|
||||
expect(stolenLate).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('cleanup: close shared db', async () => {
|
||||
await cleanup();
|
||||
});
|
||||
|
||||
@@ -65,6 +65,24 @@ tap.test('checkpoint: ctx.checkpoint is undefined for non-resumable steps', asyn
|
||||
expect(observed).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('checkpoint: successful resumable step clears stored progress', async () => {
|
||||
const m1 = new SmartMigration({ targetVersion: '1.1.0', db, ledgerName: 'checkpoint_cleanup' });
|
||||
m1.step('big-job').from('1.0.0').to('1.1.0').resumable().up(async (ctx) => {
|
||||
await ctx.checkpoint!.write('processed', 5);
|
||||
});
|
||||
await m1.run();
|
||||
|
||||
const m2 = new SmartMigration({ targetVersion: '1.2.0', db, ledgerName: 'checkpoint_cleanup' });
|
||||
let resumedFrom: number | undefined;
|
||||
m2
|
||||
.step('big-job').from('1.1.0').to('1.2.0').resumable().up(async (ctx) => {
|
||||
resumedFrom = await ctx.checkpoint!.read<number>('processed');
|
||||
});
|
||||
|
||||
await m2.run();
|
||||
expect(resumedFrom).toBeUndefined();
|
||||
});
|
||||
|
||||
tap.test('cleanup: close shared db', async () => {
|
||||
await cleanup();
|
||||
});
|
||||
|
||||
@@ -117,6 +117,42 @@ tap.test('run: ctx.startSession works inside a step', async () => {
|
||||
expect(sessionWasUsed).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('run: lock heartbeat prevents concurrent execution of a slow step', async () => {
|
||||
let activeSteps = 0;
|
||||
let maxActiveSteps = 0;
|
||||
let totalCalls = 0;
|
||||
|
||||
const createRunner = () => {
|
||||
const m = new SmartMigration({
|
||||
targetVersion: '1.1.0',
|
||||
db,
|
||||
ledgerName: 'lock_heartbeat',
|
||||
lockTtlMs: 100,
|
||||
lockWaitMs: 1_500,
|
||||
});
|
||||
|
||||
m.step('slow-step').from('1.0.0').to('1.1.0').up(async () => {
|
||||
totalCalls++;
|
||||
activeSteps++;
|
||||
maxActiveSteps = Math.max(maxActiveSteps, activeSteps);
|
||||
await new Promise((resolve) => setTimeout(resolve, 900));
|
||||
activeSteps--;
|
||||
});
|
||||
|
||||
return m;
|
||||
};
|
||||
|
||||
const firstRun = createRunner().run();
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
const secondRun = createRunner().run();
|
||||
|
||||
const [firstResult, secondResult] = await Promise.all([firstRun, secondRun]);
|
||||
expect(firstResult.stepsApplied).toHaveLength(1);
|
||||
expect(secondResult.wasUpToDate).toBeTrue();
|
||||
expect(totalCalls).toEqual(1);
|
||||
expect(maxActiveSteps).toEqual(1);
|
||||
});
|
||||
|
||||
tap.test('cleanup: close shared db', async () => {
|
||||
await cleanup();
|
||||
});
|
||||
|
||||
+27
-1
@@ -1,5 +1,5 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import { makeTestBucket } from './helpers/services.js';
|
||||
import { makeIsolatedTestBucket, makeTestBucket } from './helpers/services.js';
|
||||
import type * as smartbucket from '@push.rocks/smartbucket';
|
||||
import { SmartMigration } from '../ts/index.js';
|
||||
|
||||
@@ -94,6 +94,32 @@ tap.test('s3 ctx exposes bucket and s3 client', async () => {
|
||||
expect(observed!.hasS3).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('s3 dryRun: freshInstallVersion skips steps without creating a sidecar', async () => {
|
||||
const isolated = await makeIsolatedTestBucket('fresh-plan');
|
||||
|
||||
try {
|
||||
const ledgerName = 'fresh-plan';
|
||||
const m = new SmartMigration({
|
||||
targetVersion: '2.0.0',
|
||||
bucket: isolated.bucket,
|
||||
ledgerName,
|
||||
freshInstallVersion: '2.0.0',
|
||||
dryRun: true,
|
||||
});
|
||||
let stepCalled = false;
|
||||
m.step('move-uploads').from('1.0.0').to('2.0.0').up(async () => { stepCalled = true; });
|
||||
|
||||
const r = await m.run();
|
||||
expect(stepCalled).toBeFalse();
|
||||
expect(r.wasFreshInstall).toBeTrue();
|
||||
expect(r.stepsSkipped).toHaveLength(0);
|
||||
expect(r.currentVersionAfter).toEqual('2.0.0');
|
||||
expect(await isolated.bucket.fastExists({ path: `.smartmigration/${ledgerName}.json` })).toBeFalse();
|
||||
} finally {
|
||||
await isolated.cleanup();
|
||||
}
|
||||
});
|
||||
|
||||
tap.test('cleanup: wipe smartmigration sidecars', async () => {
|
||||
for await (const key of bucket.listAllObjects('.smartmigration/')) {
|
||||
await bucket.fastRemove({ path: key });
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartmigration',
|
||||
version: '1.2.0',
|
||||
version: '1.3.0',
|
||||
description: 'Unified migration runner for MongoDB (smartdata) and S3 (smartbucket) — designed to be invoked at SaaS app startup, with semver-based version tracking, sequential step execution, idempotent re-runs, and per-step resumable checkpoints.'
|
||||
}
|
||||
|
||||
@@ -102,8 +102,5 @@ function extractS3Client(
|
||||
bucket: plugins.smartbucket.Bucket | undefined,
|
||||
): plugins.TRawS3Client | undefined {
|
||||
if (!bucket) return undefined;
|
||||
// `getStorageClient` is typed as the actual S3Client, but it lives behind
|
||||
// an optional peer dep type so we cast through unknown to keep this file
|
||||
// compiling when smartbucket isn't installed by the consumer.
|
||||
return (bucket as any).getStorageClient() as plugins.TRawS3Client | undefined;
|
||||
return bucket.getStorageClient();
|
||||
}
|
||||
|
||||
+246
-82
@@ -22,6 +22,22 @@ const DEFAULT_LEDGER_NAME = 'smartmigration';
|
||||
const DEFAULT_LOCK_WAIT_MS = 60_000;
|
||||
const DEFAULT_LOCK_TTL_MS = 600_000;
|
||||
const LOCK_POLL_INTERVAL_MS = 500;
|
||||
const MIN_LOCK_HEARTBEAT_MS = 1;
|
||||
|
||||
interface IResolvedLedgerState {
|
||||
currentVersionBefore: string | null;
|
||||
effectiveCurrentVersion: string;
|
||||
bootstrapVersionToPersist: string | null;
|
||||
bootstrapMode: 'none' | 'fresh-install-version' | 'chain-start' | 'target-without-steps';
|
||||
wasFreshInstall: boolean;
|
||||
plannedSteps: IMigrationStepDefinition[];
|
||||
predictedCurrentVersionAfter: string;
|
||||
}
|
||||
|
||||
interface ILockHeartbeat {
|
||||
getError(): SmartMigrationError | null;
|
||||
stop(): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* SmartMigration — the runner. See readme.md for the full API.
|
||||
@@ -55,6 +71,10 @@ export class SmartMigration {
|
||||
}
|
||||
VersionResolver.assertValid(options.targetVersion, 'options.targetVersion');
|
||||
|
||||
if (options.freshInstallVersion !== undefined) {
|
||||
VersionResolver.assertValid(options.freshInstallVersion, 'options.freshInstallVersion');
|
||||
}
|
||||
|
||||
if (!options.db && !options.bucket) {
|
||||
throw new SmartMigrationError(
|
||||
'NO_RESOURCES',
|
||||
@@ -78,6 +98,28 @@ export class SmartMigration {
|
||||
);
|
||||
}
|
||||
|
||||
if (
|
||||
options.ledgerName !== undefined &&
|
||||
(typeof options.ledgerName !== 'string' || options.ledgerName.trim() === '')
|
||||
) {
|
||||
throw new SmartMigrationError(
|
||||
'INVALID_LEDGER_NAME',
|
||||
'ledgerName must be a non-empty string when provided.',
|
||||
);
|
||||
}
|
||||
|
||||
if (options.lockWaitMs !== undefined) {
|
||||
this.assertIntegerOption('INVALID_LOCK_WAIT_MS', 'lockWaitMs', options.lockWaitMs, {
|
||||
min: 0,
|
||||
});
|
||||
}
|
||||
|
||||
if (options.lockTtlMs !== undefined) {
|
||||
this.assertIntegerOption('INVALID_LOCK_TTL_MS', 'lockTtlMs', options.lockTtlMs, {
|
||||
min: 1,
|
||||
});
|
||||
}
|
||||
|
||||
this.settings = {
|
||||
targetVersion: options.targetVersion,
|
||||
db: options.db,
|
||||
@@ -176,70 +218,44 @@ export class SmartMigration {
|
||||
|
||||
const runStart = Date.now();
|
||||
const applied: IMigrationStepResult[] = [];
|
||||
let wasFreshInstall = false;
|
||||
let currentVersionBefore: string | null = null;
|
||||
let lockHeartbeat: ILockHeartbeat | null = null;
|
||||
|
||||
try {
|
||||
lockHeartbeat = this.startLockHeartbeat(ledger);
|
||||
|
||||
// Re-read after acquiring lock (state may have changed while we waited).
|
||||
let data = await ledger.read();
|
||||
currentVersionBefore = data.currentVersion;
|
||||
const resolvedState = await this.resolveLedgerState(data);
|
||||
|
||||
// Resolve initial version.
|
||||
let currentVersion: string;
|
||||
if (data.currentVersion === null) {
|
||||
const fresh = await this.detectFreshInstall();
|
||||
if (fresh && this.settings.freshInstallVersion) {
|
||||
wasFreshInstall = true;
|
||||
currentVersion = this.settings.freshInstallVersion;
|
||||
VersionResolver.assertValid(currentVersion, 'freshInstallVersion');
|
||||
data.currentVersion = currentVersion;
|
||||
await ledger.write(data);
|
||||
this.log.log('info', `smartmigration: fresh install detected, jumping to ${currentVersion}`);
|
||||
} else {
|
||||
if (this.steps.length === 0) {
|
||||
// No steps and no current version — nothing to do.
|
||||
data.currentVersion = this.settings.targetVersion;
|
||||
await ledger.write(data);
|
||||
return {
|
||||
currentVersionBefore: null,
|
||||
currentVersionAfter: this.settings.targetVersion,
|
||||
targetVersion: this.settings.targetVersion,
|
||||
wasUpToDate: false,
|
||||
wasFreshInstall: true,
|
||||
stepsApplied: [],
|
||||
stepsSkipped: [],
|
||||
totalDurationMs: Date.now() - runStart,
|
||||
};
|
||||
}
|
||||
currentVersion = this.steps[0].fromVersion;
|
||||
data.currentVersion = currentVersion;
|
||||
await ledger.write(data);
|
||||
if (resolvedState.bootstrapVersionToPersist !== null) {
|
||||
data.currentVersion = resolvedState.bootstrapVersionToPersist;
|
||||
await ledger.write(data);
|
||||
if (resolvedState.bootstrapMode === 'fresh-install-version') {
|
||||
this.log.log(
|
||||
'info',
|
||||
`smartmigration: fresh install detected, jumping to ${resolvedState.bootstrapVersionToPersist}`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
currentVersion = data.currentVersion;
|
||||
}
|
||||
|
||||
// Already at target after fresh-install resolution?
|
||||
if (VersionResolver.equals(currentVersion, this.settings.targetVersion)) {
|
||||
this.assertLockHealthy(lockHeartbeat);
|
||||
|
||||
if (resolvedState.plannedSteps.length === 0) {
|
||||
return {
|
||||
currentVersionBefore,
|
||||
currentVersionAfter: currentVersion,
|
||||
currentVersionBefore: resolvedState.currentVersionBefore,
|
||||
currentVersionAfter: resolvedState.predictedCurrentVersionAfter,
|
||||
targetVersion: this.settings.targetVersion,
|
||||
wasUpToDate: true,
|
||||
wasFreshInstall,
|
||||
wasUpToDate: resolvedState.bootstrapMode !== 'target-without-steps',
|
||||
wasFreshInstall: resolvedState.wasFreshInstall,
|
||||
stepsApplied: [],
|
||||
stepsSkipped: [],
|
||||
totalDurationMs: Date.now() - runStart,
|
||||
};
|
||||
}
|
||||
|
||||
const plan = VersionResolver.computePlan(
|
||||
this.steps,
|
||||
currentVersion,
|
||||
this.settings.targetVersion,
|
||||
);
|
||||
let currentVersion = resolvedState.effectiveCurrentVersion;
|
||||
|
||||
for (const step of plan) {
|
||||
for (const step of resolvedState.plannedSteps) {
|
||||
const startedAt = new Date();
|
||||
const stepStart = Date.now();
|
||||
let entry: IMigrationLedgerEntry;
|
||||
@@ -272,6 +288,7 @@ export class SmartMigration {
|
||||
log: this.log,
|
||||
});
|
||||
await step.handler(ctx);
|
||||
this.assertLockHealthy(lockHeartbeat, { stepId: step.id });
|
||||
|
||||
const finishedAt = new Date();
|
||||
const durationMs = Date.now() - stepStart;
|
||||
@@ -288,6 +305,7 @@ export class SmartMigration {
|
||||
data = await ledger.read();
|
||||
data.steps[step.id] = entry;
|
||||
data.currentVersion = step.toVersion;
|
||||
delete data.checkpoints[step.id];
|
||||
await ledger.write(data);
|
||||
// Advance the running cursor used by skip-forward detection.
|
||||
currentVersion = step.toVersion;
|
||||
@@ -300,6 +318,15 @@ export class SmartMigration {
|
||||
const finishedAt = new Date();
|
||||
const durationMs = Date.now() - stepStart;
|
||||
const error = err as Error;
|
||||
const lockError = this.getLockHealthError(lockHeartbeat, {
|
||||
stepId: step.id,
|
||||
originalError: error.message,
|
||||
stack: error.stack,
|
||||
});
|
||||
if (lockError) {
|
||||
throw lockError;
|
||||
}
|
||||
|
||||
entry = {
|
||||
id: step.id,
|
||||
fromVersion: step.fromVersion,
|
||||
@@ -330,18 +357,23 @@ export class SmartMigration {
|
||||
}
|
||||
}
|
||||
|
||||
this.assertLockHealthy(lockHeartbeat);
|
||||
|
||||
const finalData = await ledger.read();
|
||||
return {
|
||||
currentVersionBefore,
|
||||
currentVersionBefore: resolvedState.currentVersionBefore,
|
||||
currentVersionAfter: finalData.currentVersion ?? this.settings.targetVersion,
|
||||
targetVersion: this.settings.targetVersion,
|
||||
wasUpToDate: false,
|
||||
wasFreshInstall,
|
||||
wasFreshInstall: resolvedState.wasFreshInstall,
|
||||
stepsApplied: applied,
|
||||
stepsSkipped: [],
|
||||
totalDurationMs: Date.now() - runStart,
|
||||
};
|
||||
} finally {
|
||||
if (lockHeartbeat) {
|
||||
await lockHeartbeat.stop();
|
||||
}
|
||||
await ledger.releaseLock(this.instanceId).catch((err) => {
|
||||
this.log.log(
|
||||
'warn',
|
||||
@@ -355,31 +387,10 @@ export class SmartMigration {
|
||||
* Resolve the plan against the current ledger state without acquiring a
|
||||
* lock or executing anything. Used by `plan()` and `dryRun: true`.
|
||||
*/
|
||||
private computeResultWithoutRun(data: ISmartMigrationLedgerData): IMigrationRunResult {
|
||||
const currentVersion =
|
||||
data.currentVersion ??
|
||||
(this.steps.length > 0 ? this.steps[0].fromVersion : this.settings.targetVersion);
|
||||
private async computeResultWithoutRun(data: ISmartMigrationLedgerData): Promise<IMigrationRunResult> {
|
||||
const resolvedState = await this.resolveLedgerState(data);
|
||||
|
||||
if (VersionResolver.equals(currentVersion, this.settings.targetVersion)) {
|
||||
return {
|
||||
currentVersionBefore: data.currentVersion,
|
||||
currentVersionAfter: currentVersion,
|
||||
targetVersion: this.settings.targetVersion,
|
||||
wasUpToDate: true,
|
||||
wasFreshInstall: false,
|
||||
stepsApplied: [],
|
||||
stepsSkipped: [],
|
||||
totalDurationMs: 0,
|
||||
};
|
||||
}
|
||||
|
||||
const plan = VersionResolver.computePlan(
|
||||
this.steps,
|
||||
currentVersion,
|
||||
this.settings.targetVersion,
|
||||
);
|
||||
|
||||
const skipped: IMigrationStepResult[] = plan.map((step) => ({
|
||||
const skipped: IMigrationStepResult[] = resolvedState.plannedSteps.map((step) => ({
|
||||
id: step.id,
|
||||
fromVersion: step.fromVersion,
|
||||
toVersion: step.toVersion,
|
||||
@@ -390,17 +401,73 @@ export class SmartMigration {
|
||||
}));
|
||||
|
||||
return {
|
||||
currentVersionBefore: data.currentVersion,
|
||||
currentVersionAfter: currentVersion,
|
||||
currentVersionBefore: resolvedState.currentVersionBefore,
|
||||
currentVersionAfter: resolvedState.predictedCurrentVersionAfter,
|
||||
targetVersion: this.settings.targetVersion,
|
||||
wasUpToDate: false,
|
||||
wasFreshInstall: false,
|
||||
wasUpToDate:
|
||||
resolvedState.plannedSteps.length === 0 &&
|
||||
resolvedState.bootstrapMode !== 'target-without-steps',
|
||||
wasFreshInstall: resolvedState.wasFreshInstall,
|
||||
stepsApplied: [],
|
||||
stepsSkipped: skipped,
|
||||
totalDurationMs: 0,
|
||||
};
|
||||
}
|
||||
|
||||
private async resolveLedgerState(
|
||||
data: ISmartMigrationLedgerData,
|
||||
): Promise<IResolvedLedgerState> {
|
||||
let effectiveCurrentVersion: string;
|
||||
let bootstrapVersionToPersist: string | null = null;
|
||||
let bootstrapMode: IResolvedLedgerState['bootstrapMode'] = 'none';
|
||||
let wasFreshInstall = false;
|
||||
|
||||
if (data.currentVersion === null) {
|
||||
const isFreshInstall = await this.detectFreshInstall();
|
||||
if (isFreshInstall && this.settings.freshInstallVersion) {
|
||||
effectiveCurrentVersion = this.settings.freshInstallVersion;
|
||||
bootstrapVersionToPersist = effectiveCurrentVersion;
|
||||
bootstrapMode = 'fresh-install-version';
|
||||
wasFreshInstall = true;
|
||||
} else if (this.steps.length === 0) {
|
||||
effectiveCurrentVersion = this.settings.targetVersion;
|
||||
bootstrapVersionToPersist = effectiveCurrentVersion;
|
||||
bootstrapMode = 'target-without-steps';
|
||||
wasFreshInstall = isFreshInstall;
|
||||
} else {
|
||||
effectiveCurrentVersion = this.steps[0].fromVersion;
|
||||
bootstrapVersionToPersist = effectiveCurrentVersion;
|
||||
bootstrapMode = 'chain-start';
|
||||
}
|
||||
} else {
|
||||
effectiveCurrentVersion = data.currentVersion;
|
||||
}
|
||||
|
||||
const plannedSteps = VersionResolver.equals(
|
||||
effectiveCurrentVersion,
|
||||
this.settings.targetVersion,
|
||||
)
|
||||
? []
|
||||
: VersionResolver.computePlan(
|
||||
this.steps,
|
||||
effectiveCurrentVersion,
|
||||
this.settings.targetVersion,
|
||||
);
|
||||
|
||||
return {
|
||||
currentVersionBefore: data.currentVersion,
|
||||
effectiveCurrentVersion,
|
||||
bootstrapVersionToPersist,
|
||||
bootstrapMode,
|
||||
wasFreshInstall,
|
||||
plannedSteps,
|
||||
predictedCurrentVersionAfter:
|
||||
plannedSteps.length > 0
|
||||
? plannedSteps[plannedSteps.length - 1].toVersion
|
||||
: effectiveCurrentVersion,
|
||||
};
|
||||
}
|
||||
|
||||
private async ensureLedger(): Promise<Ledger> {
|
||||
if (this.ledger) return this.ledger;
|
||||
const ledgerName = this.settings.ledgerName;
|
||||
@@ -424,6 +491,60 @@ export class SmartMigration {
|
||||
return false;
|
||||
}
|
||||
|
||||
private startLockHeartbeat(ledger: Ledger): ILockHeartbeat {
|
||||
const intervalMs = this.getLockHeartbeatMs();
|
||||
let stopped = false;
|
||||
let resolveStop!: () => void;
|
||||
let lockError: SmartMigrationError | null = null;
|
||||
|
||||
const stopSignal = new Promise<void>((resolve) => {
|
||||
resolveStop = resolve;
|
||||
});
|
||||
|
||||
const loopPromise = (async () => {
|
||||
while (!stopped) {
|
||||
await Promise.race([this.sleep(intervalMs), stopSignal]);
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const renewed = await ledger.renewLock(this.instanceId, this.settings.lockTtlMs);
|
||||
if (!renewed) {
|
||||
lockError = new SmartMigrationError(
|
||||
'LOCK_LOST',
|
||||
'Lost the migration lock while running steps. Another instance may have taken over.',
|
||||
{ holderId: this.instanceId },
|
||||
);
|
||||
stopped = true;
|
||||
}
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
lockError = new SmartMigrationError(
|
||||
'LOCK_LOST',
|
||||
`Failed to renew the migration lock: ${error.message}`,
|
||||
{ holderId: this.instanceId, originalError: error.message },
|
||||
);
|
||||
stopped = true;
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
return {
|
||||
getError: () => lockError,
|
||||
stop: async () => {
|
||||
if (stopped) {
|
||||
await loopPromise;
|
||||
return;
|
||||
}
|
||||
|
||||
stopped = true;
|
||||
resolveStop();
|
||||
await loopPromise;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Heuristic fresh-install detector. Returns true when neither mongo nor S3
|
||||
* contain anything besides smartmigration's own ledger artifacts.
|
||||
@@ -439,11 +560,10 @@ export class SmartMigration {
|
||||
if (userCollections.length > 0) return false;
|
||||
}
|
||||
if (this.settings.bucket) {
|
||||
const cursor = (this.settings.bucket as any).createCursor('', { pageSize: 5 });
|
||||
const batch = (await cursor.next()) as string[] | undefined;
|
||||
if (batch && batch.length > 0) {
|
||||
const userKeys = batch.filter((k) => !k.startsWith('.smartmigration/'));
|
||||
if (userKeys.length > 0) return false;
|
||||
for await (const key of this.settings.bucket.listAllObjects('')) {
|
||||
if (!key.startsWith('.smartmigration/')) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@@ -454,6 +574,50 @@ export class SmartMigration {
|
||||
return name === 'SmartdataEasyStore' || name.startsWith('system.');
|
||||
}
|
||||
|
||||
private getLockHeartbeatMs(): number {
|
||||
return Math.max(MIN_LOCK_HEARTBEAT_MS, Math.floor(this.settings.lockTtlMs / 3));
|
||||
}
|
||||
|
||||
private getLockHealthError(
|
||||
lockHeartbeat: ILockHeartbeat | null,
|
||||
details?: Record<string, unknown>,
|
||||
): SmartMigrationError | null {
|
||||
const lockError = lockHeartbeat?.getError();
|
||||
if (!lockError) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new SmartMigrationError(lockError.code, lockError.message, {
|
||||
...lockError.details,
|
||||
...details,
|
||||
});
|
||||
}
|
||||
|
||||
private assertLockHealthy(
|
||||
lockHeartbeat: ILockHeartbeat | null,
|
||||
details?: Record<string, unknown>,
|
||||
): void {
|
||||
const lockError = this.getLockHealthError(lockHeartbeat, details);
|
||||
if (lockError) {
|
||||
throw lockError;
|
||||
}
|
||||
}
|
||||
|
||||
private assertIntegerOption(
|
||||
code: string,
|
||||
optionName: string,
|
||||
value: number,
|
||||
constraints: { min: number },
|
||||
): void {
|
||||
if (!Number.isInteger(value) || value < constraints.min) {
|
||||
throw new SmartMigrationError(
|
||||
code,
|
||||
`${optionName} must be an integer >= ${constraints.min}.`,
|
||||
{ [optionName]: value },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ export abstract class Ledger {
|
||||
public abstract read(): Promise<ISmartMigrationLedgerData>;
|
||||
public abstract write(data: ISmartMigrationLedgerData): Promise<void>;
|
||||
public abstract acquireLock(holderId: string, ttlMs: number): Promise<boolean>;
|
||||
public abstract renewLock(holderId: string, ttlMs: number): Promise<boolean>;
|
||||
public abstract releaseLock(holderId: string): Promise<void>;
|
||||
public abstract close(): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -2,16 +2,20 @@ import type * as plugins from '../plugins.js';
|
||||
import type { ISmartMigrationLedgerData } from '../interfaces.js';
|
||||
import { Ledger, emptyLedgerData } from './classes.ledger.js';
|
||||
|
||||
interface IMongoLedgerDocument {
|
||||
nameId: string;
|
||||
data: ISmartMigrationLedgerData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mongo-backed ledger that persists `ISmartMigrationLedgerData` as a single
|
||||
* document via smartdata's `EasyStore`. The EasyStore's nameId is
|
||||
* `smartmigration:<ledgerName>`, scoping multiple migration ledgers in the
|
||||
* same database.
|
||||
* document in smartdata's `SmartdataEasyStore` collection, keyed by
|
||||
* `smartmigration:<ledgerName>`.
|
||||
*/
|
||||
export class MongoLedger extends Ledger {
|
||||
private db: plugins.smartdata.SmartdataDb;
|
||||
private ledgerName: string;
|
||||
private easyStore: any | null = null; // EasyStore<ISmartMigrationLedgerData> — typed loosely because the peer type may not be present at compile time
|
||||
private indexReadyPromise: Promise<void> | null = null;
|
||||
|
||||
constructor(db: plugins.smartdata.SmartdataDb, ledgerName: string) {
|
||||
super();
|
||||
@@ -20,87 +24,146 @@ export class MongoLedger extends Ledger {
|
||||
}
|
||||
|
||||
public async init(): Promise<void> {
|
||||
this.easyStore = await this.db.createEasyStore(`smartmigration:${this.ledgerName}`);
|
||||
// EasyStore creates an empty `data: {}` on first read. Hydrate it to the
|
||||
// canonical empty shape so subsequent reads always return all fields.
|
||||
const existing = (await this.easyStore.readAll()) as Partial<ISmartMigrationLedgerData>;
|
||||
if (
|
||||
existing.currentVersion === undefined ||
|
||||
existing.steps === undefined ||
|
||||
existing.lock === undefined ||
|
||||
existing.checkpoints === undefined
|
||||
) {
|
||||
await this.easyStore.writeAll(emptyLedgerData());
|
||||
}
|
||||
// Intentionally empty. The backing collection and unique index are created
|
||||
// lazily on first write so `plan()` / `getCurrentVersion()` stay read-only.
|
||||
}
|
||||
|
||||
public async read(): Promise<ISmartMigrationLedgerData> {
|
||||
if (!this.easyStore) {
|
||||
throw new Error('MongoLedger.read() called before init()');
|
||||
}
|
||||
const data = (await this.easyStore.readAll()) as ISmartMigrationLedgerData;
|
||||
return this.normalize(data);
|
||||
const document = await this.getCollection().findOne({ nameId: this.getNameId() });
|
||||
return this.normalize(document?.data);
|
||||
}
|
||||
|
||||
public async write(data: ISmartMigrationLedgerData): Promise<void> {
|
||||
if (!this.easyStore) {
|
||||
throw new Error('MongoLedger.write() called before init()');
|
||||
}
|
||||
// Use EasyStore.replace (added in @push.rocks/smartdata 7.1.7) for true
|
||||
// overwrite semantics. This lets us actually delete keys from
|
||||
// checkpoints / steps when the in-memory ledger drops them — writeAll
|
||||
// would merge and silently retain them.
|
||||
await this.easyStore.replace(data);
|
||||
await this.ensureCollectionReady();
|
||||
await this.getCollection().updateOne(
|
||||
{ nameId: this.getNameId() },
|
||||
{
|
||||
$set: {
|
||||
nameId: this.getNameId(),
|
||||
data,
|
||||
},
|
||||
},
|
||||
{ upsert: true },
|
||||
);
|
||||
}
|
||||
|
||||
public async acquireLock(holderId: string, ttlMs: number): Promise<boolean> {
|
||||
const data = await this.read();
|
||||
await this.ensureDocumentExists();
|
||||
|
||||
const now = new Date();
|
||||
const lockHeld = data.lock.holder !== null;
|
||||
const lockExpired =
|
||||
data.lock.expiresAt !== null && new Date(data.lock.expiresAt).getTime() < now.getTime();
|
||||
const nowIso = now.toISOString();
|
||||
const result = await this.getCollection().findOneAndUpdate(
|
||||
{
|
||||
nameId: this.getNameId(),
|
||||
$or: [
|
||||
{ 'data.lock.holder': null },
|
||||
{ 'data.lock.holder': { $exists: false } },
|
||||
{ 'data.lock.expiresAt': null },
|
||||
{ 'data.lock.expiresAt': { $exists: false } },
|
||||
{ 'data.lock.expiresAt': { $lt: nowIso } },
|
||||
],
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'data.lock': {
|
||||
holder: holderId,
|
||||
acquiredAt: nowIso,
|
||||
expiresAt: new Date(now.getTime() + ttlMs).toISOString(),
|
||||
},
|
||||
},
|
||||
},
|
||||
{ returnDocument: 'after' },
|
||||
);
|
||||
|
||||
if (lockHeld && !lockExpired) {
|
||||
return false;
|
||||
}
|
||||
return result !== null;
|
||||
}
|
||||
|
||||
const expiresAt = new Date(now.getTime() + ttlMs);
|
||||
data.lock = {
|
||||
holder: holderId,
|
||||
acquiredAt: now.toISOString(),
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
};
|
||||
await this.write(data);
|
||||
public async renewLock(holderId: string, ttlMs: number): Promise<boolean> {
|
||||
await this.ensureCollectionReady();
|
||||
|
||||
// Re-read to confirm we won the race. EasyStore is last-writer-wins so
|
||||
// this is a probabilistic CAS, not a true atomic CAS — adequate for v1.
|
||||
const verify = await this.read();
|
||||
return verify.lock.holder === holderId;
|
||||
const now = new Date();
|
||||
const nowIso = now.toISOString();
|
||||
const result = await this.getCollection().updateOne(
|
||||
{
|
||||
nameId: this.getNameId(),
|
||||
'data.lock.holder': holderId,
|
||||
'data.lock.expiresAt': { $gte: nowIso },
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'data.lock.expiresAt': new Date(now.getTime() + ttlMs).toISOString(),
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
return result.modifiedCount === 1;
|
||||
}
|
||||
|
||||
public async releaseLock(holderId: string): Promise<void> {
|
||||
const data = await this.read();
|
||||
if (data.lock.holder !== holderId) {
|
||||
// Lock was stolen or never held — nothing to release.
|
||||
return;
|
||||
}
|
||||
data.lock = { holder: null, acquiredAt: null, expiresAt: null };
|
||||
await this.write(data);
|
||||
await this.ensureCollectionReady();
|
||||
|
||||
await this.getCollection().updateOne(
|
||||
{
|
||||
nameId: this.getNameId(),
|
||||
'data.lock.holder': holderId,
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'data.lock': {
|
||||
holder: null,
|
||||
acquiredAt: null,
|
||||
expiresAt: null,
|
||||
},
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
public async close(): Promise<void> {
|
||||
// EasyStore has no explicit close — it just dereferences when the parent
|
||||
// SmartdataDb closes.
|
||||
this.easyStore = null;
|
||||
// No per-ledger resources to release. The parent SmartdataDb owns the
|
||||
// Mongo connection lifecycle.
|
||||
}
|
||||
|
||||
/** Fill in any missing top-level fields with their defaults. */
|
||||
private normalize(data: Partial<ISmartMigrationLedgerData>): ISmartMigrationLedgerData {
|
||||
private normalize(data: Partial<ISmartMigrationLedgerData> | undefined): ISmartMigrationLedgerData {
|
||||
return {
|
||||
currentVersion: data.currentVersion ?? null,
|
||||
steps: data.steps ?? {},
|
||||
lock: data.lock ?? { holder: null, acquiredAt: null, expiresAt: null },
|
||||
checkpoints: data.checkpoints ?? {},
|
||||
currentVersion: data?.currentVersion ?? null,
|
||||
steps: data?.steps ?? {},
|
||||
lock: data?.lock ?? { holder: null, acquiredAt: null, expiresAt: null },
|
||||
checkpoints: data?.checkpoints ?? {},
|
||||
};
|
||||
}
|
||||
|
||||
private getCollection() {
|
||||
return this.db.mongoDb.collection<IMongoLedgerDocument>('SmartdataEasyStore');
|
||||
}
|
||||
|
||||
private getNameId(): string {
|
||||
return `smartmigration:${this.ledgerName}`;
|
||||
}
|
||||
|
||||
private async ensureDocumentExists(): Promise<void> {
|
||||
await this.ensureCollectionReady();
|
||||
|
||||
await this.getCollection().updateOne(
|
||||
{ nameId: this.getNameId() },
|
||||
{
|
||||
$setOnInsert: {
|
||||
nameId: this.getNameId(),
|
||||
data: emptyLedgerData(),
|
||||
},
|
||||
},
|
||||
{ upsert: true },
|
||||
);
|
||||
}
|
||||
|
||||
private async ensureCollectionReady(): Promise<void> {
|
||||
if (!this.indexReadyPromise) {
|
||||
this.indexReadyPromise = this.getCollection()
|
||||
.createIndex({ nameId: 1 }, { unique: true })
|
||||
.then(() => undefined);
|
||||
}
|
||||
|
||||
await this.indexReadyPromise;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,21 +22,24 @@ export class S3Ledger extends Ledger {
|
||||
}
|
||||
|
||||
public async init(): Promise<void> {
|
||||
const exists = await (this.bucket as any).fastExists({ path: this.path });
|
||||
if (!exists) {
|
||||
await this.write(emptyLedgerData());
|
||||
}
|
||||
// Intentionally empty. The sidecar object is created lazily on first write
|
||||
// so `plan()` / `dryRun` stay read-only for S3-backed ledgers.
|
||||
}
|
||||
|
||||
public async read(): Promise<ISmartMigrationLedgerData> {
|
||||
const buffer = await (this.bucket as any).fastGet({ path: this.path });
|
||||
const exists = await this.bucket.fastExists({ path: this.path });
|
||||
if (!exists) {
|
||||
return emptyLedgerData();
|
||||
}
|
||||
|
||||
const buffer = await this.bucket.fastGet({ path: this.path });
|
||||
const data = JSON.parse(buffer.toString('utf8')) as Partial<ISmartMigrationLedgerData>;
|
||||
return this.normalize(data);
|
||||
}
|
||||
|
||||
public async write(data: ISmartMigrationLedgerData): Promise<void> {
|
||||
const json = JSON.stringify(data, null, 2);
|
||||
await (this.bucket as any).fastPut({
|
||||
await this.bucket.fastPut({
|
||||
path: this.path,
|
||||
contents: json,
|
||||
overwrite: true,
|
||||
@@ -67,6 +70,24 @@ export class S3Ledger extends Ledger {
|
||||
return verify.lock.holder === holderId;
|
||||
}
|
||||
|
||||
public async renewLock(holderId: string, ttlMs: number): Promise<boolean> {
|
||||
const data = await this.read();
|
||||
if (data.lock.holder !== holderId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const now = new Date();
|
||||
data.lock = {
|
||||
holder: holderId,
|
||||
acquiredAt: data.lock.acquiredAt ?? now.toISOString(),
|
||||
expiresAt: new Date(now.getTime() + ttlMs).toISOString(),
|
||||
};
|
||||
await this.write(data);
|
||||
|
||||
const verify = await this.read();
|
||||
return verify.lock.holder === holderId;
|
||||
}
|
||||
|
||||
public async releaseLock(holderId: string): Promise<void> {
|
||||
const data = await this.read();
|
||||
if (data.lock.holder !== holderId) {
|
||||
|
||||
+2
-1
@@ -19,7 +19,8 @@ export type { smartdata, smartbucket };
|
||||
// Driver types are derived from the smartdata / smartbucket public surface
|
||||
// to avoid having to declare `mongodb` or `@aws-sdk/client-s3` as direct
|
||||
// dependencies. These aliases are exported for use in the public interfaces.
|
||||
export type TEasyStore<T> = smartdata.EasyStore<T>;
|
||||
export type TRawMongoDb = smartdata.SmartdataDb['mongoDb'];
|
||||
export type TRawMongoClient = smartdata.SmartdataDb['mongoDbClient'];
|
||||
export type TMongoClientSession = ReturnType<smartdata.SmartdataDb['startSession']>;
|
||||
export type TRawS3Client = smartbucket.SmartBucket['storageClient'];
|
||||
export type TRawS3Client = ReturnType<smartbucket.Bucket['getStorageClient']>;
|
||||
|
||||
Reference in New Issue
Block a user