456 lines
15 KiB
TypeScript
456 lines
15 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import type { IStorageAdapter } from './IStorageAdapter.js';
|
|
import type { IStoredDocument, IOpLogEntry, Document } from '../types/interfaces.js';
|
|
|
|
/** Index definition as tracked per collection by the adapter. */
type StoredIndexDefinition = {
  name: string;
  key: Record<string, any>;
  unique?: boolean;
  sparse?: boolean;
  expireAfterSeconds?: number;
};

/**
 * In-memory storage adapter for TsmDB.
 *
 * Documents are kept in nested maps (database -> collection -> `_id` hex string).
 * When a `persistPath` is supplied the full state can be written to, and restored
 * from, a single JSON file; with `persistIntervalMs` it is also written periodically.
 */
export class MemoryStorageAdapter implements IStorageAdapter {
  /** Maximum number of oplog entries kept in memory; older entries are discarded. */
  private static readonly MAX_OPLOG_ENTRIES = 10000;

  // Database -> Collection -> Documents (keyed by _id hex string)
  private databases: Map<string, Map<string, Map<string, IStoredDocument>>> = new Map();

  // Database -> Collection -> Indexes
  private indexes: Map<string, Map<string, StoredIndexDefinition[]>> = new Map();

  // OpLog entries
  private opLog: IOpLogEntry[] = [];
  private opLogCounter = 0;

  // Persistence settings
  private persistPath?: string;
  private persistInterval?: ReturnType<typeof setInterval>;
  private fs = new plugins.smartfs.SmartFs(new plugins.smartfs.SmartFsProviderNode());

  /**
   * @param options.persistPath - file the state is persisted to / restored from
   * @param options.persistIntervalMs - when set together with `persistPath`,
   *   `persist()` is invoked on this interval; failures are logged, not thrown
   */
  constructor(options?: { persistPath?: string; persistIntervalMs?: number }) {
    this.persistPath = options?.persistPath;
    if (this.persistPath && options?.persistIntervalMs) {
      this.persistInterval = setInterval(() => {
        this.persist().catch((err) => console.error(err));
      }, options.persistIntervalMs);
    }
  }

  /** Loads previously persisted state when a `persistPath` is configured. */
  async initialize(): Promise<void> {
    if (this.persistPath) {
      await this.restore();
    }
  }

  /** Stops the periodic persistence timer and writes a final snapshot if configured. */
  async close(): Promise<void> {
    if (this.persistInterval) {
      clearInterval(this.persistInterval);
    }
    if (this.persistPath) {
      await this.persist();
    }
  }

  // ============================================================================
  // Internal helpers
  // ============================================================================

  /** Builds a fresh copy of the default `_id_` index definition. */
  private static defaultIdIndex(): StoredIndexDefinition {
    return { name: '_id_', key: { _id: 1 }, unique: true };
  }

  /**
   * Deep-copies plain objects, arrays and Dates. Any other object (ObjectId,
   * Timestamp and other class instances) is returned by reference so that its
   * type survives, unlike a JSON round trip which would turn it into a string
   * or a plain object.
   */
  private static cloneValue(value: unknown): unknown {
    if (Array.isArray(value)) {
      return value.map((item) => MemoryStorageAdapter.cloneValue(item));
    }
    if (value instanceof Date) {
      return new Date(value.getTime());
    }
    if (value !== null && typeof value === 'object') {
      const proto = Object.getPrototypeOf(value);
      if (proto === Object.prototype || proto === null) {
        const copy: Record<string, unknown> = {};
        for (const [key, inner] of Object.entries(value)) {
          copy[key] = MemoryStorageAdapter.cloneValue(inner);
        }
        return copy;
      }
    }
    return value;
  }

  /** Returns the index map of a database, creating it when missing. */
  private ensureIndexMap(dbName: string): Map<string, StoredIndexDefinition[]> {
    let dbIndexes = this.indexes.get(dbName);
    if (!dbIndexes) {
      dbIndexes = new Map();
      this.indexes.set(dbName, dbIndexes);
    }
    return dbIndexes;
  }

  /** Returns the collection map of a database, creating database and index map when missing. */
  private ensureDatabase(dbName: string): Map<string, Map<string, IStoredDocument>> {
    let db = this.databases.get(dbName);
    if (!db) {
      db = new Map();
      this.databases.set(dbName, db);
    }
    this.ensureIndexMap(dbName);
    return db;
  }

  // ============================================================================
  // Database Operations
  // ============================================================================

  /** Lists the names of all known databases. */
  async listDatabases(): Promise<string[]> {
    return Array.from(this.databases.keys());
  }

  /** Creates the database if it does not exist yet; otherwise does nothing. */
  async createDatabase(dbName: string): Promise<void> {
    this.ensureDatabase(dbName);
  }

  /**
   * Removes a database together with its index definitions.
   * @returns whether the database existed
   */
  async dropDatabase(dbName: string): Promise<boolean> {
    const existed = this.databases.has(dbName);
    this.databases.delete(dbName);
    this.indexes.delete(dbName);
    return existed;
  }

  /** Reports whether the database exists. */
  async databaseExists(dbName: string): Promise<boolean> {
    return this.databases.has(dbName);
  }

  // ============================================================================
  // Collection Operations
  // ============================================================================

  /** Lists the collection names of a database; empty when the database is unknown. */
  async listCollections(dbName: string): Promise<string[]> {
    const db = this.databases.get(dbName);
    return db ? Array.from(db.keys()) : [];
  }

  /**
   * Creates the collection (and its database) if missing and registers the
   * default `_id_` index for it. Existing collections are left untouched.
   */
  async createCollection(dbName: string, collName: string): Promise<void> {
    this.ensureCollection(dbName, collName);
  }

  /**
   * Removes a collection and its index definitions.
   * @returns whether the collection existed
   */
  async dropCollection(dbName: string, collName: string): Promise<boolean> {
    const db = this.databases.get(dbName);
    if (!db) return false;
    const existed = db.has(collName);
    db.delete(collName);
    this.indexes.get(dbName)?.delete(collName);
    return existed;
  }

  /** Reports whether the collection exists in the given database. */
  async collectionExists(dbName: string, collName: string): Promise<boolean> {
    const db = this.databases.get(dbName);
    return db ? db.has(collName) : false;
  }

  /**
   * Renames a collection and moves its index definitions along with it.
   * An existing collection named `newName` is overwritten.
   * @throws Error when `oldName` does not exist in the database
   */
  async renameCollection(dbName: string, oldName: string, newName: string): Promise<void> {
    const db = this.databases.get(dbName);
    const collection = db?.get(oldName);
    if (!db || !collection) {
      throw new Error(`Collection ${oldName} not found`);
    }

    // Renaming onto itself must be a no-op: set-then-delete under the same key
    // would otherwise drop the collection and its indexes.
    if (oldName === newName) return;

    db.set(newName, collection);
    db.delete(oldName);

    // Also rename indexes
    const dbIndexes = this.indexes.get(dbName);
    const collIndexes = dbIndexes?.get(oldName);
    if (dbIndexes && collIndexes) {
      dbIndexes.set(newName, collIndexes);
      dbIndexes.delete(oldName);
    }
  }

  // ============================================================================
  // Document Operations
  // ============================================================================

  /**
   * Strict lookup of a collection.
   * @throws Error when the database or the collection does not exist
   */
  private getCollection(dbName: string, collName: string): Map<string, IStoredDocument> {
    const db = this.databases.get(dbName);
    if (!db) {
      throw new Error(`Database ${dbName} not found`);
    }
    const collection = db.get(collName);
    if (!collection) {
      throw new Error(`Collection ${collName} not found`);
    }
    return collection;
  }

  /**
   * Lenient lookup of a collection: database, collection and the default
   * `_id_` index are created on demand. All document operations below use this,
   * so even a read on a missing collection creates it (empty).
   */
  private ensureCollection(dbName: string, collName: string): Map<string, IStoredDocument> {
    const db = this.ensureDatabase(dbName);
    let collection = db.get(collName);
    if (!collection) {
      collection = new Map();
      db.set(collName, collection);
      this.ensureIndexMap(dbName).set(collName, [MemoryStorageAdapter.defaultIdIndex()]);
    }
    return collection;
  }

  /**
   * Stores a shallow copy of `doc`. An `_id` that is already an ObjectId is kept;
   * any other value is passed to the ObjectId constructor, which generates a new
   * id when `_id` is absent.
   * @returns the stored document
   * @throws Error when a document with the same `_id` already exists
   */
  async insertOne(dbName: string, collName: string, doc: Document): Promise<IStoredDocument> {
    const collection = this.ensureCollection(dbName, collName);
    const id = doc._id instanceof plugins.bson.ObjectId ? doc._id : new plugins.bson.ObjectId(doc._id);
    const storedDoc: IStoredDocument = { ...doc, _id: id };

    const idStr = id.toHexString();
    if (collection.has(idStr)) {
      throw new Error(`Duplicate key error: _id ${idStr}`);
    }

    collection.set(idStr, storedDoc);
    return storedDoc;
  }

  /**
   * Inserts the documents one after another via `insertOne`. If one insert
   * throws, the documents inserted before it remain stored.
   */
  async insertMany(dbName: string, collName: string, docs: Document[]): Promise<IStoredDocument[]> {
    const results: IStoredDocument[] = [];
    for (const doc of docs) {
      results.push(await this.insertOne(dbName, collName, doc));
    }
    return results;
  }

  /** Returns all documents of the collection (the stored objects, not copies). */
  async findAll(dbName: string, collName: string): Promise<IStoredDocument[]> {
    const collection = this.ensureCollection(dbName, collName);
    return Array.from(collection.values());
  }

  /**
   * Returns the documents whose `_id` hex string is contained in `ids`;
   * unknown ids are skipped.
   */
  async findByIds(dbName: string, collName: string, ids: Set<string>): Promise<IStoredDocument[]> {
    const collection = this.ensureCollection(dbName, collName);
    const results: IStoredDocument[] = [];
    for (const id of ids) {
      const doc = collection.get(id);
      if (doc) {
        results.push(doc);
      }
    }
    return results;
  }

  /** Returns the document with the given `_id`, or `null` when there is none. */
  async findById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<IStoredDocument | null> {
    const collection = this.ensureCollection(dbName, collName);
    return collection.get(id.toHexString()) || null;
  }

  /**
   * Replaces the document stored under `id` with `doc`.
   * @returns `false` when no document with that id exists (nothing is stored then)
   */
  async updateById(dbName: string, collName: string, id: plugins.bson.ObjectId, doc: IStoredDocument): Promise<boolean> {
    const collection = this.ensureCollection(dbName, collName);
    const idStr = id.toHexString();
    if (!collection.has(idStr)) {
      return false;
    }
    collection.set(idStr, doc);
    return true;
  }

  /**
   * Deletes the document with the given `_id`.
   * @returns whether a document was removed
   */
  async deleteById(dbName: string, collName: string, id: plugins.bson.ObjectId): Promise<boolean> {
    const collection = this.ensureCollection(dbName, collName);
    return collection.delete(id.toHexString());
  }

  /**
   * Deletes every listed document via `deleteById`.
   * @returns the number of documents actually removed
   */
  async deleteByIds(dbName: string, collName: string, ids: plugins.bson.ObjectId[]): Promise<number> {
    let count = 0;
    for (const id of ids) {
      if (await this.deleteById(dbName, collName, id)) {
        count++;
      }
    }
    return count;
  }

  /** Returns the number of documents in the collection. */
  async count(dbName: string, collName: string): Promise<number> {
    const collection = this.ensureCollection(dbName, collName);
    return collection.size;
  }

  // ============================================================================
  // Index Operations
  // ============================================================================

  /**
   * Adds an index definition to the collection, or replaces the definition
   * with the same name. The collection is created when missing.
   */
  async saveIndex(
    dbName: string,
    collName: string,
    indexName: string,
    indexSpec: { key: Record<string, any>; unique?: boolean; sparse?: boolean; expireAfterSeconds?: number }
  ): Promise<void> {
    this.ensureCollection(dbName, collName);
    const dbIndexes = this.ensureIndexMap(dbName);
    let collIndexes = dbIndexes.get(collName);
    if (!collIndexes) {
      collIndexes = [MemoryStorageAdapter.defaultIdIndex()];
      dbIndexes.set(collName, collIndexes);
    }

    const entry: StoredIndexDefinition = { name: indexName, ...indexSpec };
    const existingIndex = collIndexes.findIndex((i) => i.name === indexName);
    if (existingIndex >= 0) {
      collIndexes[existingIndex] = entry;
    } else {
      collIndexes.push(entry);
    }
  }

  /**
   * Returns the index definitions of the collection. For an unknown database
   * or collection a list holding only the default `_id_` index is returned.
   */
  async getIndexes(dbName: string, collName: string): Promise<Array<{
    name: string;
    key: Record<string, any>;
    unique?: boolean;
    sparse?: boolean;
    expireAfterSeconds?: number;
  }>> {
    return this.indexes.get(dbName)?.get(collName) ?? [MemoryStorageAdapter.defaultIdIndex()];
  }

  /**
   * Removes an index definition by name.
   * @returns whether an index was removed
   * @throws Error when asked to drop the `_id_` index
   */
  async dropIndex(dbName: string, collName: string, indexName: string): Promise<boolean> {
    if (indexName === '_id_') {
      throw new Error('Cannot drop _id index');
    }
    const collIndexes = this.indexes.get(dbName)?.get(collName);
    if (!collIndexes) return false;

    const idx = collIndexes.findIndex((i) => i.name === indexName);
    if (idx < 0) return false;
    collIndexes.splice(idx, 1);
    return true;
  }

  // ============================================================================
  // OpLog Operations
  // ============================================================================

  /** Appends an entry and trims the oplog to the most recent `MAX_OPLOG_ENTRIES`. */
  async appendOpLog(entry: IOpLogEntry): Promise<void> {
    this.opLog.push(entry);
    if (this.opLog.length > MemoryStorageAdapter.MAX_OPLOG_ENTRIES) {
      this.opLog = this.opLog.slice(-MemoryStorageAdapter.MAX_OPLOG_ENTRIES);
    }
  }

  /**
   * Returns up to `limit` oplog entries whose timestamp is strictly greater than `ts`.
   *
   * Timestamps are compared with `greaterThan` rather than via `toNumber()`:
   * a Timestamp is a 64-bit value with the seconds in the high 32 bits, so its
   * numeric form exceeds 2^53 and cannot distinguish nearby increments.
   */
  async getOpLogAfter(ts: plugins.bson.Timestamp, limit: number = 1000): Promise<IOpLogEntry[]> {
    return this.opLog.filter((e) => e.ts.greaterThan(ts)).slice(0, limit);
  }

  /** Returns the timestamp of the newest oplog entry, or `null` when the oplog is empty. */
  async getLatestOpLogTimestamp(): Promise<plugins.bson.Timestamp | null> {
    if (this.opLog.length === 0) return null;
    return this.opLog[this.opLog.length - 1].ts;
  }

  /**
   * Generate a new timestamp for oplog entries: `t` is the current time in
   * seconds, `i` is an adapter-wide counter incremented on every call.
   */
  generateTimestamp(): plugins.bson.Timestamp {
    this.opLogCounter++;
    return new plugins.bson.Timestamp({ t: Math.floor(Date.now() / 1000), i: this.opLogCounter });
  }

  // ============================================================================
  // Transaction Support
  // ============================================================================

  /**
   * Returns deep copies of all documents of the collection for snapshot
   * isolation. Plain objects, arrays and Dates are copied; other value objects
   * such as ObjectIds are carried over as-is so the copies keep their types.
   */
  async createSnapshot(dbName: string, collName: string): Promise<IStoredDocument[]> {
    const docs = await this.findAll(dbName, collName);
    return docs.map((doc) => MemoryStorageAdapter.cloneValue(doc) as IStoredDocument);
  }

  /**
   * Checks whether any of the given document ids appears (as `o._id` or
   * `o2._id`) in an oplog entry of `dbName.collName` that is newer than
   * `snapshotTime`.
   */
  async hasConflicts(
    dbName: string,
    collName: string,
    ids: plugins.bson.ObjectId[],
    snapshotTime: plugins.bson.Timestamp
  ): Promise<boolean> {
    if (ids.length === 0) return false;

    const ns = `${dbName}.${collName}`;
    const watchedIds = new Set(ids.map((id) => id.toString()));

    for (const entry of this.opLog) {
      if (entry.ns !== ns || !entry.ts.greaterThan(snapshotTime)) continue;
      const touchedIds = [entry.o?._id, entry.o2?._id];
      if (touchedIds.some((id) => id && watchedIds.has(id.toString()))) {
        return true;
      }
    }
    return false;
  }

  // ============================================================================
  // Persistence
  // ============================================================================

  /**
   * Writes documents, index definitions and the oplog counter to `persistPath`
   * as pretty-printed JSON. The oplog entries themselves are not written.
   * Does nothing when no `persistPath` is configured.
   */
  async persist(): Promise<void> {
    if (!this.persistPath) return;

    const databases: Record<string, Record<string, IStoredDocument[]>> = {};
    for (const [dbName, collections] of this.databases) {
      const serializedCollections: Record<string, IStoredDocument[]> = {};
      for (const [collName, docs] of collections) {
        serializedCollections[collName] = Array.from(docs.values());
      }
      databases[dbName] = serializedCollections;
    }

    const indexes: Record<string, Record<string, StoredIndexDefinition[]>> = {};
    for (const [dbName, collIndexes] of this.indexes) {
      const serializedIndexes: Record<string, StoredIndexDefinition[]> = {};
      for (const [collName, definitions] of collIndexes) {
        serializedIndexes[collName] = definitions;
      }
      indexes[dbName] = serializedIndexes;
    }

    const data = { databases, indexes, opLogCounter: this.opLogCounter };

    // Ensure parent directory exists.
    // NOTE: only '/' is treated as a path separator here.
    const dir = this.persistPath.substring(0, this.persistPath.lastIndexOf('/'));
    if (dir) {
      await this.fs.directory(dir).recursive().create();
    }
    await this.fs.file(this.persistPath).encoding('utf8').write(JSON.stringify(data, null, 2));
  }

  /**
   * Replaces the in-memory state with the content of `persistPath`, if that
   * file exists. `_id` values stored as hex strings or `{ $oid }` objects are
   * turned back into ObjectIds.
   *
   * The file is parsed completely into fresh maps before anything is swapped
   * in, so a failure (which is logged, not thrown) leaves the current state
   * exactly as it was.
   */
  async restore(): Promise<void> {
    if (!this.persistPath) return;

    try {
      const exists = await this.fs.file(this.persistPath).exists();
      if (!exists) return;

      const content = await this.fs.file(this.persistPath).encoding('utf8').read();
      const data: {
        databases?: Record<string, Record<string, any[]>>;
        indexes?: Record<string, Record<string, StoredIndexDefinition[]>>;
        opLogCounter?: unknown;
      } = JSON.parse(content as string);

      const restoredDatabases = new Map<string, Map<string, Map<string, IStoredDocument>>>();
      for (const [dbName, collections] of Object.entries(data.databases || {})) {
        const dbMap = new Map<string, Map<string, IStoredDocument>>();
        restoredDatabases.set(dbName, dbMap);

        for (const [collName, docs] of Object.entries(collections)) {
          const collMap = new Map<string, IStoredDocument>();
          for (const doc of docs) {
            // Restore ObjectId
            if (doc._id && typeof doc._id === 'string') {
              doc._id = new plugins.bson.ObjectId(doc._id);
            } else if (doc._id && typeof doc._id === 'object' && doc._id.$oid) {
              doc._id = new plugins.bson.ObjectId(doc._id.$oid);
            }
            if (!doc._id || typeof doc._id.toHexString !== 'function') {
              throw new Error(`Persisted document in ${dbName}.${collName} has no usable _id`);
            }
            collMap.set(doc._id.toHexString(), doc);
          }
          dbMap.set(collName, collMap);
        }
      }

      const restoredIndexes = new Map<string, Map<string, StoredIndexDefinition[]>>();
      for (const [dbName, collIndexes] of Object.entries(data.indexes || {})) {
        const indexMap = new Map<string, StoredIndexDefinition[]>();
        restoredIndexes.set(dbName, indexMap);
        for (const [collName, definitions] of Object.entries(collIndexes)) {
          indexMap.set(collName, definitions);
        }
      }

      // Everything parsed successfully: swap the new state in.
      this.databases = restoredDatabases;
      this.indexes = restoredIndexes;
      this.opLogCounter = typeof data.opLogCounter === 'number' ? data.opLogCounter : 0;
    } catch (error) {
      // If restore fails, keep the state we had before the attempt
      console.warn('Failed to restore from persistence:', error);
    }
  }
}
|