import * as plugins from '../plugins.js';
import { Readable } from 'stream';

/**
 * Multipart upload metadata
 */
export interface IMultipartUpload {
  /** Identifier generated when the upload was initiated. */
  uploadId: string;
  /** Bucket the upload was initiated for. */
  bucket: string;
  /** Object key the upload was initiated for. */
  key: string;
  /** Time at which the upload was initiated. */
  initiated: Date;
  /** Parts uploaded so far, keyed by part number. */
  parts: Map<number, IPartInfo>;
  /** Caller-supplied metadata, stored as given at initiation. */
  metadata: Record<string, string>;
}

/**
 * Part information
 */
export interface IPartInfo {
  /** Part number supplied by the caller of `uploadPart`. */
  partNumber: number;
  /** Hex-encoded MD5 digest of the part's bytes. */
  etag: string;
  /** Number of bytes written for this part. */
  size: number;
  /** Time at which the part finished uploading. */
  lastModified: Date;
}

/**
 * Manages multipart upload state and storage.
 *
 * Upload metadata is held in memory in a map keyed by upload ID; part data is
 * written to `<rootDir>/.multipart/<uploadId>/part-<partNumber>` and the
 * combined object to `<rootDir>/.multipart/<uploadId>/final`.
 */
export class MultipartUploadManager {
  private readonly uploads: Map<string, IMultipartUpload> = new Map();
  private readonly uploadDir: string;

  /**
   * @param rootDir Directory under which the `.multipart` working directory lives.
   */
  constructor(private readonly rootDir: string) {
    this.uploadDir = plugins.path.join(rootDir, '.multipart');
  }

  /**
   * Initialize multipart uploads directory
   */
  public async initialize(): Promise<void> {
    await plugins.smartfs.directory(this.uploadDir).recursive().create();
  }

  /**
   * Generate a unique upload ID (16 random bytes, hex-encoded)
   */
  private generateUploadId(): string {
    return plugins.crypto.randomBytes(16).toString('hex');
  }

  /**
   * Directory that holds the parts (and final object) of one upload.
   */
  private getUploadPath(uploadId: string): string {
    return plugins.path.join(this.uploadDir, uploadId);
  }

  /**
   * Look up an in-progress upload.
   *
   * @throws Error with message `No such upload` when the ID is not registered.
   */
  private requireUpload(uploadId: string): IMultipartUpload {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    return upload;
  }

  /**
   * Initiate a new multipart upload
   *
   * Registers the upload in memory and creates its parts directory.
   *
   * @param bucket Bucket recorded for the upload.
   * @param key Object key recorded for the upload.
   * @param metadata Metadata stored alongside the upload.
   * @returns The generated upload ID.
   */
  public async initiateUpload(
    bucket: string,
    key: string,
    metadata: Record<string, string>
  ): Promise<string> {
    const uploadId = this.generateUploadId();

    this.uploads.set(uploadId, {
      uploadId,
      bucket,
      key,
      initiated: new Date(),
      parts: new Map(),
      metadata,
    });

    // Create directory for this upload's parts
    await plugins.smartfs.directory(this.getUploadPath(uploadId)).recursive().create();

    return uploadId;
  }

  /**
   * Upload a part
   *
   * Streams the part to disk while computing its MD5 digest and byte count,
   * then records the part on the upload (replacing any earlier record with the
   * same part number).
   *
   * @param uploadId ID returned by `initiateUpload`.
   * @param partNumber Number identifying the part within the upload.
   * @param stream Source of the part's bytes.
   * @returns Information about the stored part.
   * @throws Error with message `No such upload` when the ID is not registered.
   */
  public async uploadPart(
    uploadId: string,
    partNumber: number,
    stream: Readable
  ): Promise<IPartInfo> {
    const upload = this.requireUpload(uploadId);
    const partPath = plugins.path.join(this.getUploadPath(uploadId), `part-${partNumber}`);

    // Write part to disk
    const webWriteStream = await plugins.smartfs.file(partPath).writeStream();
    const writer = webWriteStream.getWriter();

    const hash = plugins.crypto.createHash('md5');
    let size = 0;

    try {
      for await (const chunk of stream) {
        const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
        await writer.write(new Uint8Array(buffer));
        hash.update(buffer);
        size += buffer.length;
      }
      await writer.close();
    } catch (error: unknown) {
      // The part file has been opened for writing, so a record left over from
      // an earlier upload of this part number can no longer be trusted.
      upload.parts.delete(partNumber);
      // Release the underlying stream; a failure while aborting must not mask
      // the error that caused the upload to fail.
      await writer.abort(error).catch(() => undefined);
      throw error;
    }

    const partInfo: IPartInfo = {
      partNumber,
      etag: hash.digest('hex'),
      size,
      lastModified: new Date(),
    };

    upload.parts.set(partNumber, partInfo);

    return partInfo;
  }

  /**
   * Complete multipart upload - combine all parts
   *
   * Verifies that every listed part was uploaded with a matching ETag, then
   * concatenates the parts in ascending part-number order into the upload's
   * `final` file. The caller's `parts` array is not modified.
   *
   * @param uploadId ID returned by `initiateUpload`.
   * @param parts Parts to combine; ETags may be wrapped in double quotes.
   * @returns Hex MD5 digest of the combined content and its total size in bytes.
   * @throws Error when the upload is unknown, a listed part was not uploaded,
   *   or a listed ETag does not match the stored one.
   */
  public async completeUpload(
    uploadId: string,
    parts: Array<{ PartNumber: number; ETag: string }>
  ): Promise<{ etag: string; size: number }> {
    const upload = this.requireUpload(uploadId);

    // Verify all parts are uploaded
    for (const part of parts) {
      const uploadedPart = upload.parts.get(part.PartNumber);
      if (!uploadedPart) {
        throw new Error(`Part ${part.PartNumber} not uploaded`);
      }

      // Normalize ETag format (remove quotes if present)
      const normalizedETag = part.ETag.replace(/"/g, '');
      if (uploadedPart.etag !== normalizedETag) {
        throw new Error(`Part ${part.PartNumber} ETag mismatch`);
      }
    }

    // Sort a copy so the caller's array keeps its original order
    const sortedParts = [...parts].sort((a, b) => a.PartNumber - b.PartNumber);

    // Combine parts into final object
    const uploadPath = this.getUploadPath(uploadId);
    const webWriteStream = await plugins.smartfs.file(this.getFinalPath(uploadId)).writeStream();
    const writer = webWriteStream.getWriter();

    const hash = plugins.crypto.createHash('md5');
    let totalSize = 0;

    try {
      for (const part of sortedParts) {
        const partPath = plugins.path.join(uploadPath, `part-${part.PartNumber}`);

        // Read part and write to final file
        const partContent = await plugins.smartfs.file(partPath).read();
        const buffer = Buffer.isBuffer(partContent)
          ? partContent
          : Buffer.from(partContent as string);

        await writer.write(new Uint8Array(buffer));
        hash.update(buffer);
        totalSize += buffer.length;
      }
      await writer.close();
    } catch (error: unknown) {
      // Release the underlying stream; a failure while aborting must not mask
      // the error that caused the combine step to fail.
      await writer.abort(error).catch(() => undefined);
      throw error;
    }

    return { etag: hash.digest('hex'), size: totalSize };
  }

  /**
   * Get the final combined file path
   */
  public getFinalPath(uploadId: string): string {
    return plugins.path.join(this.getUploadPath(uploadId), 'final');
  }

  /**
   * Get upload metadata
   *
   * @returns The upload record, or `undefined` when the ID is not registered.
   */
  public getUpload(uploadId: string): IMultipartUpload | undefined {
    return this.uploads.get(uploadId);
  }

  /**
   * Abort multipart upload - clean up parts
   *
   * @throws Error with message `No such upload` when the ID is not registered.
   */
  public async abortUpload(uploadId: string): Promise<void> {
    this.requireUpload(uploadId);

    // Delete upload directory
    await plugins.smartfs.directory(this.getUploadPath(uploadId)).recursive().delete();

    // Remove from memory
    this.uploads.delete(uploadId);
  }

  /**
   * Clean up upload after completion
   *
   * Unlike `abortUpload`, this does not check that the upload is registered
   * before deleting its directory.
   */
  public async cleanupUpload(uploadId: string): Promise<void> {
    await plugins.smartfs.directory(this.getUploadPath(uploadId)).recursive().delete();
    this.uploads.delete(uploadId);
  }

  /**
   * List all in-progress uploads for a bucket
   *
   * @param bucket Bucket to filter by; when omitted (or empty) all uploads are returned.
   */
  public listUploads(bucket?: string): IMultipartUpload[] {
    const uploads = Array.from(this.uploads.values());
    if (bucket) {
      return uploads.filter((u) => u.bucket === bucket);
    }
    return uploads;
  }

  /**
   * List parts for an upload
   *
   * @returns The uploaded parts in ascending part-number order.
   * @throws Error with message `No such upload` when the ID is not registered.
   */
  public listParts(uploadId: string): IPartInfo[] {
    const upload = this.requireUpload(uploadId);
    return Array.from(upload.parts.values()).sort((a, b) => a.partNumber - b.partNumber);
  }
}