// File: smartdb/ts/ts_migration/migrators/v0_to_v1.ts
// (254 lines, 8.2 KiB, TypeScript — listing metadata preserved from the original export)

import * as fs from 'fs';
import * as path from 'path';
import * as crypto from 'crypto';
import { BSON } from 'bson';
// ---------------------------------------------------------------------------
// Binary format constants (must match Rust: record.rs)
// ---------------------------------------------------------------------------
/** File-level magic: "SMARTDB\0" */
const FILE_MAGIC = Buffer.from('SMARTDB\0', 'ascii');
/** Current format version */
const FORMAT_VERSION = 1;
/** File type tags */
const FILE_TYPE_DATA = 1;
const FILE_TYPE_HINT = 3;
/** File header total size */
const FILE_HEADER_SIZE = 64;
/** Per-record magic */
const RECORD_MAGIC = 0xDB01;
/** Per-record header size */
const RECORD_HEADER_SIZE = 22; // 2 + 8 + 4 + 4 + 4
// ---------------------------------------------------------------------------
// Binary encoding helpers
// ---------------------------------------------------------------------------
function writeFileHeader(fileType: number): Buffer {
const buf = Buffer.alloc(FILE_HEADER_SIZE, 0);
FILE_MAGIC.copy(buf, 0);
buf.writeUInt16LE(FORMAT_VERSION, 8);
buf.writeUInt8(fileType, 10);
buf.writeUInt32LE(0, 11); // flags
const now = BigInt(Date.now());
buf.writeBigUInt64LE(now, 15);
// bytes 23..64 are reserved (zeros)
return buf;
}
function encodeDataRecord(timestamp: bigint, key: Buffer, value: Buffer): Buffer {
const keyLen = key.length;
const valLen = value.length;
const totalSize = RECORD_HEADER_SIZE + keyLen + valLen;
const buf = Buffer.alloc(totalSize);
// Write header fields (without CRC)
buf.writeUInt16LE(RECORD_MAGIC, 0);
buf.writeBigUInt64LE(timestamp, 2);
buf.writeUInt32LE(keyLen, 10);
buf.writeUInt32LE(valLen, 14);
// CRC placeholder at offset 18..22 (will fill below)
key.copy(buf, RECORD_HEADER_SIZE);
value.copy(buf, RECORD_HEADER_SIZE + keyLen);
// CRC32 covers everything except the CRC field itself:
// bytes [0..18] + bytes [22..]
const crc = crc32(Buffer.concat([
buf.subarray(0, 18),
buf.subarray(22),
]));
buf.writeUInt32LE(crc, 18);
return buf;
}
function encodeHintEntry(key: string, offset: bigint, recordLen: number, valueLen: number, timestamp: bigint): Buffer {
const keyBuf = Buffer.from(key, 'utf-8');
const buf = Buffer.alloc(4 + keyBuf.length + 8 + 4 + 4 + 8);
let pos = 0;
buf.writeUInt32LE(keyBuf.length, pos); pos += 4;
keyBuf.copy(buf, pos); pos += keyBuf.length;
buf.writeBigUInt64LE(offset, pos); pos += 8;
buf.writeUInt32LE(recordLen, pos); pos += 4;
buf.writeUInt32LE(valueLen, pos); pos += 4;
buf.writeBigUInt64LE(timestamp, pos);
return buf;
}
// ---------------------------------------------------------------------------
// CRC32 (matching crc32fast in Rust)
// ---------------------------------------------------------------------------
const CRC32_TABLE = (() => {
const table = new Uint32Array(256);
for (let i = 0; i < 256; i++) {
let crc = i;
for (let j = 0; j < 8; j++) {
crc = (crc & 1) ? (0xEDB88320 ^ (crc >>> 1)) : (crc >>> 1);
}
table[i] = crc;
}
return table;
})();
function crc32(data: Buffer): number {
let crc = 0xFFFFFFFF;
for (let i = 0; i < data.length; i++) {
crc = CRC32_TABLE[(crc ^ data[i]) & 0xFF] ^ (crc >>> 8);
}
return (crc ^ 0xFFFFFFFF) >>> 0;
}
// ---------------------------------------------------------------------------
// Migration: v0 (JSON) → v1 (Bitcask binary)
// ---------------------------------------------------------------------------
interface IKeyDirEntry {
offset: bigint;
recordLen: number;
valueLen: number;
timestamp: bigint;
}
/**
* Migrate a storage directory from v0 (JSON-per-collection) to v1 (Bitcask binary).
*
* - Original .json files are NOT modified or deleted.
* - New v1 files are written into {db}/{coll}/ subdirectories.
* - Returns a list of old files that can be safely deleted.
* - On failure, cleans up any partial new files and throws.
*/
export async function migrateV0ToV1(storagePath: string): Promise<string[]> {
const deletableFiles: string[] = [];
const createdDirs: string[] = [];
try {
const dbEntries = fs.readdirSync(storagePath, { withFileTypes: true });
for (const dbEntry of dbEntries) {
if (!dbEntry.isDirectory()) continue;
const dbDir = path.join(storagePath, dbEntry.name);
const collFiles = fs.readdirSync(dbDir, { withFileTypes: true });
for (const collFile of collFiles) {
if (!collFile.isFile()) continue;
if (!collFile.name.endsWith('.json')) continue;
if (collFile.name.endsWith('.indexes.json')) continue;
const collName = collFile.name.replace(/\.json$/, '');
const jsonPath = path.join(dbDir, collFile.name);
const indexJsonPath = path.join(dbDir, `${collName}.indexes.json`);
// Target directory
const collDir = path.join(dbDir, collName);
if (fs.existsSync(collDir)) {
// Already migrated
continue;
}
console.log(`[smartdb] Migrating ${dbEntry.name}.${collName}...`);
// Read the JSON collection
const jsonData = fs.readFileSync(jsonPath, 'utf-8');
const docs: any[] = JSON.parse(jsonData);
// Create collection directory
fs.mkdirSync(collDir, { recursive: true });
createdDirs.push(collDir);
// Write data.rdb
const dataPath = path.join(collDir, 'data.rdb');
const fd = fs.openSync(dataPath, 'w');
try {
// File header
const headerBuf = writeFileHeader(FILE_TYPE_DATA);
fs.writeSync(fd, headerBuf);
let currentOffset = BigInt(FILE_HEADER_SIZE);
const keydir: Map<string, IKeyDirEntry> = new Map();
const ts = BigInt(Date.now());
for (const doc of docs) {
// Extract _id
let idHex: string;
if (doc._id && doc._id.$oid) {
idHex = doc._id.$oid;
} else if (typeof doc._id === 'string') {
idHex = doc._id;
} else if (doc._id) {
idHex = String(doc._id);
} else {
// Generate a new ObjectId
idHex = crypto.randomBytes(12).toString('hex');
doc._id = { $oid: idHex };
}
// Serialize to BSON
const bsonBytes = BSON.serialize(doc);
const keyBuf = Buffer.from(idHex, 'utf-8');
const valueBuf = Buffer.from(bsonBytes);
const record = encodeDataRecord(ts, keyBuf, valueBuf);
fs.writeSync(fd, record);
keydir.set(idHex, {
offset: currentOffset,
recordLen: record.length,
valueLen: valueBuf.length,
timestamp: ts,
});
currentOffset += BigInt(record.length);
}
fs.fsyncSync(fd);
fs.closeSync(fd);
// Write keydir.hint
const hintPath = path.join(collDir, 'keydir.hint');
const hintFd = fs.openSync(hintPath, 'w');
fs.writeSync(hintFd, writeFileHeader(FILE_TYPE_HINT));
for (const [key, entry] of keydir) {
fs.writeSync(hintFd, encodeHintEntry(key, entry.offset, entry.recordLen, entry.valueLen, entry.timestamp));
}
fs.fsyncSync(hintFd);
fs.closeSync(hintFd);
} catch (writeErr) {
// Clean up on write failure
try { fs.closeSync(fd); } catch {}
throw writeErr;
}
// Copy indexes.json if it exists
if (fs.existsSync(indexJsonPath)) {
const destIndexPath = path.join(collDir, 'indexes.json');
fs.copyFileSync(indexJsonPath, destIndexPath);
deletableFiles.push(indexJsonPath);
} else {
// Write default _id index
const destIndexPath = path.join(collDir, 'indexes.json');
fs.writeFileSync(destIndexPath, JSON.stringify([{ name: '_id_', key: { _id: 1 } }], null, 2));
}
deletableFiles.push(jsonPath);
console.log(`[smartdb] Migrated ${dbEntry.name}.${collName}: ${docs.length} documents`);
}
}
} catch (err) {
// Clean up any partially created directories
for (const dir of createdDirs) {
try {
fs.rmSync(dir, { recursive: true, force: true });
} catch {}
}
throw err;
}
return deletableFiles;
}