239 lines
6.0 KiB
TypeScript
239 lines
6.0 KiB
TypeScript
import * as plugins from '../plugins.js';
|
|
import { Readable } from 'stream';
|
|
|
|
/**
|
|
* Multipart upload metadata
|
|
*/
|
|
export interface IMultipartUpload {
|
|
uploadId: string;
|
|
bucket: string;
|
|
key: string;
|
|
initiated: Date;
|
|
parts: Map<number, IPartInfo>;
|
|
metadata: Record<string, string>;
|
|
}
|
|
|
|
/**
|
|
* Part information
|
|
*/
|
|
export interface IPartInfo {
|
|
partNumber: number;
|
|
etag: string;
|
|
size: number;
|
|
lastModified: Date;
|
|
}
|
|
|
|
/**
|
|
* Manages multipart upload state and storage
|
|
*/
|
|
export class MultipartUploadManager {
|
|
private uploads: Map<string, IMultipartUpload> = new Map();
|
|
private uploadDir: string;
|
|
|
|
constructor(private rootDir: string) {
|
|
this.uploadDir = plugins.path.join(rootDir, '.multipart');
|
|
}
|
|
|
|
/**
|
|
* Initialize multipart uploads directory
|
|
*/
|
|
public async initialize(): Promise<void> {
|
|
await plugins.smartfs.directory(this.uploadDir).recursive().create();
|
|
}
|
|
|
|
/**
|
|
* Generate a unique upload ID
|
|
*/
|
|
private generateUploadId(): string {
|
|
return plugins.crypto.randomBytes(16).toString('hex');
|
|
}
|
|
|
|
/**
|
|
* Initiate a new multipart upload
|
|
*/
|
|
public async initiateUpload(
|
|
bucket: string,
|
|
key: string,
|
|
metadata: Record<string, string>
|
|
): Promise<string> {
|
|
const uploadId = this.generateUploadId();
|
|
|
|
this.uploads.set(uploadId, {
|
|
uploadId,
|
|
bucket,
|
|
key,
|
|
initiated: new Date(),
|
|
parts: new Map(),
|
|
metadata,
|
|
});
|
|
|
|
// Create directory for this upload's parts
|
|
const uploadPath = plugins.path.join(this.uploadDir, uploadId);
|
|
await plugins.smartfs.directory(uploadPath).recursive().create();
|
|
|
|
return uploadId;
|
|
}
|
|
|
|
/**
|
|
* Upload a part
|
|
*/
|
|
public async uploadPart(
|
|
uploadId: string,
|
|
partNumber: number,
|
|
stream: Readable
|
|
): Promise<IPartInfo> {
|
|
const upload = this.uploads.get(uploadId);
|
|
if (!upload) {
|
|
throw new Error('No such upload');
|
|
}
|
|
|
|
const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${partNumber}`);
|
|
|
|
// Write part to disk
|
|
const webWriteStream = await plugins.smartfs.file(partPath).writeStream();
|
|
const writer = webWriteStream.getWriter();
|
|
|
|
let size = 0;
|
|
const hash = plugins.crypto.createHash('md5');
|
|
|
|
for await (const chunk of stream) {
|
|
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
|
|
await writer.write(new Uint8Array(buffer));
|
|
hash.update(buffer);
|
|
size += buffer.length;
|
|
}
|
|
|
|
await writer.close();
|
|
|
|
const etag = hash.digest('hex');
|
|
|
|
const partInfo: IPartInfo = {
|
|
partNumber,
|
|
etag,
|
|
size,
|
|
lastModified: new Date(),
|
|
};
|
|
|
|
upload.parts.set(partNumber, partInfo);
|
|
|
|
return partInfo;
|
|
}
|
|
|
|
/**
|
|
* Complete multipart upload - combine all parts
|
|
*/
|
|
public async completeUpload(
|
|
uploadId: string,
|
|
parts: Array<{ PartNumber: number; ETag: string }>
|
|
): Promise<{ etag: string; size: number }> {
|
|
const upload = this.uploads.get(uploadId);
|
|
if (!upload) {
|
|
throw new Error('No such upload');
|
|
}
|
|
|
|
// Verify all parts are uploaded
|
|
for (const part of parts) {
|
|
const uploadedPart = upload.parts.get(part.PartNumber);
|
|
if (!uploadedPart) {
|
|
throw new Error(`Part ${part.PartNumber} not uploaded`);
|
|
}
|
|
// Normalize ETag format (remove quotes if present)
|
|
const normalizedETag = part.ETag.replace(/"/g, '');
|
|
if (uploadedPart.etag !== normalizedETag) {
|
|
throw new Error(`Part ${part.PartNumber} ETag mismatch`);
|
|
}
|
|
}
|
|
|
|
// Sort parts by part number
|
|
const sortedParts = parts.sort((a, b) => a.PartNumber - b.PartNumber);
|
|
|
|
// Combine parts into final object
|
|
const finalPath = plugins.path.join(this.uploadDir, uploadId, 'final');
|
|
const webWriteStream = await plugins.smartfs.file(finalPath).writeStream();
|
|
const writer = webWriteStream.getWriter();
|
|
|
|
const hash = plugins.crypto.createHash('md5');
|
|
let totalSize = 0;
|
|
|
|
for (const part of sortedParts) {
|
|
const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${part.PartNumber}`);
|
|
|
|
// Read part and write to final file
|
|
const partContent = await plugins.smartfs.file(partPath).read();
|
|
const buffer = Buffer.isBuffer(partContent) ? partContent : Buffer.from(partContent as string);
|
|
|
|
await writer.write(new Uint8Array(buffer));
|
|
hash.update(buffer);
|
|
totalSize += buffer.length;
|
|
}
|
|
|
|
await writer.close();
|
|
|
|
const etag = hash.digest('hex');
|
|
|
|
return { etag, size: totalSize };
|
|
}
|
|
|
|
/**
|
|
* Get the final combined file path
|
|
*/
|
|
public getFinalPath(uploadId: string): string {
|
|
return plugins.path.join(this.uploadDir, uploadId, 'final');
|
|
}
|
|
|
|
/**
|
|
* Get upload metadata
|
|
*/
|
|
public getUpload(uploadId: string): IMultipartUpload | undefined {
|
|
return this.uploads.get(uploadId);
|
|
}
|
|
|
|
/**
|
|
* Abort multipart upload - clean up parts
|
|
*/
|
|
public async abortUpload(uploadId: string): Promise<void> {
|
|
const upload = this.uploads.get(uploadId);
|
|
if (!upload) {
|
|
throw new Error('No such upload');
|
|
}
|
|
|
|
// Delete upload directory
|
|
const uploadPath = plugins.path.join(this.uploadDir, uploadId);
|
|
await plugins.smartfs.directory(uploadPath).recursive().delete();
|
|
|
|
// Remove from memory
|
|
this.uploads.delete(uploadId);
|
|
}
|
|
|
|
/**
|
|
* Clean up upload after completion
|
|
*/
|
|
public async cleanupUpload(uploadId: string): Promise<void> {
|
|
const uploadPath = plugins.path.join(this.uploadDir, uploadId);
|
|
await plugins.smartfs.directory(uploadPath).recursive().delete();
|
|
this.uploads.delete(uploadId);
|
|
}
|
|
|
|
/**
|
|
* List all in-progress uploads for a bucket
|
|
*/
|
|
public listUploads(bucket?: string): IMultipartUpload[] {
|
|
const uploads = Array.from(this.uploads.values());
|
|
if (bucket) {
|
|
return uploads.filter((u) => u.bucket === bucket);
|
|
}
|
|
return uploads;
|
|
}
|
|
|
|
/**
|
|
* List parts for an upload
|
|
*/
|
|
public listParts(uploadId: string): IPartInfo[] {
|
|
const upload = this.uploads.get(uploadId);
|
|
if (!upload) {
|
|
throw new Error('No such upload');
|
|
}
|
|
return Array.from(upload.parts.values()).sort((a, b) => a.partNumber - b.partNumber);
|
|
}
|
|
}
|