diff --git a/changelog.md b/changelog.md index 1c1299d..9bbd305 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2025-11-23 - 3.2.0 - feat(multipart) +Add multipart upload support with MultipartUploadManager and controller integration + +- Introduce MultipartUploadManager (ts/classes/multipart-manager.ts) to manage multipart upload lifecycle and store parts on disk +- Wire multipart manager into server and request context (S3Context, Smarts3Server) and initialize multipart storage on server start +- Add multipart-related routes and handlers in ObjectController: initiate (POST ?uploads), upload part (PUT ?partNumber&uploadId), complete (POST ?uploadId), and abort (DELETE ?uploadId) +- On complete, combine parts into final object and store via existing FilesystemStore workflow +- Expose multipart manager on Smarts3Server for controller access + ## 2025-11-23 - 3.1.0 - feat(logging) Add structured Logger and integrate into Smarts3Server; pass full config to server diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 3f1f242..71bead7 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smarts3', - version: '3.1.0', + version: '3.2.0', description: 'A Node.js TypeScript package to create a local S3 endpoint for simulating AWS S3 operations using mapped local directories for development and testing purposes.' } diff --git a/ts/classes/context.ts b/ts/classes/context.ts index 43b38fd..2401ca6 100644 --- a/ts/classes/context.ts +++ b/ts/classes/context.ts @@ -2,6 +2,7 @@ import * as plugins from '../plugins.js'; import { S3Error } from './s3-error.js'; import { createXml } from '../utils/xml.utils.js'; import type { FilesystemStore } from './filesystem-store.js'; +import type { MultipartUploadManager } from './multipart-manager.js'; import type { Readable } from 'stream'; /** @@ -14,6 +15,7 @@ export class S3Context { public params: Record = {}; public query: Record = {}; public store: FilesystemStore; + public multipart: MultipartUploadManager; private req: plugins.http.IncomingMessage; private res: plugins.http.ServerResponse; @@ -23,11 +25,13 @@ export class S3Context { constructor( req: plugins.http.IncomingMessage, res: plugins.http.ServerResponse, - store: FilesystemStore + store: FilesystemStore, + multipart: MultipartUploadManager ) { this.req = req; this.res = res; this.store = store; + this.multipart = multipart; this.method = req.method || 'GET'; this.headers = req.headers; diff --git a/ts/classes/smarts3-server.ts b/ts/classes/smarts3-server.ts index a98d7b8..b09cdbb 100644 --- a/ts/classes/smarts3-server.ts +++ b/ts/classes/smarts3-server.ts @@ -5,6 +5,7 @@ import { S3Context } from './context.js'; import { FilesystemStore } from './filesystem-store.js'; import { S3Error } from './s3-error.js'; import { Logger } from './logger.js'; +import { MultipartUploadManager } from './multipart-manager.js'; import { ServiceController } from '../controllers/service.controller.js'; import { BucketController } from '../controllers/bucket.controller.js'; import { ObjectController } from '../controllers/object.controller.js'; @@ -28,6 +29,7 @@ export class Smarts3Server { private router: S3Router; private middlewares: MiddlewareStack; public store: FilesystemStore; // Made public for direct access from Smarts3 class + public multipart: MultipartUploadManager; // Made public for controller access private options: Required>; private config: Required; private logger: Logger; @@ -80,6 +82,7 @@ export class Smarts3Server { this.logger = new Logger(this.config.logging); this.store = new FilesystemStore(this.options.directory); + this.multipart = new MultipartUploadManager(this.options.directory); this.router = new S3Router(); this.middlewares = new MiddlewareStack(); @@ -220,6 +223,7 @@ export class Smarts3Server { // Object level (/:bucket/:key*) this.router.put('/:bucket/:key*', ObjectController.putObject); + this.router.post('/:bucket/:key*', ObjectController.postObject); // For multipart operations this.router.get('/:bucket/:key*', ObjectController.getObject); this.router.head('/:bucket/:key*', ObjectController.headObject); this.router.delete('/:bucket/:key*', ObjectController.deleteObject); @@ -232,7 +236,7 @@ export class Smarts3Server { req: plugins.http.IncomingMessage, res: plugins.http.ServerResponse ): Promise { - const context = new S3Context(req, res, this.store); + const context = new S3Context(req, res, this.store, this.multipart); try { // Execute middleware stack @@ -290,6 +294,9 @@ export class Smarts3Server { // Initialize store await this.store.initialize(); + // Initialize multipart upload manager + await this.multipart.initialize(); + // Clean slate if requested if (this.options.cleanSlate) { await this.store.reset(); diff --git a/ts/controllers/object.controller.ts b/ts/controllers/object.controller.ts index f9c2f8d..7bd55c9 100644 --- a/ts/controllers/object.controller.ts +++ b/ts/controllers/object.controller.ts @@ -6,7 +6,7 @@ import type { S3Context } from '../classes/context.js'; */ export class ObjectController { /** - * PUT /:bucket/:key* - Upload object or copy object + * PUT /:bucket/:key* - Upload object, copy object, or upload part */ public static async putObject( req: plugins.http.IncomingMessage, @@ -16,6 +16,11 @@ export class ObjectController { ): Promise { const { bucket, key } = params; + // Check if this is a multipart upload part + if (ctx.query.partNumber && ctx.query.uploadId) { + return ObjectController.uploadPart(req, res, ctx, params); + } + // Check if this is a COPY operation const copySource = ctx.headers['x-amz-copy-source'] as string | undefined; if (copySource) { @@ -133,7 +138,7 @@ export class ObjectController { } /** - * DELETE /:bucket/:key* - Delete object + * DELETE /:bucket/:key* - Delete object or abort multipart upload */ public static async deleteObject( req: plugins.http.IncomingMessage, @@ -143,6 +148,11 @@ export class ObjectController { ): Promise { const { bucket, key } = params; + // Check if this is an abort multipart upload + if (ctx.query.uploadId) { + return ObjectController.abortMultipartUpload(req, res, ctx, params); + } + await ctx.store.deleteObject(bucket, key); ctx.status(204).send(''); } @@ -201,4 +211,168 @@ export class ObjectController { }, }); } + + /** + * POST /:bucket/:key* - Initiate or complete multipart upload + */ + public static async postObject( + req: plugins.http.IncomingMessage, + res: plugins.http.ServerResponse, + ctx: S3Context, + params: Record + ): Promise { + // Check if this is initiate multipart upload + if (ctx.query.uploads !== undefined) { + return ObjectController.initiateMultipartUpload(req, res, ctx, params); + } + + // Check if this is complete multipart upload + if (ctx.query.uploadId) { + return ObjectController.completeMultipartUpload(req, res, ctx, params); + } + + ctx.throw('InvalidRequest', 'Invalid POST request'); + } + + /** + * Initiate Multipart Upload (POST with ?uploads) + */ + private static async initiateMultipartUpload( + req: plugins.http.IncomingMessage, + res: plugins.http.ServerResponse, + ctx: S3Context, + params: Record + ): Promise { + const { bucket, key } = params; + + // Extract metadata from headers + const metadata: Record = {}; + for (const [header, value] of Object.entries(ctx.headers)) { + if (header.startsWith('x-amz-meta-')) { + metadata[header] = value as string; + } + if (header === 'content-type' && value) { + metadata['content-type'] = value as string; + } + } + + // Initiate upload + const uploadId = await ctx.multipart.initiateUpload(bucket, key, metadata); + + // Send XML response + await ctx.sendXML({ + InitiateMultipartUploadResult: { + Bucket: bucket, + Key: key, + UploadId: uploadId, + }, + }); + } + + /** + * Upload Part (PUT with ?partNumber&uploadId) + */ + private static async uploadPart( + req: plugins.http.IncomingMessage, + res: plugins.http.ServerResponse, + ctx: S3Context, + params: Record + ): Promise { + const uploadId = ctx.query.uploadId!; + const partNumber = parseInt(ctx.query.partNumber!); + + if (isNaN(partNumber) || partNumber < 1 || partNumber > 10000) { + ctx.throw('InvalidPartNumber', 'Part number must be between 1 and 10000'); + } + + // Upload the part + const partInfo = await ctx.multipart.uploadPart( + uploadId, + partNumber, + ctx.getRequestStream() as any as import('stream').Readable + ); + + // Set ETag header (part ETag) + ctx.setHeader('ETag', `"${partInfo.etag}"`); + ctx.status(200).send(''); + } + + /** + * Complete Multipart Upload (POST with ?uploadId) + */ + private static async completeMultipartUpload( + req: plugins.http.IncomingMessage, + res: plugins.http.ServerResponse, + ctx: S3Context, + params: Record + ): Promise { + const { bucket, key } = params; + const uploadId = ctx.query.uploadId!; + + // Read and parse request body (XML with part list) + const body = await ctx.readBody(); + + // Parse XML to extract parts + // Expected format: 1"etag"... + const partMatches = body.matchAll(/.*?(\d+)<\/PartNumber>.*?(.*?)<\/ETag>.*?<\/Part>/gs); + const parts: Array<{ PartNumber: number; ETag: string }> = []; + + for (const match of partMatches) { + parts.push({ + PartNumber: parseInt(match[1]), + ETag: match[2], + }); + } + + // Complete the upload + const result = await ctx.multipart.completeUpload(uploadId, parts); + + // Get upload metadata + const upload = ctx.multipart.getUpload(uploadId); + if (!upload) { + ctx.throw('NoSuchUpload', 'The specified upload does not exist'); + } + + // Move final file to object store + const finalPath = ctx.multipart.getFinalPath(uploadId); + const finalContent = await plugins.smartfs.file(finalPath).read(); + const finalStream = plugins.http.IncomingMessage.prototype; + + // Create a readable stream from the buffer + const { Readable } = await import('stream'); + const finalReadableStream = Readable.from([finalContent]); + + // Store the final object + await ctx.store.putObject(bucket, key, finalReadableStream, upload.metadata); + + // Clean up multipart upload data + await ctx.multipart.cleanupUpload(uploadId); + + // Send XML response + await ctx.sendXML({ + CompleteMultipartUploadResult: { + Location: `/${bucket}/${key}`, + Bucket: bucket, + Key: key, + ETag: `"${result.etag}"`, + }, + }); + } + + /** + * Abort Multipart Upload (DELETE with ?uploadId) + */ + private static async abortMultipartUpload( + req: plugins.http.IncomingMessage, + res: plugins.http.ServerResponse, + ctx: S3Context, + params: Record + ): Promise { + const uploadId = ctx.query.uploadId!; + + // Abort and cleanup + await ctx.multipart.abortUpload(uploadId); + + ctx.status(204).send(''); + } } diff --git a/ts/index.ts b/ts/index.ts index f223184..dd8bacc 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -153,7 +153,7 @@ function mergeConfig(userConfig: ISmarts3Config): Required { */ export class Smarts3 { // STATIC - public static async createAndStart(configArg: ISmarts3Config = {}) { + public static async createAndStart(configArg: ISmarts3Config | ILegacySmarts3Config = {}) { const smartS3Instance = new Smarts3(configArg); await smartS3Instance.start(); return smartS3Instance; @@ -163,7 +163,7 @@ export class Smarts3 { public config: Required; public s3Instance: Smarts3Server; - constructor(configArg: ISmarts3Config = {}) { + constructor(configArg: ISmarts3Config | ILegacySmarts3Config = {}) { this.config = mergeConfig(configArg); }