feat(multipart): Add multipart upload support with MultipartUploadManager and controller integration
This commit is contained in:
@@ -6,7 +6,7 @@ import type { S3Context } from '../classes/context.js';
|
||||
*/
|
||||
export class ObjectController {
|
||||
/**
|
||||
* PUT /:bucket/:key* - Upload object or copy object
|
||||
* PUT /:bucket/:key* - Upload object, copy object, or upload part
|
||||
*/
|
||||
public static async putObject(
|
||||
req: plugins.http.IncomingMessage,
|
||||
@@ -16,6 +16,11 @@ export class ObjectController {
|
||||
): Promise<void> {
|
||||
const { bucket, key } = params;
|
||||
|
||||
// Check if this is a multipart upload part
|
||||
if (ctx.query.partNumber && ctx.query.uploadId) {
|
||||
return ObjectController.uploadPart(req, res, ctx, params);
|
||||
}
|
||||
|
||||
// Check if this is a COPY operation
|
||||
const copySource = ctx.headers['x-amz-copy-source'] as string | undefined;
|
||||
if (copySource) {
|
||||
@@ -133,7 +138,7 @@ export class ObjectController {
|
||||
}
|
||||
|
||||
/**
|
||||
* DELETE /:bucket/:key* - Delete object
|
||||
* DELETE /:bucket/:key* - Delete object or abort multipart upload
|
||||
*/
|
||||
public static async deleteObject(
|
||||
req: plugins.http.IncomingMessage,
|
||||
@@ -143,6 +148,11 @@ export class ObjectController {
|
||||
): Promise<void> {
|
||||
const { bucket, key } = params;
|
||||
|
||||
// Check if this is an abort multipart upload
|
||||
if (ctx.query.uploadId) {
|
||||
return ObjectController.abortMultipartUpload(req, res, ctx, params);
|
||||
}
|
||||
|
||||
await ctx.store.deleteObject(bucket, key);
|
||||
ctx.status(204).send('');
|
||||
}
|
||||
@@ -201,4 +211,168 @@ export class ObjectController {
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /:bucket/:key* - Initiate or complete multipart upload
|
||||
*/
|
||||
public static async postObject(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse,
|
||||
ctx: S3Context,
|
||||
params: Record<string, string>
|
||||
): Promise<void> {
|
||||
// Check if this is initiate multipart upload
|
||||
if (ctx.query.uploads !== undefined) {
|
||||
return ObjectController.initiateMultipartUpload(req, res, ctx, params);
|
||||
}
|
||||
|
||||
// Check if this is complete multipart upload
|
||||
if (ctx.query.uploadId) {
|
||||
return ObjectController.completeMultipartUpload(req, res, ctx, params);
|
||||
}
|
||||
|
||||
ctx.throw('InvalidRequest', 'Invalid POST request');
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate Multipart Upload (POST with ?uploads)
|
||||
*/
|
||||
private static async initiateMultipartUpload(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse,
|
||||
ctx: S3Context,
|
||||
params: Record<string, string>
|
||||
): Promise<void> {
|
||||
const { bucket, key } = params;
|
||||
|
||||
// Extract metadata from headers
|
||||
const metadata: Record<string, string> = {};
|
||||
for (const [header, value] of Object.entries(ctx.headers)) {
|
||||
if (header.startsWith('x-amz-meta-')) {
|
||||
metadata[header] = value as string;
|
||||
}
|
||||
if (header === 'content-type' && value) {
|
||||
metadata['content-type'] = value as string;
|
||||
}
|
||||
}
|
||||
|
||||
// Initiate upload
|
||||
const uploadId = await ctx.multipart.initiateUpload(bucket, key, metadata);
|
||||
|
||||
// Send XML response
|
||||
await ctx.sendXML({
|
||||
InitiateMultipartUploadResult: {
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
UploadId: uploadId,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload Part (PUT with ?partNumber&uploadId)
|
||||
*/
|
||||
private static async uploadPart(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse,
|
||||
ctx: S3Context,
|
||||
params: Record<string, string>
|
||||
): Promise<void> {
|
||||
const uploadId = ctx.query.uploadId!;
|
||||
const partNumber = parseInt(ctx.query.partNumber!);
|
||||
|
||||
if (isNaN(partNumber) || partNumber < 1 || partNumber > 10000) {
|
||||
ctx.throw('InvalidPartNumber', 'Part number must be between 1 and 10000');
|
||||
}
|
||||
|
||||
// Upload the part
|
||||
const partInfo = await ctx.multipart.uploadPart(
|
||||
uploadId,
|
||||
partNumber,
|
||||
ctx.getRequestStream() as any as import('stream').Readable
|
||||
);
|
||||
|
||||
// Set ETag header (part ETag)
|
||||
ctx.setHeader('ETag', `"${partInfo.etag}"`);
|
||||
ctx.status(200).send('');
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete Multipart Upload (POST with ?uploadId)
|
||||
*/
|
||||
private static async completeMultipartUpload(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse,
|
||||
ctx: S3Context,
|
||||
params: Record<string, string>
|
||||
): Promise<void> {
|
||||
const { bucket, key } = params;
|
||||
const uploadId = ctx.query.uploadId!;
|
||||
|
||||
// Read and parse request body (XML with part list)
|
||||
const body = await ctx.readBody();
|
||||
|
||||
// Parse XML to extract parts
|
||||
// Expected format: <CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"etag"</ETag></Part>...</CompleteMultipartUpload>
|
||||
const partMatches = body.matchAll(/<Part>.*?<PartNumber>(\d+)<\/PartNumber>.*?<ETag>(.*?)<\/ETag>.*?<\/Part>/gs);
|
||||
const parts: Array<{ PartNumber: number; ETag: string }> = [];
|
||||
|
||||
for (const match of partMatches) {
|
||||
parts.push({
|
||||
PartNumber: parseInt(match[1]),
|
||||
ETag: match[2],
|
||||
});
|
||||
}
|
||||
|
||||
// Complete the upload
|
||||
const result = await ctx.multipart.completeUpload(uploadId, parts);
|
||||
|
||||
// Get upload metadata
|
||||
const upload = ctx.multipart.getUpload(uploadId);
|
||||
if (!upload) {
|
||||
ctx.throw('NoSuchUpload', 'The specified upload does not exist');
|
||||
}
|
||||
|
||||
// Move final file to object store
|
||||
const finalPath = ctx.multipart.getFinalPath(uploadId);
|
||||
const finalContent = await plugins.smartfs.file(finalPath).read();
|
||||
const finalStream = plugins.http.IncomingMessage.prototype;
|
||||
|
||||
// Create a readable stream from the buffer
|
||||
const { Readable } = await import('stream');
|
||||
const finalReadableStream = Readable.from([finalContent]);
|
||||
|
||||
// Store the final object
|
||||
await ctx.store.putObject(bucket, key, finalReadableStream, upload.metadata);
|
||||
|
||||
// Clean up multipart upload data
|
||||
await ctx.multipart.cleanupUpload(uploadId);
|
||||
|
||||
// Send XML response
|
||||
await ctx.sendXML({
|
||||
CompleteMultipartUploadResult: {
|
||||
Location: `/${bucket}/${key}`,
|
||||
Bucket: bucket,
|
||||
Key: key,
|
||||
ETag: `"${result.etag}"`,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort Multipart Upload (DELETE with ?uploadId)
|
||||
*/
|
||||
private static async abortMultipartUpload(
|
||||
req: plugins.http.IncomingMessage,
|
||||
res: plugins.http.ServerResponse,
|
||||
ctx: S3Context,
|
||||
params: Record<string, string>
|
||||
): Promise<void> {
|
||||
const uploadId = ctx.query.uploadId!;
|
||||
|
||||
// Abort and cleanup
|
||||
await ctx.multipart.abortUpload(uploadId);
|
||||
|
||||
ctx.status(204).send('');
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user