import * as plugins from '../plugins.js';
import { S3Error } from './s3-error.js';
import { Readable } from 'stream';

/** Suffix of the file that holds an object's payload. */
const OBJECT_SUFFIX = '._S3_object';
/** Suffix (appended to the object path) of the JSON metadata sidecar file. */
const METADATA_SUFFIX = '.metadata.json';
/** Suffix (appended to the object path) of the cached MD5 sidecar file. */
const MD5_SUFFIX = '.md5';

/** A bucket as reported by {@link FilesystemStore.listBuckets}. */
export interface IS3Bucket {
  name: string;
  creationDate: Date;
}

/** Description of a stored object; `content` is only set when the object is fetched. */
export interface IS3Object {
  key: string;
  size: number;
  lastModified: Date;
  md5: string;
  metadata: Record<string, string>;
  content?: Readable;
}

/** Options accepted by {@link FilesystemStore.listObjects}. */
export interface IListObjectsOptions {
  prefix?: string;
  delimiter?: string;
  maxKeys?: number;
  continuationToken?: string;
}

/** One page of a listing produced by {@link FilesystemStore.listObjects}. */
export interface IListObjectsResult {
  contents: IS3Object[];
  commonPrefixes: string[];
  isTruncated: boolean;
  nextContinuationToken?: string;
  prefix: string;
  delimiter: string;
  maxKeys: number;
}

/** Byte range of an object; both offsets are zero-based and `end` is inclusive. */
export interface IRangeOptions {
  start: number;
  end: number;
}

/**
 * Filesystem-backed storage for S3 objects using smartfs.
 *
 * On-disk layout: every bucket is a directory directly below `rootDir`; every
 * object is stored as `<bucket>/<encodedKey>._S3_object`, accompanied by the
 * sidecar files `<objectPath>.metadata.json` and `<objectPath>.md5`.
 */
export class FilesystemStore {
  constructor(private rootDir: string) {}

  /**
   * Initialize store (ensure root directory exists).
   */
  public async initialize(): Promise<void> {
    await plugins.smartfs.directory(this.rootDir).recursive().create();
  }

  /**
   * Reset store (delete all buckets) by removing and re-creating the root directory.
   */
  public async reset(): Promise<void> {
    const exists = await plugins.smartfs.directory(this.rootDir).exists();
    if (exists) {
      await plugins.smartfs.directory(this.rootDir).recursive().delete();
    }
    await plugins.smartfs.directory(this.rootDir).recursive().create();
  }

  // ============================
  // BUCKET OPERATIONS
  // ============================

  /**
   * List all buckets.
   *
   * @returns One entry per directory found directly below the root directory,
   *          sorted by name.
   */
  public async listBuckets(): Promise<IS3Bucket[]> {
    const entries = await plugins.smartfs.directory(this.rootDir).includeStats().list();
    const buckets: IS3Bucket[] = [];

    for (const entry of entries) {
      if (entry.isDirectory && entry.stats) {
        buckets.push({
          name: entry.name,
          creationDate: entry.stats.birthtime,
        });
      }
    }

    return buckets.sort((a, b) => a.name.localeCompare(b.name));
  }

  /**
   * Check if bucket exists.
   */
  public async bucketExists(bucket: string): Promise<boolean> {
    const bucketPath = this.getBucketPath(bucket);
    return plugins.smartfs.directory(bucketPath).exists();
  }

  /**
   * Create bucket (creates the bucket directory, including missing parents).
   */
  public async createBucket(bucket: string): Promise<void> {
    const bucketPath = this.getBucketPath(bucket);
    await plugins.smartfs.directory(bucketPath).recursive().create();
  }

  /**
   * Delete bucket (must be empty).
   *
   * @throws S3Error `NoSuchBucket` if the bucket directory does not exist.
   * @throws S3Error `BucketNotEmpty` if the bucket still contains objects.
   */
  public async deleteBucket(bucket: string): Promise<void> {
    const bucketPath = this.getBucketPath(bucket);

    if (!(await this.bucketExists(bucket))) {
      throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
    }

    // Only object payload files make a bucket non-empty. Counting every entry
    // would let empty directories left behind by deleted nested keys block the
    // deletion forever.
    const objectFiles = await plugins.smartfs
      .directory(bucketPath)
      .recursive()
      .filter((entry) => entry.name.endsWith(OBJECT_SUFFIX))
      .list();

    if (objectFiles.length > 0) {
      throw new S3Error('BucketNotEmpty', 'The bucket you tried to delete is not empty');
    }

    await plugins.smartfs.directory(bucketPath).recursive().delete();
  }

  // ============================
  // OBJECT OPERATIONS
  // ============================

  /**
   * List objects in bucket.
   *
   * Keys are filtered by `prefix`, sorted, and resumed strictly after
   * `continuationToken` (which is a plain key name). When `delimiter` is set,
   * keys containing the delimiter after the prefix are rolled up into
   * `commonPrefixes`. `maxKeys` limits the number of entries in `contents`.
   *
   * @throws S3Error `NoSuchBucket` if the bucket does not exist.
   */
  public async listObjects(
    bucket: string,
    options: IListObjectsOptions = {}
  ): Promise<IListObjectsResult> {
    const bucketPath = this.getBucketPath(bucket);

    if (!(await this.bucketExists(bucket))) {
      throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
    }

    const {
      prefix = '',
      delimiter = '',
      maxKeys = 1000,
      continuationToken,
    } = options;

    // List all object files recursively with filter
    const entries = await plugins.smartfs
      .directory(bucketPath)
      .recursive()
      .filter((entry) => entry.name.endsWith(OBJECT_SUFFIX))
      .list();

    // Convert file paths to keys
    let keys = entries.map((entry) => {
      const relativePath = plugins.path.relative(bucketPath, entry.path);
      return this.decodeKey(relativePath.replace(/\._S3_object$/, ''));
    });

    if (prefix) {
      keys = keys.filter((key) => key.startsWith(prefix));
    }

    keys.sort();

    // Resume strictly after the continuation token. If no key sorts after the
    // token the listing is exhausted, so the page must be empty rather than
    // starting over from the first key.
    if (continuationToken) {
      const startIndex = keys.findIndex((key) => key > continuationToken);
      keys = startIndex === -1 ? [] : keys.slice(startIndex);
    }

    const commonPrefixes = new Set<string>();
    const contents: IS3Object[] = [];
    let isTruncated = false;
    // Last key that was fully handled (listed, rolled up, or skipped). Keys
    // sharing a common prefix are contiguous after sorting, so resuming after
    // this key never re-emits a prefix that was already reported.
    let lastProcessedKey: string | undefined;

    for (const key of keys) {
      if (delimiter) {
        // Find first delimiter after prefix
        const remainingKey = key.slice(prefix.length);
        const delimiterIndex = remainingKey.indexOf(delimiter);

        if (delimiterIndex !== -1) {
          commonPrefixes.add(prefix + remainingKey.slice(0, delimiterIndex + delimiter.length));
          lastProcessedKey = key;
          continue;
        }
      }

      // A further object exists but the page is full: the result is truncated.
      if (contents.length >= maxKeys) {
        isTruncated = true;
        break;
      }

      try {
        contents.push(await this.getObjectInfo(bucket, key));
      } catch {
        // Skip if object no longer exists
      }
      lastProcessedKey = key;
    }

    return {
      contents,
      commonPrefixes: Array.from(commonPrefixes).sort(),
      isTruncated,
      nextContinuationToken: isTruncated ? lastProcessedKey : undefined,
      prefix,
      delimiter,
      maxKeys,
    };
  }

  /**
   * Get object info (without content).
   *
   * Reads the payload file's stats, the metadata sidecar and the MD5 (cached
   * or freshly calculated) in parallel.
   */
  private async getObjectInfo(bucket: string, key: string): Promise<IS3Object> {
    const objectPath = this.getObjectPath(bucket, key);
    const metadataPath = `${objectPath}${METADATA_SUFFIX}`;
    const md5Path = `${objectPath}${MD5_SUFFIX}`;

    const [stats, metadata, md5] = await Promise.all([
      plugins.smartfs.file(objectPath).stat(),
      this.readMetadata(metadataPath),
      this.readMD5(objectPath, md5Path),
    ]);

    return {
      key,
      size: stats.size,
      lastModified: stats.mtime,
      md5,
      metadata,
    };
  }

  /**
   * Check if object exists.
   */
  public async objectExists(bucket: string, key: string): Promise<boolean> {
    const objectPath = this.getObjectPath(bucket, key);
    return plugins.smartfs.file(objectPath).exists();
  }

  /**
   * Put object (upload with streaming).
   *
   * @param stream   Source of the object's bytes.
   * @param metadata User metadata persisted as JSON next to the object.
   * @returns Number of bytes written and their hex MD5.
   * @throws S3Error `NoSuchBucket` if the bucket does not exist.
   */
  public async putObject(
    bucket: string,
    key: string,
    stream: NodeJS.ReadableStream,
    metadata: Record<string, string> = {}
  ): Promise<{ size: number; md5: string }> {
    const objectPath = this.getObjectPath(bucket, key);

    if (!(await this.bucketExists(bucket))) {
      throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
    }

    // Keys may contain path separators, so the parent directory may not exist yet.
    const parentDir = plugins.path.dirname(objectPath);
    await plugins.smartfs.directory(parentDir).recursive().create();

    const result = await this.writeStreamWithMD5(stream, objectPath);

    const metadataPath = `${objectPath}${METADATA_SUFFIX}`;
    await plugins.smartfs.file(metadataPath).write(JSON.stringify(metadata, null, 2));

    return result;
  }

  /**
   * Get object (download with streaming).
   *
   * @param range Optional inclusive byte range; when given, `content` only
   *              yields the bytes inside that range.
   * @returns Object info plus a Node.js readable stream in `content`.
   * @throws S3Error `NoSuchKey` if the object does not exist.
   */
  public async getObject(
    bucket: string,
    key: string,
    range?: IRangeOptions
  ): Promise<IS3Object> {
    const objectPath = this.getObjectPath(bucket, key);

    if (!(await this.objectExists(bucket, key))) {
      throw new S3Error('NoSuchKey', 'The specified key does not exist');
    }

    const info = await this.getObjectInfo(bucket, key);

    // smartfs hands out a Web ReadableStream; convert it to a Node.js Readable.
    // The cast mirrors the original code and bridges the differing stream typings.
    const webStream = await plugins.smartfs.file(objectPath).readStream();
    let nodeStream: Readable = Readable.fromWeb(webStream as any);

    if (range) {
      nodeStream = this.createRangeStream(nodeStream, range);
    }

    return {
      ...info,
      content: nodeStream,
    };
  }

  /**
   * Wrap `source` in a stream that only yields the bytes of the inclusive
   * range `[range.start, range.end]`.
   *
   * Implemented with an async generator instead of a piped Transform: errors
   * of the source surface on the returned stream, and returning from the
   * generator once the range is satisfied destroys the source instead of
   * writing into an already ended transform.
   */
  private createRangeStream(source: Readable, range: IRangeOptions): Readable {
    const { start, end } = range;

    async function* sliceRange(): AsyncGenerator<Buffer> {
      let offset = 0;

      for await (const chunk of source) {
        const buffer: Buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
        const chunkStart = offset;
        offset += buffer.length;

        // Chunk lies completely before the range
        if (offset <= start) {
          continue;
        }

        const sliceStart = Math.max(0, start - chunkStart);
        const sliceEnd = Math.min(buffer.length, end - chunkStart + 1);
        if (sliceEnd > sliceStart) {
          yield buffer.subarray(sliceStart, sliceEnd);
        }

        // Range satisfied: stop reading the rest of the file
        if (offset > end) {
          return;
        }
      }
    }

    // Readable.from defaults to object mode; keep a plain byte stream.
    return Readable.from(sliceRange(), { objectMode: false });
  }

  /**
   * Delete object together with its metadata and MD5 sidecar files.
   *
   * Deletion failures are ignored on purpose: S3 doesn't throw an error if
   * the object doesn't exist.
   */
  public async deleteObject(bucket: string, key: string): Promise<void> {
    const objectPath = this.getObjectPath(bucket, key);
    const metadataPath = `${objectPath}${METADATA_SUFFIX}`;
    const md5Path = `${objectPath}${MD5_SUFFIX}`;

    await Promise.all([
      plugins.smartfs.file(objectPath).delete().catch(() => {}),
      plugins.smartfs.file(metadataPath).delete().catch(() => {}),
      plugins.smartfs.file(md5Path).delete().catch(() => {}),
    ]);
  }

  /**
   * Copy object.
   *
   * @param metadataDirective `COPY` keeps the source metadata, `REPLACE`
   *                          stores `newMetadata` (or empty metadata when
   *                          none is given).
   * @returns Size and hex MD5 of the destination object.
   * @throws S3Error `NoSuchKey` if the source object does not exist.
   * @throws S3Error `NoSuchBucket` if the destination bucket does not exist.
   */
  public async copyObject(
    srcBucket: string,
    srcKey: string,
    destBucket: string,
    destKey: string,
    metadataDirective: 'COPY' | 'REPLACE' = 'COPY',
    newMetadata?: Record<string, string>
  ): Promise<{ size: number; md5: string }> {
    const srcObjectPath = this.getObjectPath(srcBucket, srcKey);
    const destObjectPath = this.getObjectPath(destBucket, destKey);

    if (!(await this.objectExists(srcBucket, srcKey))) {
      throw new S3Error('NoSuchKey', 'The specified key does not exist');
    }

    if (!(await this.bucketExists(destBucket))) {
      throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
    }

    // Copying an object onto itself (typically to replace its metadata) must
    // not copy the payload file over itself.
    const sameObject = srcObjectPath === destObjectPath;

    if (!sameObject) {
      const parentDir = plugins.path.dirname(destObjectPath);
      await plugins.smartfs.directory(parentDir).recursive().create();
      await plugins.smartfs.file(srcObjectPath).copy(destObjectPath);
    }

    // Always (re)write the destination sidecars. Silently skipping a failed
    // sidecar copy would leave stale metadata/MD5 of a previously overwritten
    // destination object in place.
    const destMetadataPath = `${destObjectPath}${METADATA_SUFFIX}`;
    if (metadataDirective === 'COPY') {
      if (!sameObject) {
        const srcMetadata = await this.readMetadata(`${srcObjectPath}${METADATA_SUFFIX}`);
        await plugins.smartfs.file(destMetadataPath).write(JSON.stringify(srcMetadata, null, 2));
      }
    } else {
      await plugins.smartfs
        .file(destMetadataPath)
        .write(JSON.stringify(newMetadata ?? {}, null, 2));
    }

    // The payload is identical, so the source MD5 (calculated if not cached) is
    // also the destination MD5.
    const md5 = await this.readMD5(srcObjectPath, `${srcObjectPath}${MD5_SUFFIX}`);
    if (!sameObject) {
      await plugins.smartfs.file(`${destObjectPath}${MD5_SUFFIX}`).write(md5);
    }

    const stats = await plugins.smartfs.file(destObjectPath).stat();
    return { size: stats.size, md5 };
  }

  // ============================
  // HELPER METHODS
  // ============================

  /**
   * Get bucket directory path.
   *
   * NOTE(review): `bucket` is joined into the path unvalidated; a name such as
   * `..` would resolve outside `rootDir`. Confirm that callers validate bucket
   * names before they reach the store.
   */
  private getBucketPath(bucket: string): string {
    return plugins.path.join(this.rootDir, bucket);
  }

  /**
   * Get object file path.
   *
   * NOTE(review): `key` is joined into the path unvalidated, so `..` segments
   * are normalized by `path.join` and may point outside the bucket directory.
   * Confirm that callers sanitize keys.
   */
  private getObjectPath(bucket: string, key: string): string {
    return plugins.path.join(
      this.rootDir,
      bucket,
      this.encodeKey(key) + OBJECT_SUFFIX
    );
  }

  /**
   * Encode key for Windows compatibility.
   *
   * On win32 every character that is invalid in a file name is replaced by
   * `&` followed by its UTF-8 bytes in hex; on other platforms the key is
   * returned unchanged.
   */
  private encodeKey(key: string): string {
    if (process.platform === 'win32') {
      return key.replace(/[<>:"\\|?*]/g, (ch) =>
        '&' + Buffer.from(ch, 'utf8').toString('hex')
      );
    }
    return key;
  }

  /**
   * Decode key from filesystem path (inverse of {@link encodeKey}).
   */
  private decodeKey(encodedKey: string): string {
    if (process.platform === 'win32') {
      // Backslashes inside keys are hex-encoded by encodeKey, so any literal
      // backslash in a relative path is a directory separator and maps back to
      // the `/` of the original key.
      return encodedKey
        .replace(/\\/g, '/')
        .replace(/&([0-9a-f]{2})/gi, (_, hex) =>
          Buffer.from(hex, 'hex').toString('utf8')
        );
    }
    return encodedKey;
  }

  /**
   * Write stream to file with MD5 calculation.
   *
   * Chunks are written one at a time (each write is awaited before the next
   * chunk is read), and the hex MD5 is stored in `<destPath>.md5` afterwards.
   *
   * @returns Number of bytes written and their hex MD5.
   */
  private async writeStreamWithMD5(
    input: NodeJS.ReadableStream,
    destPath: string
  ): Promise<{ size: number; md5: string }> {
    const hash = plugins.crypto.createHash('md5');
    let totalSize = 0;

    // Get Web WritableStream from smartfs
    const webWriteStream = await plugins.smartfs.file(destPath).writeStream();
    const writer = webWriteStream.getWriter();

    try {
      for await (const chunk of input) {
        // Normalize string chunks so the size is counted in bytes, not characters
        const buffer = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
        hash.update(buffer);
        totalSize += buffer.length;
        await writer.write(new Uint8Array(buffer));
      }
      await writer.close();
    } catch (err) {
      // Release the destination; the original failure is what gets reported.
      try {
        await writer.abort(err);
      } catch {
        // ignore secondary abort failure
      }
      throw err;
    }

    const md5 = hash.digest('hex');
    await plugins.smartfs.file(`${destPath}${MD5_SUFFIX}`).write(md5);

    return { size: totalSize, md5 };
  }

  /**
   * Read MD5 hash (calculate if missing).
   *
   * Returns the trimmed content of `md5Path` when it can be read and is not
   * empty. Otherwise the hash is calculated from the object file and written
   * back to `md5Path` on a best-effort basis.
   */
  private async readMD5(objectPath: string, md5Path: string): Promise<string> {
    try {
      const cached = (await plugins.smartfs.file(md5Path).encoding('utf8').read()) as string;
      const trimmed = cached.trim();
      if (trimmed) {
        return trimmed;
      }
    } catch {
      // No usable cache: fall through and calculate the hash
    }

    const hash = plugins.crypto.createHash('md5');
    const webStream = await plugins.smartfs.file(objectPath).readStream();
    for await (const chunk of Readable.fromWeb(webStream as any)) {
      hash.update(chunk as Buffer);
    }
    const md5 = hash.digest('hex');

    try {
      await plugins.smartfs.file(md5Path).write(md5);
    } catch {
      // Caching is an optimization only; the calculated hash is still valid.
    }

    return md5;
  }

  /**
   * Read metadata from JSON file.
   *
   * @returns The parsed metadata, or an empty object when the file cannot be
   *          read or parsed.
   */
  private async readMetadata(metadataPath: string): Promise<Record<string, string>> {
    try {
      const content = (await plugins.smartfs.file(metadataPath).encoding('utf8').read()) as string;
      return JSON.parse(content);
    } catch {
      return {};
    }
  }
}