import * as plugins from './plugins.js';
import * as helpers from './helpers.js';
import * as interfaces from './interfaces.js';
import { SmartBucket } from './classes.smartbucket.js';
import { Directory } from './classes.directory.js';
import { File } from './classes.file.js';

/**
 * Represents a single bucket and wraps the minio client held by the
 * owning SmartBucket instance with path based convenience operations.
 */
export class Bucket {
  /**
   * Looks up an existing bucket by name.
   * @returns a Bucket instance when a bucket with that name is listed, otherwise null
   */
  public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string) {
    const buckets = await smartbucketRef.minioClient.listBuckets();
    const foundBucket = buckets.find((bucket) => {
      return bucket.name === bucketNameArg;
    });

    if (foundBucket) {
      console.log(`bucket with name ${bucketNameArg} exists.`);
      console.log(`Taking this as base for new Bucket instance`);
      return new this(smartbucketRef, bucketNameArg);
    } else {
      return null;
    }
  }

  /**
   * Requests creation of a bucket (region 'ams3') and returns a Bucket instance for it.
   * Creation errors are only logged; a Bucket instance is returned either way.
   */
  public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    await smartbucketRef.minioClient.makeBucket(bucketName, 'ams3').catch((e) => console.log(e));
    return new Bucket(smartbucketRef, bucketName);
  }

  /**
   * Requests removal of the bucket with the given name. Errors are only logged.
   */
  public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
    await smartbucketRef.minioClient.removeBucket(bucketName).catch((e) => console.log(e));
  }

  public smartbucketRef: SmartBucket;
  public name: string;

  constructor(smartbucketRef: SmartBucket, bucketName: string) {
    this.smartbucketRef = smartbucketRef;
    this.name = bucketName;
  }

  /**
   * gets the base directory of the bucket
   */
  public async getBaseDirectory(): Promise<Directory> {
    return new Directory(this, null, '');
  }

  /**
   * Resolves a path descriptor to a Directory.
   * Falls back to the base directory when the descriptor carries neither a path nor a directory.
   */
  public async getDirectoryFromPath(
    pathDescriptorArg: interfaces.IPathDecriptor
  ): Promise<Directory> {
    if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
      return this.getBaseDirectory();
    }
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
    const baseDirectory = await this.getBaseDirectory();
    return await baseDirectory.getSubDirectoryByName(checkPath);
  }

  // ===============
  // Fast Operations
  // ===============

  /**
   * store file
   *
   * Writes the given contents to the reduced form of `optionsArg.path`.
   * When an object already exists there and `overwrite` is not set, nothing is
   * written, an error is logged and the method resolves without a File.
   * @throws rethrows any error raised while checking or storing the object
   */
  public async fastPut(optionsArg: {
    path: string;
    contents: string | Buffer;
    overwrite?: boolean;
  }): Promise<File> {
    try {
      const reducedPath = await helpers.reducePathDescriptorToPath({
        path: optionsArg.path,
      });
      // Check if the object already exists
      const exists = await this.fastExists({ path: reducedPath });

      if (exists && !optionsArg.overwrite) {
        console.error(`Object already exists at path '${reducedPath}' in bucket '${this.name}'.`);
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
      }

      // Proceed with putting the object: the upload is started first, then the
      // contents are pushed into the intake stream it reads from.
      const streamIntake = new plugins.smartstream.StreamIntake();
      const putPromise = this.smartbucketRef.minioClient.putObject(
        this.name,
        reducedPath,
        streamIntake
      );
      streamIntake.pushData(optionsArg.contents);
      streamIntake.signalEnd();
      await putPromise;

      console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`);
      const parsedPath = plugins.path.parse(reducedPath);
      return new File({
        directoryRefArg: await this.getDirectoryFromPath({
          path: parsedPath.dir,
        }),
        fileName: parsedPath.base,
      });
    } catch (error) {
      console.error(
        `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * get file
   *
   * Reads the complete object into a single Buffer.
   * @returns the concatenated chunks (an empty Buffer when no chunk was emitted)
   * @throws when the object stream could not be opened or the stream reports an error
   */
  public async fastGet(optionsArg: { path: string }): Promise<Buffer> {
    const replaySubject = await this.fastGetReplaySubject(optionsArg);
    if (!replaySubject) {
      // fastGetReplaySubject returns null when getObject failed
      throw new Error(
        `Could not read object at path '${optionsArg.path}' in bucket '${this.name}'.`
      );
    }

    // Collect chunks and concatenate once at the end instead of re-copying on every chunk.
    const chunks: Buffer[] = [];
    await new Promise<void>((resolve, reject) => {
      replaySubject.subscribe({
        next: (chunk) => {
          chunks.push(chunk);
        },
        complete: () => {
          resolve();
        },
        error: (err) => {
          // Reject so that callers do not wait forever on a failed stream.
          console.log(err);
          reject(err);
        },
      });
    });
    return Buffer.concat(chunks);
  }

  /**
   * good when time to first byte is important
   * and multiple subscribers are expected
   * @param optionsArg
   * @returns a ReplaySubject emitting the object's chunks, or null when getObject failed
   */
  public async fastGetReplaySubject(optionsArg: {
    path: string;
  }): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
    const fileStream = await this.smartbucketRef.minioClient
      .getObject(this.name, optionsArg.path)
      .catch((e) => console.log(e));

    if (!fileStream) {
      return null;
    }

    const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();
    const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, void>({
      writeFunction: async (chunk) => {
        replaySubject.next(chunk);
        return;
      },
      finalFunction: async (cb) => {
        replaySubject.complete();
        return;
      },
    });

    const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
    // Forward a failed run to subscribers instead of leaving a floating promise.
    void Promise.resolve(smartstream.run()).catch((err) => {
      replaySubject.error(err);
    });
    return replaySubject;
  }

  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'webstream'
  ): Promise<ReadableStream>;
  public fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'nodestream'
  ): Promise<plugins.stream.Readable>;

  /**
   * fastGetStream
   * @param optionsArg
   * @param typeArg selects between a node stream ('nodestream', default) and a web ReadableStream ('webstream')
   * @returns the requested stream, or null when getObject failed
   */
  public async fastGetStream(
    optionsArg: {
      path: string;
    },
    typeArg: 'webstream' | 'nodestream' = 'nodestream'
  ): Promise<ReadableStream | plugins.stream.Readable> {
    const fileStream = await this.smartbucketRef.minioClient
      .getObject(this.name, optionsArg.path)
      .catch((e) => console.log(e));

    if (!fileStream) {
      return null;
    }

    // pass-through duplex that hands every chunk on unchanged
    const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
      writeFunction: async (chunk) => {
        return chunk;
      },
      finalFunction: async (cb) => {
        return null;
      },
    });

    const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
    // Surface a failed run on the returned stream instead of leaving a floating promise.
    void Promise.resolve(smartstream.run()).catch((err) => {
      duplexStream.destroy(err);
    });

    if (typeArg === 'nodestream') {
      return duplexStream;
    }
    if (typeArg === 'webstream') {
      return (await duplexStream.getWebStreams()).readable;
    }
  }

  /**
   * store file as stream
   *
   * When an object already exists at the path and `overwrite` is not set,
   * nothing is written and an error is logged.
   * `nativeMetadata`, when given, is handed to putObject as object metadata.
   * @throws rethrows any error raised while checking or storing the object
   */
  public async fastPutStream(optionsArg: {
    path: string;
    readableStream: plugins.stream.Readable | ReadableStream;
    nativeMetadata?: { [key: string]: string };
    overwrite?: boolean;
  }): Promise<void> {
    try {
      // Check if the object already exists
      const exists = await this.fastExists({ path: optionsArg.path });

      if (exists && !optionsArg.overwrite) {
        console.error(
          `Object already exists at path '${optionsArg.path}' in bucket '${this.name}'.`
        );
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`
        );
      } else {
        console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
      }

      const streamIntake = await plugins.smartstream.StreamIntake.fromStream(
        optionsArg.readableStream
      );

      // Proceed with putting the object. The size is left unspecified and the
      // metadata is passed as a regular argument (spreading an object into the
      // argument list throws at runtime).
      await this.smartbucketRef.minioClient.putObject(
        this.name,
        optionsArg.path,
        streamIntake,
        undefined,
        optionsArg.nativeMetadata || {}
      );

      console.log(
        `Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`
      );
    } catch (error) {
      console.error(
        `Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
        error
      );
      throw error;
    }
  }

  /**
   * Copies an object from this bucket to `destinationPath` (defaults to the
   * source path) in `targetBucket` (defaults to this bucket).
   *
   * NOTE: the merged metadata is prepared but not yet applied to the copy,
   * see the TODO below.
   * @throws rethrows any error raised by the stat or copy call
   */
  public async fastCopy(optionsArg: {
    sourcePath: string;
    destinationPath?: string;
    targetBucket?: Bucket;
    nativeMetadata?: { [key: string]: string };
    deleteExistingNativeMetadata?: boolean;
  }): Promise<void> {
    try {
      const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name;
      const destinationPath = optionsArg.destinationPath || optionsArg.sourcePath;

      // Retrieve current object information. The source object lives in THIS bucket.
      const currentObjInfo = await this.smartbucketRef.minioClient.statObject(
        this.name,
        optionsArg.sourcePath
      );

      // Setting up copy conditions
      const copyConditions = new plugins.minio.CopyConditions();

      // Prepare new metadata
      // TODO: check on issue here: https://github.com/minio/minio-js/issues/1286
      const newNativeMetadata = {
        ...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.metaData),
        ...optionsArg.nativeMetadata,
      };

      // copyObject expects (targetBucket, targetObject, '/sourceBucket/sourceObject', conditions)
      await this.smartbucketRef.minioClient.copyObject(
        targetBucketName,
        destinationPath,
        `/${this.name}/${optionsArg.sourcePath}`,
        copyConditions
      );
    } catch (err) {
      console.error('Error copying object:', err);
      throw err; // rethrow to allow caller to handle
    }
  }

  /**
   * Move object from one path to another within the same bucket or to another bucket
   *
   * Implemented as fastCopy followed by fastRemove of the source.
   * When the destination exists and `overwrite` is not set, nothing is moved
   * and an error is logged.
   * @throws rethrows any error raised while checking, copying or removing
   */
  public async fastMove(optionsArg: {
    sourcePath: string;
    destinationPath: string;
    targetBucket?: Bucket;
    overwrite?: boolean;
  }): Promise<void> {
    try {
      // Check if the destination object already exists
      const destinationBucket = optionsArg.targetBucket || this;
      const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath });

      if (exists && !optionsArg.overwrite) {
        console.error(
          `Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
        return;
      } else if (exists && optionsArg.overwrite) {
        console.log(
          `Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      } else {
        console.log(
          `Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
        );
      }

      // Proceed with copying the object to the new path
      await this.fastCopy(optionsArg);

      // Remove the original object after successful copy
      await this.fastRemove({ path: optionsArg.sourcePath });

      console.log(
        `Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
      );
    } catch (error) {
      console.error(
        `Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`,
        error
      );
      throw error;
    }
  }

  /**
   * removeObject
   */
  public async fastRemove(optionsArg: { path: string }) {
    await this.smartbucketRef.minioClient.removeObject(this.name, optionsArg.path);
  }

  /**
   * check whether file exists
   * @param optionsArg
   * @returns true when statObject succeeds, false when it fails with code 'NotFound'
   * @throws rethrows any statObject error whose code is not 'NotFound'
   */
  public async fastExists(optionsArg: { path: string }): Promise<boolean> {
    try {
      await this.smartbucketRef.minioClient.statObject(this.name, optionsArg.path);
      console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
      return true;
    } catch (error) {
      const errorCode = (error as { code?: unknown } | null)?.code;
      if (errorCode === 'NotFound') {
        console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`);
        return false;
      } else {
        console.error('Error checking object existence:', error);
        throw error; // Rethrow if it's not a NotFound error to handle unexpected issues
      }
    }
  }

  /**
   * deletes this bucket
   */
  public async delete() {
    await this.smartbucketRef.minioClient.removeBucket(this.name);
  }

  /**
   * Returns the statObject result for the object the descriptor reduces to.
   */
  public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    return this.smartbucketRef.minioClient.statObject(this.name, checkPath);
  }

  /**
   * Checks whether at least one object key lies below the reduced path,
   * i.e. starts with the path followed by '/'.
   */
  public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    const directoryPrefix = checkPath.endsWith('/') ? checkPath : `${checkPath}/`;
    return this.hasListedObject(directoryPrefix, (keyArg) => keyArg.startsWith(directoryPrefix));
  }

  /**
   * Checks whether an object with exactly the reduced path as key is listed.
   */
  public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
    const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
    if (!checkPath) {
      // an empty path cannot match any listed key; avoid listing the whole bucket
      return false;
    }
    return this.hasListedObject(checkPath, (keyArg) => keyArg === checkPath);
  }

  /**
   * Recursively lists objects under `prefixArg` and resolves true as soon as one
   * listed key satisfies `matchArg`; resolves false when the listing ends without
   * a match and rejects when the listing stream errors.
   */
  private hasListedObject(
    prefixArg: string,
    matchArg: (keyArg: string) => boolean
  ): Promise<boolean> {
    const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, prefixArg, true);
    return new Promise<boolean>((resolve, reject) => {
      stream.on('data', (dataArg) => {
        // listed items may carry their key as `name` or as `prefix`
        const key = dataArg.name ?? dataArg.prefix;
        if (typeof key === 'string' && matchArg(key)) {
          resolve(true);
          stream.destroy(); // Stop the stream early once a match is found
        }
      });
      stream.on('end', () => {
        resolve(false);
      });
      stream.on('error', (err) => {
        reject(err);
      });
    });
  }
}