import * as plugins from '../plugins.js';
import { Readable } from 'stream';

/**
 * Multipart upload metadata
 */
export interface IMultipartUpload {
  uploadId: string;
  bucket: string;
  key: string;
  initiated: Date;
  parts: Map<number, IPartInfo>;
  metadata: Record<string, string>;
}

/**
 * Part information
 */
export interface IPartInfo {
  partNumber: number;
  etag: string;
  size: number;
  lastModified: Date;
}

/**
 * Serializable version of upload metadata for disk persistence
 */
interface ISerializableUpload {
  uploadId: string;
  bucket: string;
  key: string;
  initiated: string; // ISO date string
  metadata: Record<string, string>;
  parts: Array<{
    partNumber: number;
    etag: string;
    size: number;
    lastModified: string; // ISO date string
  }>;
}
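
/**
 * On-disk layout used by the manager (derived from the methods below):
 *
 *   <rootDir>/.multipart/<uploadId>/metadata.json   - serialized ISerializableUpload
 *   <rootDir>/.multipart/<uploadId>/part-<n>        - raw bytes of each uploaded part
 *   <rootDir>/.multipart/<uploadId>/final           - combined object written by completeUpload()
 */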

/**
 * Manages multipart upload state and storage
 */
export class MultipartUploadManager {
  private uploads: Map<string, IMultipartUpload> = new Map();
  private uploadDir: string;
  private cleanupInterval: NodeJS.Timeout | null = null;
  private expirationDays: number;
  private cleanupIntervalMinutes: number;

  constructor(
    private rootDir: string,
    expirationDays: number = 7,
    cleanupIntervalMinutes: number = 60
  ) {
    this.uploadDir = plugins.path.join(rootDir, '.multipart');
    this.expirationDays = expirationDays;
    this.cleanupIntervalMinutes = cleanupIntervalMinutes;
  }

  /**
   * Initialize multipart uploads directory
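   *
   * Note: restoreUploadsFromDisk() only runs here, so uploads persisted by a previous
   * process are visible in memory only after initialize() has been awaited.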
   */
  public async initialize(): Promise<void> {
    await plugins.smartfs.directory(this.uploadDir).recursive().create();
    await this.restoreUploadsFromDisk();
  }

  /**
   * Save upload metadata to disk for persistence
   */
  private async saveUploadMetadata(uploadId: string): Promise<void> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      return;
    }

    const metadataPath = plugins.path.join(this.uploadDir, uploadId, 'metadata.json');

    const serializable: ISerializableUpload = {
      uploadId: upload.uploadId,
      bucket: upload.bucket,
      key: upload.key,
      initiated: upload.initiated.toISOString(),
      metadata: upload.metadata,
      parts: Array.from(upload.parts.values()).map(part => ({
        partNumber: part.partNumber,
        etag: part.etag,
        size: part.size,
        lastModified: part.lastModified.toISOString(),
      })),
    };

    await plugins.smartfs.file(metadataPath).write(JSON.stringify(serializable, null, 2));
  }

  /**
   * Restore uploads from disk on initialization
   */
  private async restoreUploadsFromDisk(): Promise<void> {
    const uploadDirExists = await plugins.smartfs.directory(this.uploadDir).exists();
    if (!uploadDirExists) {
      return;
    }

    const entries = await plugins.smartfs.directory(this.uploadDir).includeStats().list();

    for (const entry of entries) {
      if (!entry.isDirectory) {
        continue;
      }

      const uploadId = entry.name;
      const metadataPath = plugins.path.join(this.uploadDir, uploadId, 'metadata.json');

      // Check if metadata.json exists
      const metadataExists = await plugins.smartfs.file(metadataPath).exists();
      if (!metadataExists) {
        // Orphaned upload directory - clean it up
        console.warn(`Orphaned multipart upload directory found: ${uploadId}, cleaning up`);
        await plugins.smartfs.directory(plugins.path.join(this.uploadDir, uploadId)).recursive().delete();
        continue;
      }

      try {
        // Read and parse metadata
        const metadataContent = await plugins.smartfs.file(metadataPath).read();
        const serialized: ISerializableUpload = JSON.parse(metadataContent as string);

        // Restore to memory
        const parts = new Map<number, IPartInfo>();
        for (const part of serialized.parts) {
          parts.set(part.partNumber, {
            partNumber: part.partNumber,
            etag: part.etag,
            size: part.size,
            lastModified: new Date(part.lastModified),
          });
        }

        this.uploads.set(uploadId, {
          uploadId: serialized.uploadId,
          bucket: serialized.bucket,
          key: serialized.key,
          initiated: new Date(serialized.initiated),
          parts,
          metadata: serialized.metadata,
        });

        console.log(`Restored multipart upload: ${uploadId} (${serialized.bucket}/${serialized.key})`);
      } catch (error) {
        // Corrupted metadata - clean up
        console.error(`Failed to restore multipart upload ${uploadId}:`, error);
        await plugins.smartfs.directory(plugins.path.join(this.uploadDir, uploadId)).recursive().delete();
      }
    }
  }

  /**
   * Generate a unique upload ID
   */
  private generateUploadId(): string {
    return plugins.crypto.randomBytes(16).toString('hex');
  }

  /**
   * Initiate a new multipart upload
   */
  public async initiateUpload(
    bucket: string,
    key: string,
    metadata: Record<string, string>
  ): Promise<string> {
    const uploadId = this.generateUploadId();

    this.uploads.set(uploadId, {
      uploadId,
      bucket,
      key,
      initiated: new Date(),
      parts: new Map(),
      metadata,
    });

    // Create directory for this upload's parts
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().create();

    // Persist metadata to disk
    await this.saveUploadMetadata(uploadId);

    return uploadId;
  }

  /**
   * Upload a part
   */
  public async uploadPart(
    uploadId: string,
    partNumber: number,
    stream: Readable
  ): Promise<IPartInfo> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }

    const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${partNumber}`);

    // Write part to disk
    const webWriteStream = await plugins.smartfs.file(partPath).writeStream();
    const writer = webWriteStream.getWriter();

    let size = 0;
    const hash = plugins.crypto.createHash('md5');
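
    // Stream the request body to disk chunk by chunk while accumulating the MD5 and byte count;
    // the hex digest becomes the part's ETag (compared quote-stripped in completeUpload()).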
    for await (const chunk of stream) {
      const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
      await writer.write(new Uint8Array(buffer));
      hash.update(buffer);
      size += buffer.length;
    }

    await writer.close();

    const etag = hash.digest('hex');

    const partInfo: IPartInfo = {
      partNumber,
      etag,
      size,
      lastModified: new Date(),
    };

    upload.parts.set(partNumber, partInfo);

    // Persist updated metadata
    await this.saveUploadMetadata(uploadId);

    return partInfo;
  }

  /**
   * Complete multipart upload - combine all parts
   */
  public async completeUpload(
    uploadId: string,
    parts: Array<{ PartNumber: number; ETag: string }>
  ): Promise<{ etag: string; size: number }> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }

    // Verify all parts are uploaded
    for (const part of parts) {
      const uploadedPart = upload.parts.get(part.PartNumber);
      if (!uploadedPart) {
        throw new Error(`Part ${part.PartNumber} not uploaded`);
      }
      // Normalize ETag format (remove quotes if present)
      const normalizedETag = part.ETag.replace(/"/g, '');
      if (uploadedPart.etag !== normalizedETag) {
        throw new Error(`Part ${part.PartNumber} ETag mismatch`);
      }
    }

    // Sort parts by part number (copy first so the caller's array is not mutated)
    const sortedParts = [...parts].sort((a, b) => a.PartNumber - b.PartNumber);

    // Combine parts into final object
    const finalPath = plugins.path.join(this.uploadDir, uploadId, 'final');
    const webWriteStream = await plugins.smartfs.file(finalPath).writeStream();
    const writer = webWriteStream.getWriter();

    const hash = plugins.crypto.createHash('md5');
    let totalSize = 0;
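
    // Note: the completed object's ETag is the plain MD5 of the concatenated part bytes.
    // (S3 itself reports multipart ETags as the MD5 of the part MD5s suffixed with '-<partCount>',
    // so clients should not assume the two formats match.)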

    for (const part of sortedParts) {
      const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${part.PartNumber}`);

      // Read part and write to final file
      const partContent = await plugins.smartfs.file(partPath).read();
      const buffer = Buffer.isBuffer(partContent) ? partContent : Buffer.from(partContent as string);

      await writer.write(new Uint8Array(buffer));
      hash.update(buffer);
      totalSize += buffer.length;
    }

    await writer.close();

    const etag = hash.digest('hex');

    return { etag, size: totalSize };
  }

  /**
   * Get the final combined file path
   */
  public getFinalPath(uploadId: string): string {
    return plugins.path.join(this.uploadDir, uploadId, 'final');
  }

  /**
   * Get upload metadata
   */
  public getUpload(uploadId: string): IMultipartUpload | undefined {
    return this.uploads.get(uploadId);
  }

  /**
   * Abort multipart upload - clean up parts
   */
  public async abortUpload(uploadId: string): Promise<void> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }

    // Delete upload directory
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().delete();

    // Remove from memory
    this.uploads.delete(uploadId);
  }

  /**
   * Clean up upload after completion
   */
  public async cleanupUpload(uploadId: string): Promise<void> {
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().delete();
    this.uploads.delete(uploadId);
  }

  /**
   * List all in-progress uploads for a bucket
   */
  public listUploads(bucket?: string): IMultipartUpload[] {
    const uploads = Array.from(this.uploads.values());
    if (bucket) {
      return uploads.filter((u) => u.bucket === bucket);
    }
    return uploads;
  }

  /**
   * List parts for an upload
   */
  public listParts(uploadId: string): IPartInfo[] {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    return Array.from(upload.parts.values()).sort((a, b) => a.partNumber - b.partNumber);
  }

  /**
   * Start automatic cleanup task for expired uploads
   */
  public startCleanupTask(): void {
    if (this.cleanupInterval) {
      console.warn('Cleanup task is already running');
      return;
    }

    // Run cleanup immediately on start
    this.performCleanup().catch(err => {
      console.error('Failed to perform initial multipart cleanup:', err);
    });

    // Then schedule periodic cleanup
    const intervalMs = this.cleanupIntervalMinutes * 60 * 1000;
    this.cleanupInterval = setInterval(() => {
      this.performCleanup().catch(err => {
        console.error('Failed to perform scheduled multipart cleanup:', err);
      });
    }, intervalMs);

    console.log(`Multipart cleanup task started (interval: ${this.cleanupIntervalMinutes} minutes, expiration: ${this.expirationDays} days)`);
  }

  /**
   * Stop automatic cleanup task
   */
  public stopCleanupTask(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
      console.log('Multipart cleanup task stopped');
    }
  }

  /**
   * Perform cleanup of expired uploads
   */
  private async performCleanup(): Promise<void> {
    const now = Date.now();
    const expirationMs = this.expirationDays * 24 * 60 * 60 * 1000;
    const expiredUploads: string[] = [];

    // Find expired uploads
    for (const [uploadId, upload] of this.uploads.entries()) {
      const age = now - upload.initiated.getTime();
      if (age > expirationMs) {
        expiredUploads.push(uploadId);
      }
    }

    if (expiredUploads.length === 0) {
      return;
    }

    console.log(`Cleaning up ${expiredUploads.length} expired multipart upload(s)`);

    // Delete expired uploads
    for (const uploadId of expiredUploads) {
      try {
        await this.abortUpload(uploadId);
        console.log(`Deleted expired multipart upload: ${uploadId}`);
      } catch (err) {
        console.error(`Failed to delete expired upload ${uploadId}:`, err);
      }
    }
  }
}
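
/*
 * Example lifecycle (illustrative sketch only; the bucket name, object key, and `body` stream
 * are placeholders, and the single-part upload below is the simplest possible case):
 *
 *   const manager = new MultipartUploadManager('./data');
 *   await manager.initialize();
 *   manager.startCleanupTask();
 *
 *   const uploadId = await manager.initiateUpload('my-bucket', 'videos/movie.mp4', {});
 *   const part = await manager.uploadPart(uploadId, 1, body); // body: Readable
 *   const { etag, size } = await manager.completeUpload(uploadId, [
 *     { PartNumber: 1, ETag: part.etag },
 *   ]);
 *
 *   // Move or copy the combined file out of the staging area, then release it:
 *   const finalPath = manager.getFinalPath(uploadId);
 *   await manager.cleanupUpload(uploadId);
 */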