// smartfs/ts/providers/smartfs.provider.rust.ts

/**
* Rust filesystem provider for SmartFS
* Uses a Rust binary via smartrust IPC for XFS-safe filesystem operations.
* All metadata-changing operations (rename, unlink, create) fsync the parent
* directory, guaranteeing durability on XFS and other delayed-logging filesystems.
*/
import * as plugins from '../smartfs.plugins.js';
import { fileURLToPath } from 'node:url';
import type {
ISmartFsProvider,
IProviderCapabilities,
TWatchCallback,
IWatcherHandle,
} from '../interfaces/mod.provider.js';
import type {
IFileStats,
IDirectoryEntry,
IReadOptions,
IWriteOptions,
IStreamOptions,
ICopyOptions,
IListOptions,
IWatchOptions,
ITransactionOperation,
} from '../interfaces/mod.types.js';
// ── IPC command type map ────────────────────────────────────────────────────
/**
 * JSON form of file stats as it appears in IPC results (`fileStat`,
 * `directoryStat`, and the optional `stats` of a directory entry).
 * The three timestamps are strings here; `convertStats()` turns them into
 * `Date` objects before they are handed to callers.
 */
interface IFileStatsJson {
size: number;
// Timestamps are parsed by parseTimestamp(), which documents the expected
// wire format as "<unix_secs>.<millis>Z".
birthtime: string;
mtime: string;
atime: string;
isFile: boolean;
isDirectory: boolean;
isSymbolicLink: boolean;
mode: number;
}
/**
 * JSON form of a single entry in the `listDirectory` IPC result.
 * `listDirectory()` maps each of these to an `IDirectoryEntry`.
 */
interface IDirectoryEntryJson {
name: string;
path: string;
isFile: boolean;
isDirectory: boolean;
isSymbolicLink: boolean;
// Optional; converted with convertStats() only when present.
stats?: IFileStatsJson;
}
/**
 * One element of the `operations` array sent with the `batch` IPC command.
 * NOTE(review): which optional fields apply to which `type` value is decided
 * on the Rust side and is not visible in this file.
 */
interface IBatchOp {
type: string;
path: string;
targetPath?: string;
content?: string;
encoding?: string;
atomic?: boolean;
mode?: number;
overwrite?: boolean;
recursive?: boolean;
}
/**
 * One element of the array returned by the `batch` IPC command.
 */
interface IBatchResult {
// Presumably the position of the corresponding entry in the request's
// `operations` array — TODO confirm against the Rust implementation.
index: number;
success: boolean;
// Optional error text; presumably set only when `success` is false — confirm.
error?: string;
}
/**
 * JSON form of one transaction operation, as built by `executeTransaction()`
 * from an `ITransactionOperation` and sent with the `executeTransaction`
 * IPC command.
 */
interface ITransactionOpJson {
type: string;
path: string;
targetPath?: string;
// Content is always a string on the wire.
content?: string;
encoding?: string;
}
/**
 * Maps every IPC command name to the type of its `params` and `result`.
 * Used as the type argument of `plugins.smartrust.RustBridge`, so that
 * `sendCommand(name, params)` calls in the provider are checked against it.
 */
type TSmartFsCommands = {
// `content` is base64 when `isBuffer` is set or the 'buffer' encoding was requested (see readFile()).
readFile: { params: { path: string; encoding?: string }; result: { content: string; isBuffer?: boolean } };
writeFile: { params: { path: string; content: string; atomic?: boolean; mode?: number; encoding?: string }; result: void };
appendFile: { params: { path: string; content: string; encoding?: string }; result: void };
deleteFile: { params: { path: string }; result: void };
copyFile: { params: { from: string; to: string; overwrite?: boolean; preserveTimestamps?: boolean }; result: void };
moveFile: { params: { from: string; to: string; overwrite?: boolean; preserveTimestamps?: boolean }; result: void };
fileExists: { params: { path: string }; result: boolean };
fileStat: { params: { path: string }; result: IFileStatsJson };
listDirectory: { params: { path: string; recursive?: boolean; includeStats?: boolean; filter?: string }; result: IDirectoryEntryJson[] };
createDirectory: { params: { path: string; recursive?: boolean; mode?: number }; result: void };
deleteDirectory: { params: { path: string; recursive?: boolean }; result: void };
directoryExists: { params: { path: string }; result: boolean };
directoryStat: { params: { path: string }; result: IFileStatsJson };
// `id` is the provider-generated watch id; events come back on 'management:watch:<id>'.
watch: { params: { path: string; id: string; recursive?: boolean }; result: void };
unwatchAll: { params: Record<string, never>; result: void };
batch: { params: { operations: IBatchOp[] }; result: IBatchResult[] };
executeTransaction: { params: { operations: ITransactionOpJson[] }; result: void };
normalizePath: { params: { path: string }; result: string };
joinPath: { params: { segments: string[] }; result: string };
// Streaming command: besides params/result it declares the chunk type; createReadStream() decodes each chunk as base64.
readFileStream: { params: { path: string; chunkSize?: number }; result: { totalBytes: number }; chunk: string };
writeStreamBegin: { params: { path: string; atomic?: boolean; mode?: number }; result: { streamId: string } };
// `data` is base64; `last: true` marks the final chunk (see createWriteStream()).
writeStreamChunk: { params: { streamId: string; data: string; last: boolean }; result: void };
ping: { params: Record<string, never>; result: { pong: boolean } };
};
// ── Provider class ──────────────────────────────────────────────────────────
/**
 * SmartFS provider that delegates filesystem work to the `smartfs-bin` Rust
 * binary through a smartrust IPC bridge.
 *
 * The binary is spawned lazily by the first operation and terminated by
 * `shutdown()`. Binary payloads cross the IPC boundary as base64 strings.
 */
export class SmartFsProviderRust implements ISmartFsProvider {
  public readonly name = 'rust';

  public readonly capabilities: IProviderCapabilities = {
    supportsWatch: true,
    supportsAtomic: true,
    supportsTransactions: true,
    supportsStreaming: true,
    supportsSymlinks: true,
    supportsPermissions: true,
  };

  private bridge: plugins.smartrust.RustBridge<TSmartFsCommands>;
  private initialized = false;
  /** In-flight spawn, shared so concurrent first calls start the binary only once. */
  private spawnPromise: Promise<void> | null = null;
  private watchCallbacks = new Map<string, TWatchCallback>();
  private watchCounter = 0;

  /**
   * @param options.binaryPath Explicit path to the binary; when omitted the
   *   bridge searches the package-local paths assembled below.
   */
  constructor(options?: { binaryPath?: string }) {
    const currentDir = plugins.path.dirname(fileURLToPath(import.meta.url));
    const packageDir = plugins.path.resolve(currentDir, '../../');

    // Search order: dist_rust/ first (production), then rust/target/ (development).
    const localPaths: string[] = [];
    const suffix = SmartFsProviderRust.getPlatformSuffix();
    if (suffix) {
      localPaths.push(plugins.path.join(packageDir, 'dist_rust', `smartfs-bin_${suffix}`));
    }
    localPaths.push(
      plugins.path.join(packageDir, 'dist_rust', 'smartfs-bin'),
      plugins.path.join(packageDir, 'rust', 'target', 'release', 'smartfs-bin'),
      plugins.path.join(packageDir, 'rust', 'target', 'debug', 'smartfs-bin'),
    );

    this.bridge = new plugins.smartrust.RustBridge<TSmartFsCommands>({
      binaryName: 'smartfs-bin',
      cliArgs: ['--management'],
      requestTimeoutMs: 30_000,
      readyTimeoutMs: 10_000,
      localPaths,
      searchSystemPath: false,
      ...(options?.binaryPath ? { binaryPath: options.binaryPath } : {}),
    });
    // Watch events are subscribed per watch id inside watch(); no global
    // listener is needed here.
  }

  /**
   * Builds the `<platform>_<arch>` suffix used for platform-specific binaries.
   * @returns The suffix, or `null` when `process.arch` has no mapping.
   */
  private static getPlatformSuffix(): string | null {
    const archMap: Record<string, string> = { x64: 'amd64', arm64: 'arm64' };
    const arch = archMap[process.arch];
    if (!arch) return null;
    return `${process.platform}_${arch}`;
  }

  /** Name of the bridge event that carries events for the given watch id. */
  private static watchEventName(watchId: string): string {
    return `management:watch:${watchId}`;
  }

  /**
   * Converts a simple glob (only `*` is a wildcard) into an anchored RegExp.
   * All other regex metacharacters are escaped so they match literally,
   * e.g. `*.ts` matches `a.ts` but not `axts`.
   */
  private static globToRegExp(glob: string): RegExp {
    const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    return new RegExp(`^${escaped.replace(/\*/g, '.*')}$`);
  }

  /**
   * Spawns the Rust binary if it is not running yet.
   * Concurrent callers share a single spawn attempt; a failed attempt is
   * forgotten so that a later call can retry.
   * @throws Error when the bridge reports that the binary did not start.
   */
  private async ensureRunning(): Promise<void> {
    if (this.initialized) return;
    if (!this.spawnPromise) {
      this.spawnPromise = (async () => {
        try {
          const started = await this.bridge.spawn();
          if (!started) {
            throw new Error('SmartFsProviderRust: failed to start smartfs-bin');
          }
          this.initialized = true;
        } finally {
          this.spawnPromise = null;
        }
      })();
    }
    await this.spawnPromise;
  }

  /**
   * Shut down the Rust binary.
   * Also drops every watch registration, since the process that produced
   * their events no longer exists.
   */
  public async shutdown(): Promise<void> {
    if (!this.initialized) return;
    this.bridge.kill();
    this.initialized = false;
    for (const watchId of this.watchCallbacks.keys()) {
      this.bridge.removeAllListeners(SmartFsProviderRust.watchEventName(watchId));
    }
    this.watchCallbacks.clear();
  }

  // ── File Operations ───────────────────────────────────────────────────

  /**
   * Reads a file.
   * @returns A Buffer when the binary flags the payload as binary or the
   *   'buffer' encoding was requested; otherwise the decoded string.
   */
  public async readFile(path: string, options?: IReadOptions): Promise<Buffer | string> {
    await this.ensureRunning();
    const encoding = options?.encoding || 'utf8';
    const result = await this.bridge.sendCommand('readFile', { path, encoding });
    if (result.isBuffer || encoding === 'buffer') {
      // Binary payloads travel as base64.
      return Buffer.from(result.content, 'base64');
    }
    return result.content;
  }

  /** Writes a file; Buffer content is sent as base64 with `encoding: 'base64'`. */
  public async writeFile(path: string, content: string | Buffer, options?: IWriteOptions): Promise<void> {
    await this.ensureRunning();
    const isBinary = Buffer.isBuffer(content);
    await this.bridge.sendCommand('writeFile', {
      path,
      content: isBinary ? content.toString('base64') : content,
      atomic: options?.atomic,
      mode: options?.mode,
      encoding: isBinary ? 'base64' : (options?.encoding || undefined),
    });
  }

  /** Appends to a file; Buffer content is sent as base64 with `encoding: 'base64'`. */
  public async appendFile(path: string, content: string | Buffer, options?: IWriteOptions): Promise<void> {
    await this.ensureRunning();
    const isBinary = Buffer.isBuffer(content);
    await this.bridge.sendCommand('appendFile', {
      path,
      content: isBinary ? content.toString('base64') : content,
      encoding: isBinary ? 'base64' : (options?.encoding || undefined),
    });
  }

  /** Deletes a file. */
  public async deleteFile(path: string): Promise<void> {
    await this.ensureRunning();
    await this.bridge.sendCommand('deleteFile', { path });
  }

  /** Copies a file, forwarding `overwrite` and `preserveTimestamps`. */
  public async copyFile(from: string, to: string, options?: ICopyOptions): Promise<void> {
    await this.ensureRunning();
    await this.bridge.sendCommand('copyFile', {
      from,
      to,
      overwrite: options?.overwrite,
      preserveTimestamps: options?.preserveTimestamps,
    });
  }

  /** Moves a file, forwarding `overwrite` and `preserveTimestamps`. */
  public async moveFile(from: string, to: string, options?: ICopyOptions): Promise<void> {
    await this.ensureRunning();
    await this.bridge.sendCommand('moveFile', {
      from,
      to,
      overwrite: options?.overwrite,
      preserveTimestamps: options?.preserveTimestamps,
    });
  }

  /** Asks the binary whether a file exists at `path`. */
  public async fileExists(path: string): Promise<boolean> {
    await this.ensureRunning();
    return this.bridge.sendCommand('fileExists', { path });
  }

  /** Returns stats for a file, with timestamps converted to `Date`. */
  public async fileStat(path: string): Promise<IFileStats> {
    await this.ensureRunning();
    const stats = await this.bridge.sendCommand('fileStat', { path });
    return this.convertStats(stats);
  }

  /**
   * Opens a web ReadableStream over a file.
   * Chunk size is `options.chunkSize`, else `options.highWaterMark`, else 65536.
   */
  public async createReadStream(path: string, options?: IStreamOptions): Promise<ReadableStream<Uint8Array>> {
    await this.ensureRunning();
    const chunkSize = options?.chunkSize || options?.highWaterMark || 65536;
    const streaming = this.bridge.sendCommandStreaming('readFileStream', {
      path,
      chunkSize,
    });
    const iterator = streaming[Symbol.asyncIterator]();
    return new ReadableStream({
      async pull(controller) {
        const { value, done } = await iterator.next();
        if (done) {
          controller.close();
          return;
        }
        // Each chunk arrives as a base64-encoded string.
        const buffer = Buffer.from(value as string, 'base64');
        controller.enqueue(new Uint8Array(buffer));
      },
    });
  }

  /**
   * Opens a web WritableStream onto a file. Every chunk is sent as base64;
   * closing the stream sends an empty chunk flagged `last: true`.
   * NOTE(review): `options` is currently not forwarded to the binary.
   */
  public async createWriteStream(path: string, options?: IStreamOptions): Promise<WritableStream<Uint8Array>> {
    await this.ensureRunning();
    const { streamId } = await this.bridge.sendCommand('writeStreamBegin', {
      path,
      atomic: undefined,
    });
    const bridge = this.bridge;
    return new WritableStream({
      async write(chunk) {
        await bridge.sendCommand('writeStreamChunk', {
          streamId,
          data: Buffer.from(chunk).toString('base64'),
          last: false,
        });
      },
      async close() {
        await bridge.sendCommand('writeStreamChunk', {
          streamId,
          data: '',
          last: true,
        });
      },
    });
  }

  // ── Directory Operations ──────────────────────────────────────────────

  /**
   * Lists a directory.
   * String filters are forwarded as-is and RegExp filters as
   * `regex:<source>` (flags are not transmitted). Function filters cannot be
   * serialized, so they are applied client-side to the returned entries.
   */
  public async listDirectory(path: string, options?: IListOptions): Promise<IDirectoryEntry[]> {
    await this.ensureRunning();
    const requested = options?.filter;
    let filter: string | undefined;
    if (typeof requested === 'string') {
      // An empty string is treated as "no filter".
      filter = requested || undefined;
    } else if (requested instanceof RegExp) {
      // Prefix with "regex:" so Rust treats it as a raw regex pattern.
      filter = `regex:${requested.source}`;
    }

    const entries = await this.bridge.sendCommand('listDirectory', {
      path,
      recursive: options?.recursive,
      includeStats: options?.includeStats,
      filter,
    });
    const result: IDirectoryEntry[] = entries.map((entry) => ({
      name: entry.name,
      path: entry.path,
      isFile: entry.isFile,
      isDirectory: entry.isDirectory,
      isSymbolicLink: entry.isSymbolicLink,
      stats: entry.stats ? this.convertStats(entry.stats) : undefined,
    }));
    return typeof requested === 'function' ? result.filter(requested) : result;
  }

  /** Creates a directory; `recursive` defaults to `true`. */
  public async createDirectory(path: string, options?: { recursive?: boolean; mode?: number }): Promise<void> {
    await this.ensureRunning();
    await this.bridge.sendCommand('createDirectory', {
      path,
      recursive: options?.recursive ?? true,
      mode: options?.mode,
    });
  }

  /** Deletes a directory; `recursive` defaults to `true`. */
  public async deleteDirectory(path: string, options?: { recursive?: boolean }): Promise<void> {
    await this.ensureRunning();
    await this.bridge.sendCommand('deleteDirectory', {
      path,
      recursive: options?.recursive ?? true,
    });
  }

  /** Asks the binary whether a directory exists at `path`. */
  public async directoryExists(path: string): Promise<boolean> {
    await this.ensureRunning();
    return this.bridge.sendCommand('directoryExists', { path });
  }

  /** Returns stats for a directory, with timestamps converted to `Date`. */
  public async directoryStat(path: string): Promise<IFileStats> {
    await this.ensureRunning();
    const stats = await this.bridge.sendCommand('directoryStat', { path });
    return this.convertStats(stats);
  }

  // ── Watch Operations ──────────────────────────────────────────────────

  /**
   * Starts watching `path` and invokes `callback` for every event that passes
   * `options.filter` (function, RegExp, or a glob string where `*` is the
   * only wildcard), matched against the event path.
   *
   * @returns A handle whose `stop()` unregisters the callback. When the last
   *   active watcher is stopped the binary is told to drop its watchers via
   *   `unwatchAll`, the only unwatch command in the IPC map.
   * @throws Whatever the `watch` IPC command rejects with; the local
   *   registration is rolled back first.
   */
  public async watch(path: string, callback: TWatchCallback, options?: IWatchOptions): Promise<IWatcherHandle> {
    await this.ensureRunning();
    const watchId = `w_${++this.watchCounter}`;
    const eventName = SmartFsProviderRust.watchEventName(watchId);
    const filter = options?.filter;
    // Compile a glob filter once instead of on every event.
    const globPattern = typeof filter === 'string' && filter ? SmartFsProviderRust.globToRegExp(filter) : null;

    this.watchCallbacks.set(watchId, callback);
    this.bridge.on(eventName, async (data: any) => {
      const cb = this.watchCallbacks.get(watchId);
      if (!cb) return;
      if (filter) {
        if (typeof filter === 'function') {
          if (!filter(data.path)) return;
        } else if (filter instanceof RegExp) {
          if (!filter.test(data.path)) return;
        } else if (globPattern && !globPattern.test(data.path)) {
          return;
        }
      }
      await cb({
        type: data.type,
        path: data.path,
        timestamp: new Date(data.timestamp),
        stats: data.stats ? this.convertStats(data.stats) : undefined,
      });
    });

    try {
      await this.bridge.sendCommand('watch', {
        path,
        id: watchId,
        recursive: options?.recursive,
      });
    } catch (error) {
      // The watch never started: do not leave a dead callback/listener behind.
      this.watchCallbacks.delete(watchId);
      this.bridge.removeAllListeners(eventName);
      throw error;
    }

    return {
      stop: async () => {
        // Idempotent: nothing to do if already stopped or cleared by shutdown().
        if (!this.watchCallbacks.delete(watchId)) return;
        this.bridge.removeAllListeners(eventName);
        if (this.watchCallbacks.size === 0 && this.initialized) {
          try {
            await this.bridge.sendCommand('unwatchAll', {});
          } catch {
            // Best-effort cleanup; stop() must not fail because of it.
          }
        }
      },
    };
  }

  // ── Transaction Operations ────────────────────────────────────────────

  /**
   * Returns copies of `operations` with a `backup` describing the current
   * state of each `path` (content and stats if the file exists).
   * If that state cannot be captured, the copy carries no backup so that
   * rollback leaves the path untouched instead of deleting a file that may
   * actually exist.
   */
  public async prepareTransaction(operations: ITransactionOperation[]): Promise<ITransactionOperation[]> {
    const prepared: ITransactionOperation[] = [];
    for (const op of operations) {
      const preparedOp = { ...op };
      try {
        if (await this.fileExists(op.path)) {
          const content = await this.readFile(op.path, { encoding: 'buffer' });
          const stats = await this.fileStat(op.path);
          preparedOp.backup = {
            existed: true,
            content: Buffer.isBuffer(content) ? content : Buffer.from(content),
            stats,
          };
        } else {
          preparedOp.backup = { existed: false };
        }
      } catch {
        // Unknown prior state: rollbackTransaction() skips operations without a backup.
        delete preparedOp.backup;
      }
      prepared.push(preparedOp);
    }
    return prepared;
  }

  /**
   * Sends all operations to the binary as one `executeTransaction` command.
   * Buffer content is sent as base64 with `encoding: 'base64'`, mirroring
   * writeFile(); string content (including the empty string) is sent as-is.
   */
  public async executeTransaction(operations: ITransactionOperation[]): Promise<void> {
    await this.ensureRunning();
    const opsJson: ITransactionOpJson[] = operations.map((op) => {
      const { content } = op;
      if (Buffer.isBuffer(content)) {
        return {
          type: op.type,
          path: op.path,
          targetPath: op.targetPath,
          content: content.toString('base64'),
          encoding: 'base64',
        };
      }
      return {
        type: op.type,
        path: op.path,
        targetPath: op.targetPath,
        content: content ?? undefined,
        encoding: op.encoding,
      };
    });
    await this.bridge.sendCommand('executeTransaction', { operations: opsJson });
  }

  /**
   * Restores backups in reverse order: rewrites files that existed and
   * deletes files that did not. Operations without a backup are skipped.
   * Rollback is best-effort, so individual failures are ignored.
   */
  public async rollbackTransaction(operations: ITransactionOperation[]): Promise<void> {
    for (const op of [...operations].reverse()) {
      const backup = op.backup;
      if (!backup) continue;
      try {
        if (backup.existed && backup.content) {
          await this.writeFile(op.path, backup.content);
        } else if (!backup.existed) {
          await this.deleteFile(op.path);
        }
      } catch {
        // Ignore rollback errors
      }
    }
  }

  // ── Path Operations ───────────────────────────────────────────────────

  /** Normalizes a path locally; the interface is synchronous, so no IPC is used. */
  public normalizePath(path: string): string {
    return plugins.path.normalize(path);
  }

  /** Joins path segments locally. */
  public joinPath(...segments: string[]): string {
    return plugins.path.join(...segments);
  }

  // ── Helpers ───────────────────────────────────────────────────────────

  /** Converts wire-format stats into `IFileStats` by parsing the timestamps. */
  private convertStats(stats: IFileStatsJson): IFileStats {
    return {
      size: stats.size,
      birthtime: this.parseTimestamp(stats.birthtime),
      mtime: this.parseTimestamp(stats.mtime),
      atime: this.parseTimestamp(stats.atime),
      isFile: stats.isFile,
      isDirectory: stats.isDirectory,
      isSymbolicLink: stats.isSymbolicLink,
      mode: stats.mode,
    };
  }

  /**
   * Parses a timestamp string from the binary.
   * Only strings that are exactly `<unix_secs>.<millis>Z` (digits only) take
   * the numeric path; everything else, including ISO-8601 strings such as
   * `2024-01-01T00:00:00.000Z`, is handed to the `Date` constructor.
   */
  private parseTimestamp(ts: string): Date {
    const match = /^(-?\d+)\.(\d+)Z$/.exec(ts);
    if (match) {
      const secs = parseInt(match[1], 10);
      const millis = parseInt(match[2], 10);
      return new Date(secs * 1000 + millis);
    }
    return new Date(ts);
  }
}