// smarts3/ts/classes/multipart-manager.ts

import * as plugins from '../plugins.js';
import { Readable } from 'stream';

/**
 * Multipart upload metadata
 */
export interface IMultipartUpload {
  uploadId: string;
  bucket: string;
  key: string;
  initiated: Date;
  parts: Map<number, IPartInfo>;
  metadata: Record<string, string>;
}

/**
 * Part information
 */
export interface IPartInfo {
  partNumber: number;
  etag: string;
  size: number;
  lastModified: Date;
}

/**
 * Serializable version of upload metadata for disk persistence
 */
interface ISerializableUpload {
  uploadId: string;
  bucket: string;
  key: string;
  initiated: string; // ISO date string
  metadata: Record<string, string>;
  parts: Array<{
    partNumber: number;
    etag: string;
    size: number;
    lastModified: string; // ISO date string
  }>;
}
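
// For reference, the metadata.json written by saveUploadMetadata() below has
// this shape (values are illustrative, not from a real run):
//
//   {
//     "uploadId": "9f2c4a0d8b1e...",
//     "bucket": "my-bucket",
//     "key": "videos/movie.mp4",
//     "initiated": "2024-01-01T00:00:00.000Z",
//     "metadata": { "content-type": "video/mp4" },
//     "parts": [
//       {
//         "partNumber": 1,
//         "etag": "d41d8cd98f00b204e9800998ecf8427e",
//         "size": 5242880,
//         "lastModified": "2024-01-01T00:01:00.000Z"
//       }
//     ]
//   }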

/**
 * Manages multipart upload state and storage
 */
export class MultipartUploadManager {
  private uploads: Map<string, IMultipartUpload> = new Map();
  private uploadDir: string;
  private cleanupInterval: NodeJS.Timeout | null = null;
  private expirationDays: number;
  private cleanupIntervalMinutes: number;

  constructor(
    private rootDir: string,
    expirationDays: number = 7,
    cleanupIntervalMinutes: number = 60
  ) {
    this.uploadDir = plugins.path.join(rootDir, '.multipart');
    this.expirationDays = expirationDays;
    this.cleanupIntervalMinutes = cleanupIntervalMinutes;
  }

  /**
   * Initialize multipart uploads directory
   */
  public async initialize(): Promise<void> {
    await plugins.smartfs.directory(this.uploadDir).recursive().create();
    await this.restoreUploadsFromDisk();
  }

  /**
   * Save upload metadata to disk for persistence
   */
  private async saveUploadMetadata(uploadId: string): Promise<void> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      return;
    }
    const metadataPath = plugins.path.join(this.uploadDir, uploadId, 'metadata.json');
    const serializable: ISerializableUpload = {
      uploadId: upload.uploadId,
      bucket: upload.bucket,
      key: upload.key,
      initiated: upload.initiated.toISOString(),
      metadata: upload.metadata,
      parts: Array.from(upload.parts.values()).map(part => ({
        partNumber: part.partNumber,
        etag: part.etag,
        size: part.size,
        lastModified: part.lastModified.toISOString(),
      })),
    };
    await plugins.smartfs.file(metadataPath).write(JSON.stringify(serializable, null, 2));
  }

  /**
   * Restore uploads from disk on initialization
   */
  private async restoreUploadsFromDisk(): Promise<void> {
    const uploadDirExists = await plugins.smartfs.directory(this.uploadDir).exists();
    if (!uploadDirExists) {
      return;
    }
    const entries = await plugins.smartfs.directory(this.uploadDir).includeStats().list();
    for (const entry of entries) {
      if (!entry.isDirectory) {
        continue;
      }
      const uploadId = entry.name;
      const metadataPath = plugins.path.join(this.uploadDir, uploadId, 'metadata.json');
      // Check if metadata.json exists
      const metadataExists = await plugins.smartfs.file(metadataPath).exists();
      if (!metadataExists) {
        // Orphaned upload directory - clean it up
        console.warn(`Orphaned multipart upload directory found: ${uploadId}, cleaning up`);
        await plugins.smartfs.directory(plugins.path.join(this.uploadDir, uploadId)).recursive().delete();
        continue;
      }
      try {
        // Read and parse metadata
        const metadataContent = await plugins.smartfs.file(metadataPath).read();
        const serialized: ISerializableUpload = JSON.parse(metadataContent as string);
        // Restore to memory
        const parts = new Map<number, IPartInfo>();
        for (const part of serialized.parts) {
          parts.set(part.partNumber, {
            partNumber: part.partNumber,
            etag: part.etag,
            size: part.size,
            lastModified: new Date(part.lastModified),
          });
        }
        this.uploads.set(uploadId, {
          uploadId: serialized.uploadId,
          bucket: serialized.bucket,
          key: serialized.key,
          initiated: new Date(serialized.initiated),
          parts,
          metadata: serialized.metadata,
        });
        console.log(`Restored multipart upload: ${uploadId} (${serialized.bucket}/${serialized.key})`);
      } catch (error) {
        // Corrupted metadata - clean up
        console.error(`Failed to restore multipart upload ${uploadId}:`, error);
        await plugins.smartfs.directory(plugins.path.join(this.uploadDir, uploadId)).recursive().delete();
      }
    }
  }

  /**
   * Generate a unique upload ID
   */
  private generateUploadId(): string {
    return plugins.crypto.randomBytes(16).toString('hex');
  }

  /**
   * Initiate a new multipart upload
   */
  public async initiateUpload(
    bucket: string,
    key: string,
    metadata: Record<string, string>
  ): Promise<string> {
    const uploadId = this.generateUploadId();
    this.uploads.set(uploadId, {
      uploadId,
      bucket,
      key,
      initiated: new Date(),
      parts: new Map(),
      metadata,
    });
    // Create directory for this upload's parts
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().create();
    // Persist metadata to disk
    await this.saveUploadMetadata(uploadId);
    return uploadId;
  }

  /**
   * Upload a part
   */
  public async uploadPart(
    uploadId: string,
    partNumber: number,
    stream: Readable
  ): Promise<IPartInfo> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${partNumber}`);
    // Write part to disk
    const webWriteStream = await plugins.smartfs.file(partPath).writeStream();
    const writer = webWriteStream.getWriter();
    let size = 0;
    const hash = plugins.crypto.createHash('md5');
    for await (const chunk of stream) {
      const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
      await writer.write(new Uint8Array(buffer));
      hash.update(buffer);
      size += buffer.length;
    }
    await writer.close();
    const etag = hash.digest('hex');
    const partInfo: IPartInfo = {
      partNumber,
      etag,
      size,
      lastModified: new Date(),
    };
    upload.parts.set(partNumber, partInfo);
    // Persist updated metadata
    await this.saveUploadMetadata(uploadId);
    return partInfo;
  }
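
  // Minimal usage sketch for uploadPart(), assuming the part's bytes are
  // already in a Buffer (`Readable.from` comes from node:stream; note that
  // S3's usual 5 MiB minimum for non-final parts is not enforced here):
  //
  //   const part = await manager.uploadPart(uploadId, 1, Readable.from(chunk));
  //   console.log(part.etag, part.size); // hex MD5 of the part, byte count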

  /**
   * Complete multipart upload - combine all parts
   */
  public async completeUpload(
    uploadId: string,
    parts: Array<{ PartNumber: number; ETag: string }>
  ): Promise<{ etag: string; size: number }> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    // Verify all parts are uploaded
    for (const part of parts) {
      const uploadedPart = upload.parts.get(part.PartNumber);
      if (!uploadedPart) {
        throw new Error(`Part ${part.PartNumber} not uploaded`);
      }
      // Normalize ETag format (remove quotes if present)
      const normalizedETag = part.ETag.replace(/"/g, '');
      if (uploadedPart.etag !== normalizedETag) {
        throw new Error(`Part ${part.PartNumber} ETag mismatch`);
      }
    }
    // Sort parts by part number (copy first so the caller's array is not mutated)
    const sortedParts = [...parts].sort((a, b) => a.PartNumber - b.PartNumber);
    // Combine parts into final object
    const finalPath = plugins.path.join(this.uploadDir, uploadId, 'final');
    const webWriteStream = await plugins.smartfs.file(finalPath).writeStream();
    const writer = webWriteStream.getWriter();
    const hash = plugins.crypto.createHash('md5');
    let totalSize = 0;
    for (const part of sortedParts) {
      const partPath = plugins.path.join(this.uploadDir, uploadId, `part-${part.PartNumber}`);
      // Read part and write to final file (each part is buffered fully in memory)
      const partContent = await plugins.smartfs.file(partPath).read();
      const buffer = Buffer.isBuffer(partContent) ? partContent : Buffer.from(partContent as string);
      await writer.write(new Uint8Array(buffer));
      hash.update(buffer);
      totalSize += buffer.length;
    }
    await writer.close();
    // Note: this is the MD5 of the concatenated bytes, not S3's composite
    // "md5-of-part-md5s-N" ETag format
    const etag = hash.digest('hex');
    return { etag, size: totalSize };
  }
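
  // Completing an upload (sketch): the parts list mirrors what an S3 client
  // sends in CompleteMultipartUpload, using ETags returned by uploadPart():
  //
  //   const { etag, size } = await manager.completeUpload(uploadId, [
  //     { PartNumber: 1, ETag: part1.etag },
  //     { PartNumber: 2, ETag: part2.etag },
  //   ]);
  //   const finalPath = manager.getFinalPath(uploadId); // combined object on disk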

  /**
   * Get the final combined file path
   */
  public getFinalPath(uploadId: string): string {
    return plugins.path.join(this.uploadDir, uploadId, 'final');
  }

  /**
   * Get upload metadata
   */
  public getUpload(uploadId: string): IMultipartUpload | undefined {
    return this.uploads.get(uploadId);
  }

  /**
   * Abort multipart upload - clean up parts
   */
  public async abortUpload(uploadId: string): Promise<void> {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    // Delete upload directory
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().delete();
    // Remove from memory
    this.uploads.delete(uploadId);
  }

  /**
   * Clean up upload after completion
   */
  public async cleanupUpload(uploadId: string): Promise<void> {
    const uploadPath = plugins.path.join(this.uploadDir, uploadId);
    await plugins.smartfs.directory(uploadPath).recursive().delete();
    this.uploads.delete(uploadId);
  }

  /**
   * List in-progress uploads, optionally filtered by bucket
   */
  public listUploads(bucket?: string): IMultipartUpload[] {
    const uploads = Array.from(this.uploads.values());
    if (bucket) {
      return uploads.filter((u) => u.bucket === bucket);
    }
    return uploads;
  }

  /**
   * List parts for an upload
   */
  public listParts(uploadId: string): IPartInfo[] {
    const upload = this.uploads.get(uploadId);
    if (!upload) {
      throw new Error('No such upload');
    }
    return Array.from(upload.parts.values()).sort((a, b) => a.partNumber - b.partNumber);
  }

  /**
   * Start automatic cleanup task for expired uploads
   */
  public startCleanupTask(): void {
    if (this.cleanupInterval) {
      console.warn('Cleanup task is already running');
      return;
    }
    // Run cleanup immediately on start
    this.performCleanup().catch(err => {
      console.error('Failed to perform initial multipart cleanup:', err);
    });
    // Then schedule periodic cleanup
    const intervalMs = this.cleanupIntervalMinutes * 60 * 1000;
    this.cleanupInterval = setInterval(() => {
      this.performCleanup().catch(err => {
        console.error('Failed to perform scheduled multipart cleanup:', err);
      });
    }, intervalMs);
    console.log(`Multipart cleanup task started (interval: ${this.cleanupIntervalMinutes} minutes, expiration: ${this.expirationDays} days)`);
  }
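
  // Lifecycle sketch for the cleanup task (the root path and the 1-day /
  // 15-minute values below are illustrative, not defaults):
  //
  //   const manager = new MultipartUploadManager('/srv/s3-data', 1, 15);
  //   await manager.initialize();
  //   manager.startCleanupTask();
  //   // ... later, on shutdown:
  //   manager.stopCleanupTask();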

  /**
   * Stop automatic cleanup task
   */
  public stopCleanupTask(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
      console.log('Multipart cleanup task stopped');
    }
  }

  /**
   * Perform cleanup of expired uploads
   */
  private async performCleanup(): Promise<void> {
    const now = Date.now();
    const expirationMs = this.expirationDays * 24 * 60 * 60 * 1000;
    const expiredUploads: string[] = [];
    // Find expired uploads
    for (const [uploadId, upload] of this.uploads.entries()) {
      const age = now - upload.initiated.getTime();
      if (age > expirationMs) {
        expiredUploads.push(uploadId);
      }
    }
    if (expiredUploads.length === 0) {
      return;
    }
    console.log(`Cleaning up ${expiredUploads.length} expired multipart upload(s)`);
    // Delete expired uploads
    for (const uploadId of expiredUploads) {
      try {
        await this.abortUpload(uploadId);
        console.log(`Deleted expired multipart upload: ${uploadId}`);
      } catch (err) {
        console.error(`Failed to delete expired upload ${uploadId}:`, err);
      }
    }
  }
}
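
// End-to-end sketch of a multipart upload against this manager. Assumes a
// manager initialized as above; `dataChunks` is a hypothetical array of
// Buffers supplied by the caller:
//
//   const uploadId = await manager.initiateUpload('my-bucket', 'big/object.bin', {});
//   const uploaded: IPartInfo[] = [];
//   for (let i = 0; i < dataChunks.length; i++) {
//     uploaded.push(await manager.uploadPart(uploadId, i + 1, Readable.from(dataChunks[i])));
//   }
//   const result = await manager.completeUpload(
//     uploadId,
//     uploaded.map((p) => ({ PartNumber: p.partNumber, ETag: p.etag })),
//   );
//   // The combined object is at manager.getFinalPath(uploadId); once it has
//   // been moved into the bucket, release the staging directory:
//   await manager.cleanupUpload(uploadId);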