496 lines
13 KiB
TypeScript
496 lines
13 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import { S3Error } from './s3-error.js';
|
|
import type { Readable } from 'stream';
|
|
|
|
export interface IS3Bucket {
|
|
name: string;
|
|
creationDate: Date;
|
|
}
|
|
|
|
export interface IS3Object {
|
|
key: string;
|
|
size: number;
|
|
lastModified: Date;
|
|
md5: string;
|
|
metadata: Record<string, string>;
|
|
content?: Readable;
|
|
}
|
|
|
|
export interface IListObjectsOptions {
|
|
prefix?: string;
|
|
delimiter?: string;
|
|
maxKeys?: number;
|
|
continuationToken?: string;
|
|
}
|
|
|
|
export interface IListObjectsResult {
|
|
contents: IS3Object[];
|
|
commonPrefixes: string[];
|
|
isTruncated: boolean;
|
|
nextContinuationToken?: string;
|
|
prefix: string;
|
|
delimiter: string;
|
|
maxKeys: number;
|
|
}
|
|
|
|
export interface IRangeOptions {
|
|
start: number;
|
|
end: number;
|
|
}
|
|
|
|
/**
|
|
* Filesystem-backed storage for S3 objects
|
|
*/
|
|
export class FilesystemStore {
|
|
constructor(private rootDir: string) {}
|
|
|
|
/**
|
|
* Initialize store (ensure root directory exists)
|
|
*/
|
|
public async initialize(): Promise<void> {
|
|
await plugins.fs.promises.mkdir(this.rootDir, { recursive: true });
|
|
}
|
|
|
|
/**
|
|
* Reset store (delete all buckets)
|
|
*/
|
|
public async reset(): Promise<void> {
|
|
await plugins.smartfile.fs.ensureEmptyDir(this.rootDir);
|
|
}
|
|
|
|
// ============================
|
|
// BUCKET OPERATIONS
|
|
// ============================
|
|
|
|
/**
|
|
* List all buckets
|
|
*/
|
|
public async listBuckets(): Promise<IS3Bucket[]> {
|
|
const dirs = await plugins.smartfile.fs.listFolders(this.rootDir);
|
|
const buckets: IS3Bucket[] = [];
|
|
|
|
for (const dir of dirs) {
|
|
const bucketPath = plugins.path.join(this.rootDir, dir);
|
|
const stats = await plugins.smartfile.fs.stat(bucketPath);
|
|
|
|
buckets.push({
|
|
name: dir,
|
|
creationDate: stats.birthtime,
|
|
});
|
|
}
|
|
|
|
return buckets.sort((a, b) => a.name.localeCompare(b.name));
|
|
}
|
|
|
|
/**
|
|
* Check if bucket exists
|
|
*/
|
|
public async bucketExists(bucket: string): Promise<boolean> {
|
|
const bucketPath = this.getBucketPath(bucket);
|
|
return plugins.smartfile.fs.isDirectory(bucketPath);
|
|
}
|
|
|
|
/**
|
|
* Create bucket
|
|
*/
|
|
public async createBucket(bucket: string): Promise<void> {
|
|
const bucketPath = this.getBucketPath(bucket);
|
|
await plugins.fs.promises.mkdir(bucketPath, { recursive: true });
|
|
}
|
|
|
|
/**
|
|
* Delete bucket (must be empty)
|
|
*/
|
|
public async deleteBucket(bucket: string): Promise<void> {
|
|
const bucketPath = this.getBucketPath(bucket);
|
|
|
|
// Check if bucket exists
|
|
if (!(await this.bucketExists(bucket))) {
|
|
throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
|
|
}
|
|
|
|
// Check if bucket is empty
|
|
const files = await plugins.smartfile.fs.listFileTree(bucketPath, '**/*');
|
|
if (files.length > 0) {
|
|
throw new S3Error('BucketNotEmpty', 'The bucket you tried to delete is not empty');
|
|
}
|
|
|
|
await plugins.smartfile.fs.remove(bucketPath);
|
|
}
|
|
|
|
// ============================
|
|
// OBJECT OPERATIONS
|
|
// ============================
|
|
|
|
/**
|
|
* List objects in bucket
|
|
*/
|
|
public async listObjects(
|
|
bucket: string,
|
|
options: IListObjectsOptions = {}
|
|
): Promise<IListObjectsResult> {
|
|
const bucketPath = this.getBucketPath(bucket);
|
|
|
|
if (!(await this.bucketExists(bucket))) {
|
|
throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
|
|
}
|
|
|
|
const {
|
|
prefix = '',
|
|
delimiter = '',
|
|
maxKeys = 1000,
|
|
continuationToken,
|
|
} = options;
|
|
|
|
// List all object files
|
|
const objectPattern = '**/*._S3_object';
|
|
const objectFiles = await plugins.smartfile.fs.listFileTree(bucketPath, objectPattern);
|
|
|
|
// Convert file paths to keys
|
|
let keys = objectFiles.map((filePath) => {
|
|
const relativePath = plugins.path.relative(bucketPath, filePath);
|
|
const key = this.decodeKey(relativePath.replace(/\._S3_object$/, ''));
|
|
return key;
|
|
});
|
|
|
|
// Apply prefix filter
|
|
if (prefix) {
|
|
keys = keys.filter((key) => key.startsWith(prefix));
|
|
}
|
|
|
|
// Sort keys
|
|
keys = keys.sort();
|
|
|
|
// Handle continuation token (simple implementation using key name)
|
|
if (continuationToken) {
|
|
const startIndex = keys.findIndex((key) => key > continuationToken);
|
|
if (startIndex > 0) {
|
|
keys = keys.slice(startIndex);
|
|
}
|
|
}
|
|
|
|
// Handle delimiter (common prefixes)
|
|
const commonPrefixes: Set<string> = new Set();
|
|
const contents: IS3Object[] = [];
|
|
|
|
for (const key of keys) {
|
|
if (delimiter) {
|
|
// Find first delimiter after prefix
|
|
const remainingKey = key.slice(prefix.length);
|
|
const delimiterIndex = remainingKey.indexOf(delimiter);
|
|
|
|
if (delimiterIndex !== -1) {
|
|
// This key has a delimiter, add to common prefixes
|
|
const commonPrefix = prefix + remainingKey.slice(0, delimiterIndex + delimiter.length);
|
|
commonPrefixes.add(commonPrefix);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Add to contents (limited by maxKeys)
|
|
if (contents.length >= maxKeys) {
|
|
break;
|
|
}
|
|
|
|
try {
|
|
const objectInfo = await this.getObjectInfo(bucket, key);
|
|
contents.push(objectInfo);
|
|
} catch (err) {
|
|
// Skip if object no longer exists
|
|
continue;
|
|
}
|
|
}
|
|
|
|
const isTruncated = keys.length > contents.length + commonPrefixes.size;
|
|
const nextContinuationToken = isTruncated
|
|
? contents[contents.length - 1]?.key
|
|
: undefined;
|
|
|
|
return {
|
|
contents,
|
|
commonPrefixes: Array.from(commonPrefixes).sort(),
|
|
isTruncated,
|
|
nextContinuationToken,
|
|
prefix,
|
|
delimiter,
|
|
maxKeys,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Get object info (without content)
|
|
*/
|
|
private async getObjectInfo(bucket: string, key: string): Promise<IS3Object> {
|
|
const objectPath = this.getObjectPath(bucket, key);
|
|
const metadataPath = `${objectPath}.metadata.json`;
|
|
const md5Path = `${objectPath}.md5`;
|
|
|
|
const [stats, metadata, md5] = await Promise.all([
|
|
plugins.smartfile.fs.stat(objectPath),
|
|
this.readMetadata(metadataPath),
|
|
this.readMD5(objectPath, md5Path),
|
|
]);
|
|
|
|
return {
|
|
key,
|
|
size: stats.size,
|
|
lastModified: stats.mtime,
|
|
md5,
|
|
metadata,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Check if object exists
|
|
*/
|
|
public async objectExists(bucket: string, key: string): Promise<boolean> {
|
|
const objectPath = this.getObjectPath(bucket, key);
|
|
return plugins.smartfile.fs.fileExists(objectPath);
|
|
}
|
|
|
|
/**
|
|
* Put object (upload with streaming)
|
|
*/
|
|
public async putObject(
|
|
bucket: string,
|
|
key: string,
|
|
stream: NodeJS.ReadableStream,
|
|
metadata: Record<string, string> = {}
|
|
): Promise<{ size: number; md5: string }> {
|
|
const objectPath = this.getObjectPath(bucket, key);
|
|
|
|
// Ensure bucket exists
|
|
if (!(await this.bucketExists(bucket))) {
|
|
throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
|
|
}
|
|
|
|
// Ensure parent directory exists
|
|
await plugins.fs.promises.mkdir(plugins.path.dirname(objectPath), { recursive: true });
|
|
|
|
// Write with MD5 calculation
|
|
const result = await this.writeStreamWithMD5(stream, objectPath);
|
|
|
|
// Save metadata
|
|
const metadataPath = `${objectPath}.metadata.json`;
|
|
await plugins.fs.promises.writeFile(metadataPath, JSON.stringify(metadata, null, 2));
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Get object (download with streaming)
|
|
*/
|
|
public async getObject(
|
|
bucket: string,
|
|
key: string,
|
|
range?: IRangeOptions
|
|
): Promise<IS3Object> {
|
|
const objectPath = this.getObjectPath(bucket, key);
|
|
|
|
if (!(await this.objectExists(bucket, key))) {
|
|
throw new S3Error('NoSuchKey', 'The specified key does not exist');
|
|
}
|
|
|
|
const info = await this.getObjectInfo(bucket, key);
|
|
|
|
// Create read stream with optional range (using native fs for range support)
|
|
const stream = range
|
|
? plugins.fs.createReadStream(objectPath, { start: range.start, end: range.end })
|
|
: plugins.fs.createReadStream(objectPath);
|
|
|
|
return {
|
|
...info,
|
|
content: stream,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Delete object
|
|
*/
|
|
public async deleteObject(bucket: string, key: string): Promise<void> {
|
|
const objectPath = this.getObjectPath(bucket, key);
|
|
const metadataPath = `${objectPath}.metadata.json`;
|
|
const md5Path = `${objectPath}.md5`;
|
|
|
|
// S3 doesn't throw error if object doesn't exist
|
|
await Promise.all([
|
|
plugins.smartfile.fs.remove(objectPath).catch(() => {}),
|
|
plugins.smartfile.fs.remove(metadataPath).catch(() => {}),
|
|
plugins.smartfile.fs.remove(md5Path).catch(() => {}),
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Copy object
|
|
*/
|
|
public async copyObject(
|
|
srcBucket: string,
|
|
srcKey: string,
|
|
destBucket: string,
|
|
destKey: string,
|
|
metadataDirective: 'COPY' | 'REPLACE' = 'COPY',
|
|
newMetadata?: Record<string, string>
|
|
): Promise<{ size: number; md5: string }> {
|
|
const srcObjectPath = this.getObjectPath(srcBucket, srcKey);
|
|
const destObjectPath = this.getObjectPath(destBucket, destKey);
|
|
|
|
// Check source exists
|
|
if (!(await this.objectExists(srcBucket, srcKey))) {
|
|
throw new S3Error('NoSuchKey', 'The specified key does not exist');
|
|
}
|
|
|
|
// Ensure dest bucket exists
|
|
if (!(await this.bucketExists(destBucket))) {
|
|
throw new S3Error('NoSuchBucket', 'The specified bucket does not exist');
|
|
}
|
|
|
|
// Ensure parent directory exists
|
|
await plugins.fs.promises.mkdir(plugins.path.dirname(destObjectPath), { recursive: true });
|
|
|
|
// Copy object file
|
|
await plugins.smartfile.fs.copy(srcObjectPath, destObjectPath);
|
|
|
|
// Handle metadata
|
|
if (metadataDirective === 'COPY') {
|
|
// Copy metadata
|
|
const srcMetadataPath = `${srcObjectPath}.metadata.json`;
|
|
const destMetadataPath = `${destObjectPath}.metadata.json`;
|
|
await plugins.smartfile.fs.copy(srcMetadataPath, destMetadataPath).catch(() => {});
|
|
} else if (newMetadata) {
|
|
// Replace with new metadata
|
|
const destMetadataPath = `${destObjectPath}.metadata.json`;
|
|
await plugins.fs.promises.writeFile(destMetadataPath, JSON.stringify(newMetadata, null, 2));
|
|
}
|
|
|
|
// Copy MD5
|
|
const srcMD5Path = `${srcObjectPath}.md5`;
|
|
const destMD5Path = `${destObjectPath}.md5`;
|
|
await plugins.smartfile.fs.copy(srcMD5Path, destMD5Path).catch(() => {});
|
|
|
|
// Get result info
|
|
const stats = await plugins.smartfile.fs.stat(destObjectPath);
|
|
const md5 = await this.readMD5(destObjectPath, destMD5Path);
|
|
|
|
return { size: stats.size, md5 };
|
|
}
|
|
|
|
// ============================
|
|
// HELPER METHODS
|
|
// ============================
|
|
|
|
/**
|
|
* Get bucket directory path
|
|
*/
|
|
private getBucketPath(bucket: string): string {
|
|
return plugins.path.join(this.rootDir, bucket);
|
|
}
|
|
|
|
/**
|
|
* Get object file path
|
|
*/
|
|
private getObjectPath(bucket: string, key: string): string {
|
|
return plugins.path.join(
|
|
this.rootDir,
|
|
bucket,
|
|
this.encodeKey(key) + '._S3_object'
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Encode key for Windows compatibility
|
|
*/
|
|
private encodeKey(key: string): string {
|
|
if (process.platform === 'win32') {
|
|
// Replace invalid Windows filename chars with hex encoding
|
|
return key.replace(/[<>:"\\|?*]/g, (ch) =>
|
|
'&' + Buffer.from(ch, 'utf8').toString('hex')
|
|
);
|
|
}
|
|
return key;
|
|
}
|
|
|
|
/**
|
|
* Decode key from filesystem path
|
|
*/
|
|
private decodeKey(encodedKey: string): string {
|
|
if (process.platform === 'win32') {
|
|
// Decode hex-encoded chars
|
|
return encodedKey.replace(/&([0-9a-f]{2})/gi, (_, hex) =>
|
|
Buffer.from(hex, 'hex').toString('utf8')
|
|
);
|
|
}
|
|
return encodedKey;
|
|
}
|
|
|
|
/**
|
|
* Write stream to file with MD5 calculation
|
|
*/
|
|
private async writeStreamWithMD5(
|
|
input: NodeJS.ReadableStream,
|
|
destPath: string
|
|
): Promise<{ size: number; md5: string }> {
|
|
const hash = plugins.crypto.createHash('md5');
|
|
let totalSize = 0;
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const output = plugins.fs.createWriteStream(destPath);
|
|
|
|
input.on('data', (chunk: Buffer) => {
|
|
hash.update(chunk);
|
|
totalSize += chunk.length;
|
|
});
|
|
|
|
input.on('error', reject);
|
|
output.on('error', reject);
|
|
|
|
input.pipe(output).on('finish', async () => {
|
|
const md5 = hash.digest('hex');
|
|
|
|
// Save MD5 to separate file
|
|
const md5Path = `${destPath}.md5`;
|
|
await plugins.fs.promises.writeFile(md5Path, md5);
|
|
|
|
resolve({ size: totalSize, md5 });
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Read MD5 hash (calculate if missing)
|
|
*/
|
|
private async readMD5(objectPath: string, md5Path: string): Promise<string> {
|
|
try {
|
|
// Try to read cached MD5
|
|
const md5 = await plugins.smartfile.fs.toStringSync(md5Path);
|
|
return md5.trim();
|
|
} catch (err) {
|
|
// Calculate MD5 if not cached
|
|
return new Promise((resolve, reject) => {
|
|
const hash = plugins.crypto.createHash('md5');
|
|
const stream = plugins.fs.createReadStream(objectPath);
|
|
|
|
stream.on('data', (chunk: Buffer) => hash.update(chunk));
|
|
stream.on('end', async () => {
|
|
const md5 = hash.digest('hex');
|
|
// Cache it
|
|
await plugins.fs.promises.writeFile(md5Path, md5);
|
|
resolve(md5);
|
|
});
|
|
stream.on('error', reject);
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Read metadata from JSON file
|
|
*/
|
|
private async readMetadata(metadataPath: string): Promise<Record<string, string>> {
|
|
try {
|
|
const content = await plugins.smartfile.fs.toStringSync(metadataPath);
|
|
return JSON.parse(content);
|
|
} catch (err) {
|
|
return {};
|
|
}
|
|
}
|
|
}
|