fix(filesystem): Migrate filesystem implementation to @push.rocks/smartfs and add Web Streams handling
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smarts3',
|
||||
version: '3.0.2',
|
||||
version: '3.0.3',
|
||||
description: 'A Node.js TypeScript package to create a local S3 endpoint for simulating AWS S3 operations using mapped local directories for development and testing purposes.'
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import * as plugins from '../plugins.js';
|
||||
import { S3Error } from './s3-error.js';
|
||||
import type { Readable } from 'stream';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
export interface IS3Bucket {
|
||||
name: string;
|
||||
@@ -39,7 +39,7 @@ export interface IRangeOptions {
|
||||
}
|
||||
|
||||
/**
|
||||
* Filesystem-backed storage for S3 objects
|
||||
* Filesystem-backed storage for S3 objects using smartfs
|
||||
*/
|
||||
export class FilesystemStore {
|
||||
constructor(private rootDir: string) {}
|
||||
@@ -48,14 +48,19 @@ export class FilesystemStore {
|
||||
* Initialize store (ensure root directory exists)
|
||||
*/
|
||||
public async initialize(): Promise<void> {
|
||||
await plugins.fs.promises.mkdir(this.rootDir, { recursive: true });
|
||||
await plugins.smartfs.directory(this.rootDir).recursive().create();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset store (delete all buckets)
|
||||
*/
|
||||
public async reset(): Promise<void> {
|
||||
await plugins.smartfile.fs.ensureEmptyDir(this.rootDir);
|
||||
// Delete directory and recreate it
|
||||
const exists = await plugins.smartfs.directory(this.rootDir).exists();
|
||||
if (exists) {
|
||||
await plugins.smartfs.directory(this.rootDir).recursive().delete();
|
||||
}
|
||||
await plugins.smartfs.directory(this.rootDir).recursive().create();
|
||||
}
|
||||
|
||||
// ============================
|
||||
@@ -66,17 +71,16 @@ export class FilesystemStore {
|
||||
* List all buckets
|
||||
*/
|
||||
public async listBuckets(): Promise<IS3Bucket[]> {
|
||||
const dirs = await plugins.smartfile.fs.listFolders(this.rootDir);
|
||||
const entries = await plugins.smartfs.directory(this.rootDir).includeStats().list();
|
||||
const buckets: IS3Bucket[] = [];
|
||||
|
||||
for (const dir of dirs) {
|
||||
const bucketPath = plugins.path.join(this.rootDir, dir);
|
||||
const stats = await plugins.smartfile.fs.stat(bucketPath);
|
||||
|
||||
buckets.push({
|
||||
name: dir,
|
||||
creationDate: stats.birthtime,
|
||||
});
|
||||
for (const entry of entries) {
|
||||
if (entry.isDirectory && entry.stats) {
|
||||
buckets.push({
|
||||
name: entry.name,
|
||||
creationDate: entry.stats.birthtime,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return buckets.sort((a, b) => a.name.localeCompare(b.name));
|
||||
@@ -87,7 +91,7 @@ export class FilesystemStore {
|
||||
*/
|
||||
public async bucketExists(bucket: string): Promise<boolean> {
|
||||
const bucketPath = this.getBucketPath(bucket);
|
||||
return plugins.smartfile.fs.isDirectory(bucketPath);
|
||||
return plugins.smartfs.directory(bucketPath).exists();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -95,7 +99,7 @@ export class FilesystemStore {
|
||||
*/
|
||||
public async createBucket(bucket: string): Promise<void> {
|
||||
const bucketPath = this.getBucketPath(bucket);
|
||||
await plugins.fs.promises.mkdir(bucketPath, { recursive: true });
|
||||
await plugins.smartfs.directory(bucketPath).recursive().create();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -110,12 +114,12 @@ export class FilesystemStore {
|
||||
}
|
||||
|
||||
// Check if bucket is empty
|
||||
const files = await plugins.smartfile.fs.listFileTree(bucketPath, '**/*');
|
||||
const files = await plugins.smartfs.directory(bucketPath).recursive().list();
|
||||
if (files.length > 0) {
|
||||
throw new S3Error('BucketNotEmpty', 'The bucket you tried to delete is not empty');
|
||||
}
|
||||
|
||||
await plugins.smartfile.fs.remove(bucketPath);
|
||||
await plugins.smartfs.directory(bucketPath).recursive().delete();
|
||||
}
|
||||
|
||||
// ============================
|
||||
@@ -142,13 +146,16 @@ export class FilesystemStore {
|
||||
continuationToken,
|
||||
} = options;
|
||||
|
||||
// List all object files
|
||||
const objectPattern = '**/*._S3_object';
|
||||
const objectFiles = await plugins.smartfile.fs.listFileTree(bucketPath, objectPattern);
|
||||
// List all object files recursively with filter
|
||||
const entries = await plugins.smartfs
|
||||
.directory(bucketPath)
|
||||
.recursive()
|
||||
.filter((entry) => entry.name.endsWith('._S3_object'))
|
||||
.list();
|
||||
|
||||
// Convert file paths to keys
|
||||
let keys = objectFiles.map((filePath) => {
|
||||
const relativePath = plugins.path.relative(bucketPath, filePath);
|
||||
let keys = entries.map((entry) => {
|
||||
const relativePath = plugins.path.relative(bucketPath, entry.path);
|
||||
const key = this.decodeKey(relativePath.replace(/\._S3_object$/, ''));
|
||||
return key;
|
||||
});
|
||||
@@ -226,7 +233,7 @@ export class FilesystemStore {
|
||||
const md5Path = `${objectPath}.md5`;
|
||||
|
||||
const [stats, metadata, md5] = await Promise.all([
|
||||
plugins.smartfile.fs.stat(objectPath),
|
||||
plugins.smartfs.file(objectPath).stat(),
|
||||
this.readMetadata(metadataPath),
|
||||
this.readMD5(objectPath, md5Path),
|
||||
]);
|
||||
@@ -245,7 +252,7 @@ export class FilesystemStore {
|
||||
*/
|
||||
public async objectExists(bucket: string, key: string): Promise<boolean> {
|
||||
const objectPath = this.getObjectPath(bucket, key);
|
||||
return plugins.smartfile.fs.fileExists(objectPath);
|
||||
return plugins.smartfs.file(objectPath).exists();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -265,14 +272,15 @@ export class FilesystemStore {
|
||||
}
|
||||
|
||||
// Ensure parent directory exists
|
||||
await plugins.fs.promises.mkdir(plugins.path.dirname(objectPath), { recursive: true });
|
||||
const parentDir = plugins.path.dirname(objectPath);
|
||||
await plugins.smartfs.directory(parentDir).recursive().create();
|
||||
|
||||
// Write with MD5 calculation
|
||||
const result = await this.writeStreamWithMD5(stream, objectPath);
|
||||
|
||||
// Save metadata
|
||||
const metadataPath = `${objectPath}.metadata.json`;
|
||||
await plugins.fs.promises.writeFile(metadataPath, JSON.stringify(metadata, null, 2));
|
||||
await plugins.smartfs.file(metadataPath).write(JSON.stringify(metadata, null, 2));
|
||||
|
||||
return result;
|
||||
}
|
||||
@@ -293,14 +301,50 @@ export class FilesystemStore {
|
||||
|
||||
const info = await this.getObjectInfo(bucket, key);
|
||||
|
||||
// Create read stream with optional range (using native fs for range support)
|
||||
const stream = range
|
||||
? plugins.fs.createReadStream(objectPath, { start: range.start, end: range.end })
|
||||
: plugins.fs.createReadStream(objectPath);
|
||||
// Get Web ReadableStream from smartfs
|
||||
const webStream = await plugins.smartfs.file(objectPath).readStream();
|
||||
|
||||
// Convert Web Stream to Node.js Readable stream
|
||||
let nodeStream = Readable.fromWeb(webStream as any);
|
||||
|
||||
// Handle range requests if needed
|
||||
if (range) {
|
||||
// For range requests, we need to skip bytes and limit output
|
||||
let bytesRead = 0;
|
||||
const rangeStart = range.start;
|
||||
const rangeEnd = range.end;
|
||||
|
||||
nodeStream = nodeStream.pipe(new (require('stream').Transform)({
|
||||
transform(chunk: Buffer, encoding, callback) {
|
||||
const chunkStart = bytesRead;
|
||||
const chunkEnd = bytesRead + chunk.length - 1;
|
||||
bytesRead += chunk.length;
|
||||
|
||||
// Skip chunks before range
|
||||
if (chunkEnd < rangeStart) {
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
|
||||
// Stop after range
|
||||
if (chunkStart > rangeEnd) {
|
||||
this.end();
|
||||
callback();
|
||||
return;
|
||||
}
|
||||
|
||||
// Slice chunk to fit range
|
||||
const sliceStart = Math.max(0, rangeStart - chunkStart);
|
||||
const sliceEnd = Math.min(chunk.length, rangeEnd - chunkStart + 1);
|
||||
|
||||
callback(null, chunk.slice(sliceStart, sliceEnd));
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
return {
|
||||
...info,
|
||||
content: stream,
|
||||
content: nodeStream,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -314,9 +358,9 @@ export class FilesystemStore {
|
||||
|
||||
// S3 doesn't throw error if object doesn't exist
|
||||
await Promise.all([
|
||||
plugins.smartfile.fs.remove(objectPath).catch(() => {}),
|
||||
plugins.smartfile.fs.remove(metadataPath).catch(() => {}),
|
||||
plugins.smartfile.fs.remove(md5Path).catch(() => {}),
|
||||
plugins.smartfs.file(objectPath).delete().catch(() => {}),
|
||||
plugins.smartfs.file(metadataPath).delete().catch(() => {}),
|
||||
plugins.smartfs.file(md5Path).delete().catch(() => {}),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -345,30 +389,31 @@ export class FilesystemStore {
|
||||
}
|
||||
|
||||
// Ensure parent directory exists
|
||||
await plugins.fs.promises.mkdir(plugins.path.dirname(destObjectPath), { recursive: true });
|
||||
const parentDir = plugins.path.dirname(destObjectPath);
|
||||
await plugins.smartfs.directory(parentDir).recursive().create();
|
||||
|
||||
// Copy object file
|
||||
await plugins.smartfile.fs.copy(srcObjectPath, destObjectPath);
|
||||
await plugins.smartfs.file(srcObjectPath).copy(destObjectPath);
|
||||
|
||||
// Handle metadata
|
||||
if (metadataDirective === 'COPY') {
|
||||
// Copy metadata
|
||||
const srcMetadataPath = `${srcObjectPath}.metadata.json`;
|
||||
const destMetadataPath = `${destObjectPath}.metadata.json`;
|
||||
await plugins.smartfile.fs.copy(srcMetadataPath, destMetadataPath).catch(() => {});
|
||||
await plugins.smartfs.file(srcMetadataPath).copy(destMetadataPath).catch(() => {});
|
||||
} else if (newMetadata) {
|
||||
// Replace with new metadata
|
||||
const destMetadataPath = `${destObjectPath}.metadata.json`;
|
||||
await plugins.fs.promises.writeFile(destMetadataPath, JSON.stringify(newMetadata, null, 2));
|
||||
await plugins.smartfs.file(destMetadataPath).write(JSON.stringify(newMetadata, null, 2));
|
||||
}
|
||||
|
||||
// Copy MD5
|
||||
const srcMD5Path = `${srcObjectPath}.md5`;
|
||||
const destMD5Path = `${destObjectPath}.md5`;
|
||||
await plugins.smartfile.fs.copy(srcMD5Path, destMD5Path).catch(() => {});
|
||||
await plugins.smartfs.file(srcMD5Path).copy(destMD5Path).catch(() => {});
|
||||
|
||||
// Get result info
|
||||
const stats = await plugins.smartfile.fs.stat(destObjectPath);
|
||||
const stats = await plugins.smartfs.file(destObjectPath).stat();
|
||||
const md5 = await this.readMD5(destObjectPath, destMD5Path);
|
||||
|
||||
return { size: stats.size, md5 };
|
||||
@@ -432,25 +477,41 @@ export class FilesystemStore {
|
||||
const hash = plugins.crypto.createHash('md5');
|
||||
let totalSize = 0;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const output = plugins.fs.createWriteStream(destPath);
|
||||
return new Promise(async (resolve, reject) => {
|
||||
// Get Web WritableStream from smartfs
|
||||
const webWriteStream = await plugins.smartfs.file(destPath).writeStream();
|
||||
const writer = webWriteStream.getWriter();
|
||||
|
||||
input.on('data', (chunk: Buffer) => {
|
||||
// Read from Node.js stream and write to Web stream
|
||||
input.on('data', async (chunk: Buffer) => {
|
||||
hash.update(chunk);
|
||||
totalSize += chunk.length;
|
||||
|
||||
try {
|
||||
await writer.write(new Uint8Array(chunk));
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
input.on('error', reject);
|
||||
output.on('error', reject);
|
||||
input.on('error', (err) => {
|
||||
writer.abort(err);
|
||||
reject(err);
|
||||
});
|
||||
|
||||
input.pipe(output).on('finish', async () => {
|
||||
const md5 = hash.digest('hex');
|
||||
input.on('end', async () => {
|
||||
try {
|
||||
await writer.close();
|
||||
const md5 = hash.digest('hex');
|
||||
|
||||
// Save MD5 to separate file
|
||||
const md5Path = `${destPath}.md5`;
|
||||
await plugins.fs.promises.writeFile(md5Path, md5);
|
||||
// Save MD5 to separate file
|
||||
const md5Path = `${destPath}.md5`;
|
||||
await plugins.smartfs.file(md5Path).write(md5);
|
||||
|
||||
resolve({ size: totalSize, md5 });
|
||||
resolve({ size: totalSize, md5 });
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -461,22 +522,28 @@ export class FilesystemStore {
|
||||
private async readMD5(objectPath: string, md5Path: string): Promise<string> {
|
||||
try {
|
||||
// Try to read cached MD5
|
||||
const md5 = await plugins.smartfile.fs.toStringSync(md5Path);
|
||||
const md5 = await plugins.smartfs.file(md5Path).encoding('utf8').read() as string;
|
||||
return md5.trim();
|
||||
} catch (err) {
|
||||
// Calculate MD5 if not cached
|
||||
return new Promise((resolve, reject) => {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
const hash = plugins.crypto.createHash('md5');
|
||||
const stream = plugins.fs.createReadStream(objectPath);
|
||||
|
||||
stream.on('data', (chunk: Buffer) => hash.update(chunk));
|
||||
stream.on('end', async () => {
|
||||
const md5 = hash.digest('hex');
|
||||
// Cache it
|
||||
await plugins.fs.promises.writeFile(md5Path, md5);
|
||||
resolve(md5);
|
||||
});
|
||||
stream.on('error', reject);
|
||||
try {
|
||||
const webStream = await plugins.smartfs.file(objectPath).readStream();
|
||||
const nodeStream = Readable.fromWeb(webStream as any);
|
||||
|
||||
nodeStream.on('data', (chunk: Buffer) => hash.update(chunk));
|
||||
nodeStream.on('end', async () => {
|
||||
const md5 = hash.digest('hex');
|
||||
// Cache it
|
||||
await plugins.smartfs.file(md5Path).write(md5);
|
||||
resolve(md5);
|
||||
});
|
||||
nodeStream.on('error', reject);
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -486,7 +553,7 @@ export class FilesystemStore {
|
||||
*/
|
||||
private async readMetadata(metadataPath: string): Promise<Record<string, string>> {
|
||||
try {
|
||||
const content = await plugins.smartfile.fs.toStringSync(metadataPath);
|
||||
const content = await plugins.smartfs.file(metadataPath).encoding('utf8').read() as string;
|
||||
return JSON.parse(content);
|
||||
} catch (err) {
|
||||
return {};
|
||||
|
||||
@@ -3,17 +3,19 @@ import * as path from 'path';
|
||||
import * as http from 'http';
|
||||
import * as crypto from 'crypto';
|
||||
import * as url from 'url';
|
||||
import * as fs from 'fs';
|
||||
|
||||
export { path, http, crypto, url, fs };
|
||||
export { path, http, crypto, url };
|
||||
|
||||
// @push.rocks scope
|
||||
import * as smartbucket from '@push.rocks/smartbucket';
|
||||
import * as smartfile from '@push.rocks/smartfile';
|
||||
import { SmartFs, SmartFsProviderNode } from '@push.rocks/smartfs';
|
||||
import * as smartpath from '@push.rocks/smartpath';
|
||||
import { SmartXml } from '@push.rocks/smartxml';
|
||||
|
||||
export { smartbucket, smartfile, smartpath, SmartXml };
|
||||
// Create SmartFs instance with Node.js provider
|
||||
export const smartfs = new SmartFs(new SmartFsProviderNode());
|
||||
|
||||
export { smartbucket, smartpath, SmartXml };
|
||||
|
||||
// @tsclass scope
|
||||
import * as tsclass from '@tsclass/tsclass';
|
||||
|
||||
Reference in New Issue
Block a user