// classes.bucket.ts

import * as plugins from './plugins.js';
import * as helpers from './helpers.js';
import * as interfaces from './interfaces.js';
import { SmartBucket } from './classes.smartbucket.js';
import { Directory } from './classes.directory.js';
import { File } from './classes.file.js';
import { Trash } from './classes.trash.js';

/**
 * The Bucket class exposes the basic functionality of a bucket.
 * Its methods alone are enough to operate on blobs of data
 * in basic S3 fashion.
 */
export class Bucket {
  /**
   * gets a Bucket instance by name, or null if no bucket with that name exists
   */
  public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string) {
    const command = new plugins.s3.ListBucketsCommand({});
    const buckets = await smartbucketRef.s3Client.send(command);
    const foundBucket = buckets.Buckets.find((bucket) => bucket.Name === bucketNameArg);

    if (foundBucket) {
      console.log(`bucket with name ${bucketNameArg} exists.`);
      console.log(`Taking this as base for new Bucket instance`);
      return new this(smartbucketRef, bucketNameArg);
    } else {
      console.log(`did not find bucket by name: ${bucketNameArg}`);
      return null;
    }
  }

  /**
   * creates a bucket with the given name
   */
  public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    const command = new plugins.s3.CreateBucketCommand({ Bucket: bucketName });
    await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
    return new Bucket(smartbucketRef, bucketName);
  }

  /**
   * removes the bucket with the given name
   */
  public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    const command = new plugins.s3.DeleteBucketCommand({ Bucket: bucketName });
    await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
  }

  public smartbucketRef: SmartBucket;
  public name: string;

  constructor(smartbucketRef: SmartBucket, bucketName: string) {
    this.smartbucketRef = smartbucketRef;
    this.name = bucketName;
  }

  /**
   * gets the base directory of the bucket
   */
  public async getBaseDirectory(): Promise<Directory> {
    return new Directory(this, null!, '');
  }

  /**
   * gets the trash directory
   */
  public async getTrash(): Promise<Trash> {
    const trash = new Trash(this);
    return trash;
  }

  /**
   * resolves a path descriptor to a Directory instance
   */
  public async getDirectoryFromPath(
    pathDescriptorArg: interfaces.IPathDecriptor
  ): Promise<Directory> {
    if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
      return this.getBaseDirectory();
    }
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
    const baseDirectory = await this.getBaseDirectory();
    return await baseDirectory.getSubDirectoryByNameStrict(checkPath, {
      getEmptyDirectory: true,
    });
  }

  // ===============
  // Fast Operations
  // ===============

  /**
   * stores a file (object) from a string or Buffer
   */
  public async fastPut(
    optionsArg: interfaces.IPathDecriptor & {
      contents: string | Buffer;
      overwrite?: boolean;
    }
  ): Promise<File | undefined> {
    try {
      const reducedPath = await helpers.reducePathDescriptorToPath(optionsArg);
      const exists = await this.fastExists({ path: reducedPath });

      if (exists && !optionsArg.overwrite) {
        console.error(`Object already exists at path '${reducedPath}' in bucket '${this.name}'.`);
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
      }

      const command = new plugins.s3.PutObjectCommand({
        Bucket: this.name,
        Key: reducedPath,
        Body: optionsArg.contents,
      });
      await this.smartbucketRef.s3Client.send(command);

      console.log(
        `Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`
      );
      const parsedPath = plugins.path.parse(reducedPath);
      return new File({
        directoryRefArg: await this.getDirectoryFromPath({
          path: parsedPath.dir,
        }),
        fileName: parsedPath.base,
      });
    } catch (error) {
      console.error(
        `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * gets a file as a Buffer
   */
  public async fastGet(optionsArg: { path: string }): Promise<Buffer> {
    const done = plugins.smartpromise.defer();
    let completeFile: Buffer;
    const replaySubject = await this.fastGetReplaySubject(optionsArg);
    const subscription = replaySubject.subscribe({
      next: (chunk) => {
        if (completeFile) {
          completeFile = Buffer.concat([completeFile, chunk]);
        } else {
          completeFile = chunk;
        }
      },
      complete: () => {
        done.resolve();
        subscription.unsubscribe();
      },
      error: (err) => {
        console.log(err);
      },
    });
    await done.promise;
    return completeFile;
  }

  /**
   * streams an object into a ReplaySubject;
   * good when time to first byte is important
   * and multiple subscribers are expected
   */
  public async fastGetReplaySubject(optionsArg: {
    path: string;
  }): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
    const command = new plugins.s3.GetObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    const response = await this.smartbucketRef.s3Client.send(command);
    const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();

    // Convert the stream to a format that supports piping
    const stream = response.Body as any; // SdkStreamMixin includes readable stream
    if (typeof stream.pipe === 'function') {
      const duplexStream = new plugins.smartstream.SmartDuplex({
        writeFunction: async (chunk) => {
          replaySubject.next(chunk);
          return;
        },
        finalFunction: async (cb) => {
          replaySubject.complete();
          return;
        },
      });
      stream.pipe(duplexStream);
    }
    return replaySubject;
  }

  /**
   * gets an object as a web stream or a node stream
   */
  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'webstream'
  ): Promise<ReadableStream>;
  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'nodestream'
  ): Promise<plugins.stream.Readable>;
  public async fastGetStream(
    optionsArg: { path: string },
    typeArg: 'webstream' | 'nodestream' = 'nodestream'
  ): Promise<ReadableStream | plugins.stream.Readable> {
    const command = new plugins.s3.GetObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    const response = await this.smartbucketRef.s3Client.send(command);
    const stream = response.Body as any; // SdkStreamMixin includes readable stream

    const duplexStream = new plugins.smartstream.SmartDuplex({
      writeFunction: async (chunk) => {
        return chunk;
      },
      finalFunction: async (cb) => {
        return null;
      },
    });

    if (typeof stream.pipe === 'function') {
      stream.pipe(duplexStream);
    }

    if (typeArg === 'nodestream') {
      return duplexStream;
    }
    if (typeArg === 'webstream') {
      return (await duplexStream.getWebStreams()).readable;
    }
    throw new Error('unknown typeArg');
  }

  /**
   * stores a file as a stream
   */
  public async fastPutStream(optionsArg: {
    path: string;
    readableStream: plugins.stream.Readable | ReadableStream;
    nativeMetadata?: { [key: string]: string };
    overwrite?: boolean;
  }): Promise<void> {
    try {
      const exists = await this.fastExists({ path: optionsArg.path });

      if (exists && !optionsArg.overwrite) {
        console.error(
          `Object already exists at path '${optionsArg.path}' in bucket '${this.name}'.`
        );
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
      }

      const command = new plugins.s3.PutObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
        Body: optionsArg.readableStream,
        Metadata: optionsArg.nativeMetadata,
      });
      await this.smartbucketRef.s3Client.send(command);

      console.log(
        `Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`
      );
    } catch (error) {
      console.error(
        `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * copies an object within this bucket or into another bucket,
   * optionally replacing or merging its native metadata
   */
  public async fastCopy(optionsArg: {
    sourcePath: string;
    destinationPath?: string;
    targetBucket?: Bucket;
    nativeMetadata?: { [key: string]: string };
    deleteExistingNativeMetadata?: boolean;
  }): Promise<void> {
    try {
      const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name;

      // Retrieve current object information to use in copy conditions
      const currentObjInfo = await this.smartbucketRef.s3Client.send(
        new plugins.s3.HeadObjectCommand({
          Bucket: this.name,
          Key: optionsArg.sourcePath,
        })
      );

      // Prepare new metadata
      const newNativeMetadata = {
        ...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.Metadata),
        ...optionsArg.nativeMetadata,
      };

      // Define the copy operation
      const copySource = `${this.name}/${optionsArg.sourcePath}`;
      const command = new plugins.s3.CopyObjectCommand({
        Bucket: targetBucketName,
        CopySource: copySource,
        Key: optionsArg.destinationPath || optionsArg.sourcePath,
        Metadata: newNativeMetadata,
        MetadataDirective: optionsArg.deleteExistingNativeMetadata ? 'REPLACE' : 'COPY',
      });
      await this.smartbucketRef.s3Client.send(command);
    } catch (err) {
      console.error('Error updating metadata:', err);
      throw err; // rethrow to allow caller to handle
    }
  }

  /**
   * Move object from one path to another within the same bucket or to another bucket
   */
  public async fastMove(optionsArg: {
    sourcePath: string;
    destinationPath: string;
    targetBucket?: Bucket;
    overwrite?: boolean;
  }): Promise<void> {
    try {
      const destinationBucket = optionsArg.targetBucket || this;
      const exists = await destinationBucket.fastExists({
        path: optionsArg.destinationPath,
      });

      if (exists && !optionsArg.overwrite) {
        console.error(
          `Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      } else {
        console.log(
          `Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      }

      await this.fastCopy(optionsArg);
      await this.fastRemove({ path: optionsArg.sourcePath });

      console.log(
        `Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
      );
    } catch (error) {
      console.error(
        `Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`,
        error
      );
      throw error;
    }
  }

  /**
   * removes the object at the given path
   */
  public async fastRemove(optionsArg: { path: string }) {
    const command = new plugins.s3.DeleteObjectCommand({
      Bucket: this.name,
      Key: optionsArg.path,
    });
    await this.smartbucketRef.s3Client.send(command);
  }

  /**
   * checks whether an object exists at the given path
   */
  public async fastExists(optionsArg: { path: string }): Promise<boolean> {
    try {
      const command = new plugins.s3.HeadObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
      });
      await this.smartbucketRef.s3Client.send(command);
      console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
      return true;
    } catch (error) {
      if (error.name === 'NotFound') {
        console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`);
        return false;
      } else {
        console.error('Error checking object existence:', error);
        throw error; // Rethrow if it's not a NotFound error to handle unexpected issues
      }
    }
  }
  /**
   * deletes this bucket
   */
  public async delete() {
    await this.smartbucketRef.s3Client.send(
      new plugins.s3.DeleteBucketCommand({ Bucket: this.name })
    );
  }

  /**
   * gets the HeadObject response for the object at the given path
   */
  public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    const command = new plugins.s3.HeadObjectCommand({
      Bucket: this.name,
      Key: checkPath,
    });
    return this.smartbucketRef.s3Client.send(command);
  }

  /**
   * checks whether the given path acts as a directory (has keys nested below it)
   */
  public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    const command = new plugins.s3.ListObjectsV2Command({
      Bucket: this.name,
      Prefix: checkPath,
      Delimiter: '/',
    });
    const { CommonPrefixes } = await this.smartbucketRef.s3Client.send(command);
    return !!CommonPrefixes && CommonPrefixes.length > 0;
  }

  /**
   * checks whether the given path resolves to an object (file)
   */
  public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    const command = new plugins.s3.ListObjectsV2Command({
      Bucket: this.name,
      Prefix: checkPath,
      Delimiter: '/',
    });
    const { Contents } = await this.smartbucketRef.s3Client.send(command);
    return !!Contents && Contents.length > 0;
  }

  /**
   * gets the first `length` bytes of an object (its magic bytes)
   */
  public async getMagicBytes(optionsArg: { path: string; length: number }): Promise<Buffer> {
    try {
      const command = new plugins.s3.GetObjectCommand({
        Bucket: this.name,
        Key: optionsArg.path,
        Range: `bytes=0-${optionsArg.length - 1}`,
      });
      const response = await this.smartbucketRef.s3Client.send(command);
      const chunks: Buffer[] = [];
      const stream = response.Body as any; // SdkStreamMixin includes readable stream

      for await (const chunk of stream) {
        chunks.push(chunk);
      }
      return Buffer.concat(chunks);
    } catch (error) {
      console.error(
        `Error retrieving magic bytes from object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * deletes all objects in the bucket, paginating through the full listing
   */
  public async cleanAllContents(): Promise<void> {
    try {
      // Define the command type explicitly
      const listCommandInput: plugins.s3.ListObjectsV2CommandInput = {
        Bucket: this.name,
      };
      let isTruncated = true;
      let continuationToken: string | undefined = undefined;

      while (isTruncated) {
        // Add the continuation token to the input if present
        const listCommand = new plugins.s3.ListObjectsV2Command({
          ...listCommandInput,
          ContinuationToken: continuationToken,
        });

        // Explicitly type the response
        const response: plugins.s3.ListObjectsV2Output =
          await this.smartbucketRef.s3Client.send(listCommand);

        console.log(
          `Cleaning contents of bucket '${this.name}': Now deleting ${response.Contents?.length} items...`
        );

        if (response.Contents && response.Contents.length > 0) {
          // Delete objects in batches, mapping each item to { Key: string }
          const deleteCommand = new plugins.s3.DeleteObjectsCommand({
            Bucket: this.name,
            Delete: {
              Objects: response.Contents.map((item) => ({ Key: item.Key! })),
              Quiet: true,
            },
          });
          await this.smartbucketRef.s3Client.send(deleteCommand);
        }

        // Update continuation token and truncation status
        isTruncated = response.IsTruncated || false;
        continuationToken = response.NextContinuationToken;
      }

      console.log(`All contents in bucket '${this.name}' have been deleted.`);
    } catch (error) {
      console.error(`Error cleaning contents of bucket '${this.name}':`, error);
      throw error;
    }
  }
}
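// Usage sketch (illustrative only, not part of this module's exports): assumes an
// already-configured SmartBucket instance named `smartbucket` with valid credentials;
// the bucket name and object path below are hypothetical.
//
// const bucket = await Bucket.getBucketByName(smartbucket, 'my-bucket');
// if (bucket) {
//   await bucket.fastPut({ path: 'docs/readme.md', contents: 'hello', overwrite: true });
//   const contents = await bucket.fastGet({ path: 'docs/readme.md' });
//   console.log(contents.toString());
//   await bucket.fastRemove({ path: 'docs/readme.md' });
// }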