// classes.bucket.ts import * as plugins from './plugins.js'; import * as helpers from './helpers.js'; import * as interfaces from './interfaces.js'; import { SmartBucket } from './classes.smartbucket.js'; import { Directory } from './classes.directory.js'; import { File } from './classes.file.js'; import { Trash } from './classes.trash.js'; import { ListCursor, type IListCursorOptions } from './classes.listcursor.js'; /** * The bucket class exposes the basic functionality of a bucket. * The functions of the bucket alone are enough to * operate in S3 basic fashion on blobs of data. */ export class Bucket { public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string): Promise { const command = new plugins.s3.ListBucketsCommand({}); const buckets = await smartbucketRef.s3Client.send(command); const foundBucket = buckets.Buckets!.find((bucket) => bucket.Name === bucketNameArg); if (foundBucket) { console.log(`bucket with name ${bucketNameArg} exists.`); console.log(`Taking this as base for new Bucket instance`); return new this(smartbucketRef, bucketNameArg); } else { throw new Error(`Bucket '${bucketNameArg}' not found.`); } } public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) { const command = new plugins.s3.CreateBucketCommand({ Bucket: bucketName }); await smartbucketRef.s3Client.send(command).catch((e) => console.log(e)); return new Bucket(smartbucketRef, bucketName); } public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) { const command = new plugins.s3.DeleteBucketCommand({ Bucket: bucketName }); await smartbucketRef.s3Client.send(command).catch((e) => console.log(e)); } public smartbucketRef: SmartBucket; public name: string; constructor(smartbucketRef: SmartBucket, bucketName: string) { this.smartbucketRef = smartbucketRef; this.name = bucketName; } /** * gets the base directory of the bucket */ public async getBaseDirectory(): Promise { return new Directory(this, null!, ''); } /** * gets the trash directory */ public async getTrash(): Promise { const trash = new Trash(this); return trash; } public async getDirectoryFromPath( pathDescriptorArg: interfaces.IPathDecriptor ): Promise { if (!pathDescriptorArg.path && !pathDescriptorArg.directory) { return this.getBaseDirectory(); } const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg); const baseDirectory = await this.getBaseDirectory(); return await baseDirectory.getSubDirectoryByName(checkPath, { getEmptyDirectory: true, }); } // =============== // Fast Operations // =============== /** * store file */ public async fastPut( optionsArg: interfaces.IPathDecriptor & { contents: string | Buffer; overwrite?: boolean; } ): Promise { try { const reducedPath = await helpers.reducePathDescriptorToPath(optionsArg); const exists = await this.fastExists({ path: reducedPath }); if (exists && !optionsArg.overwrite) { throw new Error( `Object already exists at path '${reducedPath}' in bucket '${this.name}'. ` + `Set overwrite:true to replace it.` ); } else if (exists && optionsArg.overwrite) { console.log( `Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.` ); } else { console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`); } const command = new plugins.s3.PutObjectCommand({ Bucket: this.name, Key: reducedPath, Body: optionsArg.contents, }); await this.smartbucketRef.s3Client.send(command); console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`); const parsedPath = plugins.path.parse(reducedPath); return new File({ directoryRefArg: await this.getDirectoryFromPath({ path: parsedPath.dir, }), fileName: parsedPath.base, }); } catch (error) { console.error( `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error ); throw error; } } /** * get file */ public async fastGet(optionsArg: { path: string }): Promise { const done = plugins.smartpromise.defer(); let completeFile: Buffer; const replaySubject = await this.fastGetReplaySubject(optionsArg); const subscription = replaySubject.subscribe({ next: (chunk) => { if (completeFile) { completeFile = Buffer.concat([completeFile, chunk]); } else { completeFile = chunk; } }, complete: () => { done.resolve(); subscription.unsubscribe(); }, error: (err) => { console.log(err); }, }); await done.promise; return completeFile!; } /** * good when time to first byte is important * and multiple subscribers are expected * @param optionsArg * @returns */ public async fastGetReplaySubject(optionsArg: { path: string; }): Promise> { const command = new plugins.s3.GetObjectCommand({ Bucket: this.name, Key: optionsArg.path, }); const response = await this.smartbucketRef.s3Client.send(command); const replaySubject = new plugins.smartrx.rxjs.ReplaySubject(); // Convert the stream to a format that supports piping const stream = response.Body as any; // SdkStreamMixin includes readable stream if (typeof stream.pipe === 'function') { const duplexStream = new plugins.smartstream.SmartDuplex({ writeFunction: async (chunk) => { replaySubject.next(chunk); return; }, finalFunction: async (cb) => { replaySubject.complete(); return; }, }); stream.pipe(duplexStream); } return replaySubject; } public fastGetStream( optionsArg: { path: string; }, typeArg: 'webstream' ): Promise; public async fastGetStream( optionsArg: { path: string; }, typeArg: 'nodestream' ): Promise; public async fastGetStream( optionsArg: { path: string }, typeArg: 'webstream' | 'nodestream' = 'nodestream' ): Promise { const command = new plugins.s3.GetObjectCommand({ Bucket: this.name, Key: optionsArg.path, }); const response = await this.smartbucketRef.s3Client.send(command); const stream = response.Body as any; // SdkStreamMixin includes readable stream const duplexStream = new plugins.smartstream.SmartDuplex({ writeFunction: async (chunk) => { return chunk; }, finalFunction: async (cb) => { return null!; }, }); if (typeof stream.pipe === 'function') { stream.pipe(duplexStream); } if (typeArg === 'nodestream') { return duplexStream; } if (typeArg === 'webstream') { return (await duplexStream.getWebStreams()).readable; } throw new Error('unknown typeArg'); } /** * store file as stream */ public async fastPutStream(optionsArg: { path: string; readableStream: plugins.stream.Readable | ReadableStream; nativeMetadata?: { [key: string]: string }; overwrite?: boolean; }): Promise { try { const exists = await this.fastExists({ path: optionsArg.path }); if (exists && !optionsArg.overwrite) { throw new Error( `Object already exists at path '${optionsArg.path}' in bucket '${this.name}'. ` + `Set overwrite:true to replace it.` ); } else if (exists && optionsArg.overwrite) { console.log( `Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.` ); } else { console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`); } const command = new plugins.s3.PutObjectCommand({ Bucket: this.name, Key: optionsArg.path, Body: optionsArg.readableStream, Metadata: optionsArg.nativeMetadata, }); await this.smartbucketRef.s3Client.send(command); console.log( `Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.` ); } catch (error) { console.error( `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error ); throw error; } } public async fastCopy(optionsArg: { sourcePath: string; destinationPath?: string; targetBucket?: Bucket; nativeMetadata?: { [key: string]: string }; deleteExistingNativeMetadata?: boolean; }): Promise { try { const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name; // Retrieve current object information to use in copy conditions const currentObjInfo = await this.smartbucketRef.s3Client.send( new plugins.s3.HeadObjectCommand({ Bucket: this.name, Key: optionsArg.sourcePath, }) ); // Prepare new metadata const newNativeMetadata = { ...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.Metadata), ...optionsArg.nativeMetadata, }; // Define the copy operation const copySource = `${this.name}/${optionsArg.sourcePath}`; const command = new plugins.s3.CopyObjectCommand({ Bucket: targetBucketName, CopySource: copySource, Key: optionsArg.destinationPath || optionsArg.sourcePath, Metadata: newNativeMetadata, MetadataDirective: optionsArg.deleteExistingNativeMetadata ? 'REPLACE' : 'COPY', }); await this.smartbucketRef.s3Client.send(command); } catch (err) { console.error('Error updating metadata:', err); throw err; // rethrow to allow caller to handle } } /** * Move object from one path to another within the same bucket or to another bucket */ public async fastMove(optionsArg: { sourcePath: string; destinationPath: string; targetBucket?: Bucket; overwrite?: boolean; }): Promise { try { const destinationBucket = optionsArg.targetBucket || this; const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath, }); if (exists && !optionsArg.overwrite) { console.error( `Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.` ); return; } else if (exists && optionsArg.overwrite) { console.log( `Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.` ); } else { console.log( `Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.` ); } await this.fastCopy(optionsArg); await this.fastRemove({ path: optionsArg.sourcePath }); console.log( `Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.` ); } catch (error) { console.error( `Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`, error ); throw error; } } /** * removeObject */ public async fastRemove(optionsArg: { path: string }) { const command = new plugins.s3.DeleteObjectCommand({ Bucket: this.name, Key: optionsArg.path, }); await this.smartbucketRef.s3Client.send(command); } /** * check whether file exists * @param optionsArg * @returns */ public async fastExists(optionsArg: { path: string }): Promise { try { const command = new plugins.s3.HeadObjectCommand({ Bucket: this.name, Key: optionsArg.path, }); await this.smartbucketRef.s3Client.send(command); console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`); return true; } catch (error: any) { if (error?.name === 'NotFound') { console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`); return false; } else { console.error('Error checking object existence:', error); throw error; // Rethrow if it's not a NotFound error to handle unexpected issues } } } /** * deletes this bucket */ public async delete() { await this.smartbucketRef.s3Client.send( new plugins.s3.DeleteBucketCommand({ Bucket: this.name }) ); } public async fastStat(pathDescriptor: interfaces.IPathDecriptor) { const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor); const command = new plugins.s3.HeadObjectCommand({ Bucket: this.name, Key: checkPath, }); return this.smartbucketRef.s3Client.send(command); } public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise { const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor); const command = new plugins.s3.ListObjectsV2Command({ Bucket: this.name, Prefix: checkPath, Delimiter: '/', }); const { CommonPrefixes } = await this.smartbucketRef.s3Client.send(command); return !!CommonPrefixes && CommonPrefixes.length > 0; } public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise { const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor); const command = new plugins.s3.ListObjectsV2Command({ Bucket: this.name, Prefix: checkPath, Delimiter: '/', }); const { Contents } = await this.smartbucketRef.s3Client.send(command); return !!Contents && Contents.length > 0; } public async getMagicBytes(optionsArg: { path: string; length: number }): Promise { try { const command = new plugins.s3.GetObjectCommand({ Bucket: this.name, Key: optionsArg.path, Range: `bytes=0-${optionsArg.length - 1}`, }); const response = await this.smartbucketRef.s3Client.send(command); const chunks: Buffer[] = []; const stream = response.Body as any; // SdkStreamMixin includes readable stream for await (const chunk of stream) { chunks.push(chunk); } return Buffer.concat(chunks); } catch (error) { console.error( `Error retrieving magic bytes from object at path '${optionsArg.path}' in bucket '${this.name}':`, error ); throw error; } } // ========================================== // Memory-Efficient Listing Methods (Phase 1) // ========================================== /** * List all objects with a given prefix using async generator (memory-efficient streaming) * @param prefix - Optional prefix to filter objects (default: '' for all objects) * @yields Object keys one at a time * @example * ```ts * for await (const key of bucket.listAllObjects('npm/')) { * console.log(key); * if (shouldStop) break; // Early exit supported * } * ``` */ public async *listAllObjects(prefix: string = ''): AsyncIterableIterator { let continuationToken: string | undefined; do { const command = new plugins.s3.ListObjectsV2Command({ Bucket: this.name, Prefix: prefix, ContinuationToken: continuationToken, }); const response = await this.smartbucketRef.s3Client.send(command); for (const obj of response.Contents || []) { if (obj.Key) yield obj.Key; } continuationToken = response.NextContinuationToken; } while (continuationToken); } /** * List all objects as an RxJS Observable (for complex reactive pipelines) * @param prefix - Optional prefix to filter objects (default: '' for all objects) * @returns Observable that emits object keys * @example * ```ts * bucket.listAllObjectsObservable('npm/') * .pipe( * filter(key => key.endsWith('.json')), * take(100) * ) * .subscribe(key => console.log(key)); * ``` */ public listAllObjectsObservable(prefix: string = ''): plugins.smartrx.rxjs.Observable { return new plugins.smartrx.rxjs.Observable((subscriber) => { const fetchPage = async (token?: string) => { try { const command = new plugins.s3.ListObjectsV2Command({ Bucket: this.name, Prefix: prefix, ContinuationToken: token, }); const response = await this.smartbucketRef.s3Client.send(command); for (const obj of response.Contents || []) { if (obj.Key) subscriber.next(obj.Key); } if (response.NextContinuationToken) { await fetchPage(response.NextContinuationToken); } else { subscriber.complete(); } } catch (error) { subscriber.error(error); } }; fetchPage(); }); } /** * Create a cursor for manual pagination control * @param prefix - Optional prefix to filter objects (default: '' for all objects) * @param options - Cursor options (pageSize, etc.) * @returns ListCursor instance * @example * ```ts * const cursor = bucket.createCursor('npm/', { pageSize: 500 }); * while (cursor.hasMore()) { * const { keys, done } = await cursor.next(); * console.log(`Processing ${keys.length} keys...`); * } * ``` */ public createCursor(prefix: string = '', options?: IListCursorOptions): ListCursor { return new ListCursor(this, prefix, options); } // ========================================== // High-Level Listing Helpers (Phase 2) // ========================================== /** * Find objects matching a glob pattern (memory-efficient) * @param pattern - Glob pattern (e.g., "**\/*.json", "npm/packages/*\/index.json") * @yields Matching object keys * @example * ```ts * for await (const key of bucket.findByGlob('npm/packages/*\/index.json')) { * console.log('Found package index:', key); * } * ``` */ public async *findByGlob(pattern: string): AsyncIterableIterator { const matcher = new plugins.Minimatch(pattern); for await (const key of this.listAllObjects('')) { if (matcher.match(key)) yield key; } } /** * List all objects and collect into an array (convenience method) * WARNING: Loads entire result set into memory. Use listAllObjects() generator for large buckets. * @param prefix - Optional prefix to filter objects (default: '' for all objects) * @returns Array of all object keys * @example * ```ts * const allKeys = await bucket.listAllObjectsArray('npm/'); * console.log(`Found ${allKeys.length} objects`); * ``` */ public async listAllObjectsArray(prefix: string = ''): Promise { const keys: string[] = []; for await (const key of this.listAllObjects(prefix)) { keys.push(key); } return keys; } public async cleanAllContents(): Promise { try { // Define the command type explicitly const listCommandInput: plugins.s3.ListObjectsV2CommandInput = { Bucket: this.name, }; let isTruncated = true; let continuationToken: string | undefined = undefined; while (isTruncated) { // Add the continuation token to the input if present const listCommand = new plugins.s3.ListObjectsV2Command({ ...listCommandInput, ContinuationToken: continuationToken, }); // Explicitly type the response const response: plugins.s3.ListObjectsV2Output = await this.smartbucketRef.s3Client.send(listCommand); console.log(`Cleaning contents of bucket '${this.name}': Now deleting ${response.Contents?.length} items...`); if (response.Contents && response.Contents.length > 0) { // Delete objects in batches, mapping each item to { Key: string } const deleteCommand = new plugins.s3.DeleteObjectsCommand({ Bucket: this.name, Delete: { Objects: response.Contents.map((item) => ({ Key: item.Key! })), Quiet: true, }, }); await this.smartbucketRef.s3Client.send(deleteCommand); } // Update continuation token and truncation status isTruncated = response.IsTruncated || false; continuationToken = response.NextContinuationToken; } console.log(`All contents in bucket '${this.name}' have been deleted.`); } catch (error) { console.error(`Error cleaning contents of bucket '${this.name}':`, error); throw error; } } }