smartbucket/ts/classes.bucket.ts

428 lines
14 KiB
TypeScript
Raw Permalink Normal View History

2024-05-20 23:22:21 +00:00
import * as plugins from './plugins.js';
import * as helpers from './helpers.js';
import * as interfaces from './interfaces.js';
2024-05-20 23:22:21 +00:00
import { SmartBucket } from './classes.smartbucket.js';
import { Directory } from './classes.directory.js';
import { File } from './classes.file.js';
2019-10-14 18:55:07 +00:00
2019-10-15 12:16:28 +00:00
export class Bucket {
2019-10-15 17:23:06 +00:00
public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string) {
2019-10-15 12:16:28 +00:00
const buckets = await smartbucketRef.minioClient.listBuckets();
2020-10-12 00:37:50 +00:00
const foundBucket = buckets.find((bucket) => {
2019-10-15 12:16:28 +00:00
return bucket.name === bucketNameArg;
});
if (foundBucket) {
2019-10-16 17:15:48 +00:00
console.log(`bucket with name ${bucketNameArg} exists.`);
2019-10-15 12:16:28 +00:00
console.log(`Taking this as base for new Bucket instance`);
return new this(smartbucketRef, bucketNameArg);
2019-10-15 17:23:06 +00:00
} else {
return null;
2019-10-15 12:16:28 +00:00
}
}
2019-10-15 17:23:06 +00:00
public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
2020-10-12 00:37:50 +00:00
await smartbucketRef.minioClient.makeBucket(bucketName, 'ams3').catch((e) => console.log(e));
2019-10-15 17:23:06 +00:00
return new Bucket(smartbucketRef, bucketName);
}
public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
2020-10-12 00:37:50 +00:00
await smartbucketRef.minioClient.removeBucket(bucketName).catch((e) => console.log(e));
2019-10-15 17:23:06 +00:00
}
2019-10-15 12:16:28 +00:00
public smartbucketRef: SmartBucket;
public name: string;
2019-10-15 17:23:06 +00:00
2019-10-15 12:16:28 +00:00
constructor(smartbucketRef: SmartBucket, bucketName: string) {
this.smartbucketRef = smartbucketRef;
this.name = bucketName;
}
2019-10-15 17:23:06 +00:00
2019-10-16 17:11:28 +00:00
/**
* gets the base directory of the bucket
*/
public async getBaseDirectory(): Promise<Directory> {
2019-10-16 17:11:28 +00:00
return new Directory(this, null, '');
}
public async getDirectoryFromPath(pathDescriptorArg: interfaces.IPathDecriptor): Promise<Directory> {
if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
return this.getBaseDirectory();
}
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
const baseDirectory = await this.getBaseDirectory();
return await baseDirectory.getSubDirectoryByName(checkPath);
}
2019-10-16 16:12:18 +00:00
// ===============
// Fast Operations
// ===============
2019-10-15 17:23:06 +00:00
/**
* store file
*/
2024-05-17 16:53:11 +00:00
public async fastPut(optionsArg: {
path: string;
contents: string | Buffer;
2024-05-20 23:22:21 +00:00
overwrite?: boolean;
}): Promise<File> {
2024-05-20 23:22:21 +00:00
try {
const reducedPath = await helpers.reducePathDescriptorToPath({
path: optionsArg.path,
})
2024-05-20 23:22:21 +00:00
// Check if the object already exists
const exists = await this.fastExists({ path: reducedPath });
2024-05-20 23:22:21 +00:00
if (exists && !optionsArg.overwrite) {
console.error(`Object already exists at path '${reducedPath}' in bucket '${this.name}'.`);
2024-05-20 23:22:21 +00:00
return;
} else if (exists && optionsArg.overwrite) {
console.log(`Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`);
2024-05-20 23:22:21 +00:00
} else {
console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
2024-05-20 23:22:21 +00:00
}
// Proceed with putting the object
const streamIntake = new plugins.smartstream.StreamIntake();
const putPromise = this.smartbucketRef.minioClient.putObject(this.name, reducedPath, streamIntake);
2024-05-20 23:22:21 +00:00
streamIntake.pushData(optionsArg.contents);
streamIntake.signalEnd();
await putPromise;
console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`);
const parsedPath = plugins.path.parse(reducedPath);
return new File({
directoryRefArg: await this.getDirectoryFromPath({
path: parsedPath.dir,
}),
fileName: parsedPath.base,
});
2024-05-20 23:22:21 +00:00
} catch (error) {
console.error(`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error);
throw error;
}
2019-10-16 13:21:02 +00:00
}
2024-05-20 23:22:21 +00:00
2019-10-16 13:21:02 +00:00
/**
* get file
*/
2024-06-03 19:35:08 +00:00
public async fastGet(optionsArg: {
path: string
}): Promise<Buffer> {
2019-10-16 13:21:02 +00:00
const done = plugins.smartpromise.defer();
2020-06-19 00:42:26 +00:00
let completeFile: Buffer;
2024-06-03 19:35:08 +00:00
const replaySubject = await this.fastGetReplaySubject(optionsArg);
2024-05-17 16:53:11 +00:00
const subscription = replaySubject.subscribe({
next: (chunk) => {
2020-06-19 00:42:26 +00:00
if (completeFile) {
completeFile = Buffer.concat([completeFile, chunk]);
2021-04-07 19:01:35 +00:00
} else {
completeFile = chunk;
2020-06-19 00:42:26 +00:00
}
2019-10-20 10:27:58 +00:00
},
2024-05-17 16:53:11 +00:00
complete: () => {
2019-10-20 10:27:58 +00:00
done.resolve();
2021-04-07 18:42:03 +00:00
subscription.unsubscribe();
2024-05-17 16:53:11 +00:00
},
error: (err) => {
console.log(err);
},
});
2019-10-20 10:27:58 +00:00
await done.promise;
return completeFile;
}
2024-06-03 19:35:08 +00:00
/**
* good when time to first byte is important
* and multiple subscribers are expected
* @param optionsArg
* @returns
*/
public async fastGetReplaySubject(optionsArg: {
2024-05-17 16:53:11 +00:00
path: string;
}): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
2019-10-16 17:15:48 +00:00
const fileStream = await this.smartbucketRef.minioClient
2024-05-17 16:53:11 +00:00
.getObject(this.name, optionsArg.path)
2020-10-12 00:37:50 +00:00
.catch((e) => console.log(e));
2020-06-19 00:42:26 +00:00
const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();
2024-05-17 17:09:00 +00:00
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, void>({
2024-05-17 16:53:11 +00:00
writeFunction: async (chunk) => {
2020-06-19 00:42:26 +00:00
replaySubject.next(chunk);
2024-05-17 17:09:00 +00:00
return;
2019-10-16 17:15:48 +00:00
},
2024-05-17 16:53:11 +00:00
finalFunction: async (cb) => {
2019-10-20 10:27:58 +00:00
replaySubject.complete();
2024-05-17 17:09:00 +00:00
return;
2019-10-16 17:15:48 +00:00
}
2024-05-17 16:53:11 +00:00
});
2019-10-16 13:21:02 +00:00
if (!fileStream) {
return null;
}
2022-06-07 15:15:36 +00:00
const smartstream = new plugins.smartstream.StreamWrapper([
2022-03-30 23:45:46 +00:00
fileStream,
duplexStream,
]);
smartstream.run();
2019-10-20 10:27:58 +00:00
return replaySubject;
2019-10-16 13:21:02 +00:00
}
2024-06-03 19:35:08 +00:00
public fastGetStream(optionsArg: {
path: string;
}, typeArg: 'webstream'): Promise<ReadableStream>
public async fastGetStream(optionsArg: {
path: string;
}, typeArg: 'nodestream'): Promise<plugins.stream.Readable>
/**
* fastGetStream
* @param optionsArg
* @returns
*/
public async fastGetStream(optionsArg: { path: string; }, typeArg: 'webstream' | 'nodestream' = 'nodestream'): Promise<ReadableStream | plugins.stream.Readable>{
const fileStream = await this.smartbucketRef.minioClient
.getObject(this.name, optionsArg.path)
.catch((e) => console.log(e));
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
writeFunction: async (chunk) => {
return chunk;
},
finalFunction: async (cb) => {
return null;
}
});
if (!fileStream) {
return null;
}
const smartstream = new plugins.smartstream.StreamWrapper([
fileStream,
duplexStream,
]);
smartstream.run();
if (typeArg === 'nodestream') {
return duplexStream;
};
if (typeArg === 'webstream') {
return (await duplexStream.getWebStreams()).readable;
}
}
2024-05-05 17:52:50 +00:00
/**
* store file as stream
*/
public async fastPutStream(optionsArg: {
2024-05-17 16:53:11 +00:00
path: string;
2024-06-09 14:02:33 +00:00
readableStream: plugins.stream.Readable | ReadableStream;
2024-05-17 16:53:11 +00:00
nativeMetadata?: { [key: string]: string };
2024-05-20 23:22:21 +00:00
overwrite?: boolean;
2024-05-05 17:52:50 +00:00
}): Promise<void> {
2024-05-20 23:22:21 +00:00
try {
// Check if the object already exists
const exists = await this.fastExists({ path: optionsArg.path });
if (exists && !optionsArg.overwrite) {
console.error(`Object already exists at path '${optionsArg.path}' in bucket '${this.name}'.`);
return;
} else if (exists && optionsArg.overwrite) {
console.log(`Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`);
} else {
console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
}
2024-06-03 19:35:08 +00:00
2024-06-09 14:02:33 +00:00
const streamIntake = await plugins.smartstream.StreamIntake.fromStream<Uint8Array>(optionsArg.readableStream);
2024-05-20 23:22:21 +00:00
// Proceed with putting the object
await this.smartbucketRef.minioClient.putObject(
this.name,
optionsArg.path,
2024-06-03 19:35:08 +00:00
streamIntake,
2024-05-20 23:22:21 +00:00
null,
...(optionsArg.nativeMetadata
? (() => {
const returnObject: any = {};
return returnObject;
})()
: {})
);
console.log(`Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`);
} catch (error) {
console.error(`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error);
throw error;
}
2024-05-05 17:52:50 +00:00
}
2024-05-20 23:22:21 +00:00
2024-05-05 17:52:50 +00:00
public async fastCopy(optionsArg: {
sourcePath: string;
destinationPath?: string;
2024-05-17 16:53:11 +00:00
targetBucket?: Bucket;
nativeMetadata?: { [key: string]: string };
deleteExistingNativeMetadata?: boolean;
}): Promise<void> {
2024-05-05 17:52:50 +00:00
try {
2024-05-17 16:53:11 +00:00
const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name;
2024-05-05 17:52:50 +00:00
// Retrieve current object information to use in copy conditions
2024-05-17 16:53:11 +00:00
const currentObjInfo = await this.smartbucketRef.minioClient.statObject(
targetBucketName,
optionsArg.sourcePath
2024-05-17 16:53:11 +00:00
);
2024-05-05 17:52:50 +00:00
// Setting up copy conditions
const copyConditions = new plugins.minio.CopyConditions();
2024-05-17 16:53:11 +00:00
// Prepare new metadata
const newNativeMetadata = {
...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.metaData),
...optionsArg.nativeMetadata,
2024-05-05 17:52:50 +00:00
};
2024-05-17 16:53:11 +00:00
2024-05-05 17:52:50 +00:00
// Define the copy operation as a Promise
2024-05-17 16:53:11 +00:00
// TODO: check on issue here: https://github.com/minio/minio-js/issues/1286
2024-05-05 17:52:50 +00:00
await this.smartbucketRef.minioClient.copyObject(
2024-05-17 16:53:11 +00:00
this.name,
optionsArg.sourcePath,
`/${targetBucketName}/${optionsArg.destinationPath || optionsArg.sourcePath}`,
2024-05-17 16:53:11 +00:00
copyConditions
2024-05-05 17:52:50 +00:00
);
} catch (err) {
console.error('Error updating metadata:', err);
throw err; // rethrow to allow caller to handle
}
}
/**
* Move object from one path to another within the same bucket or to another bucket
*/
public async fastMove(optionsArg: {
sourcePath: string;
destinationPath: string;
targetBucket?: Bucket;
overwrite?: boolean;
}): Promise<void> {
try {
// Check if the destination object already exists
const destinationBucket = optionsArg.targetBucket || this;
const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath });
if (exists && !optionsArg.overwrite) {
console.error(`Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
return;
} else if (exists && optionsArg.overwrite) {
console.log(`Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
} else {
console.log(`Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
}
// Proceed with copying the object to the new path
await this.fastCopy(optionsArg);
// Remove the original object after successful copy
await this.fastRemove({ path: optionsArg.sourcePath });
console.log(`Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
} catch (error) {
console.error(`Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`, error);
throw error;
}
}
2019-10-16 13:21:02 +00:00
/**
* removeObject
*/
2024-05-17 16:53:11 +00:00
public async fastRemove(optionsArg: {
path: string;
}) {
await this.smartbucketRef.minioClient.removeObject(this.name, optionsArg.path);
}
2024-05-20 23:22:21 +00:00
/**
* check wether file exists
* @param optionsArg
* @returns
*/
public async fastExists(optionsArg: {
2024-05-17 16:53:11 +00:00
path: string;
}): Promise<boolean> {
try {
await this.smartbucketRef.minioClient.statObject(this.name, optionsArg.path);
console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
return true;
} catch (error) {
if (error.code === 'NotFound') {
console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`);
return false;
} else {
console.error('Error checking object existence:', error);
throw error; // Rethrow if it's not a NotFound error to handle unexpected issues
}
}
2019-10-15 17:23:06 +00:00
}
2024-05-20 23:22:21 +00:00
2024-06-03 19:35:08 +00:00
/**
* deletes this bucket
*/
public async delete() {
await this.smartbucketRef.minioClient.removeBucket(this.name);
}
public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
return this.smartbucketRef.minioClient.statObject(this.name, checkPath);
}
public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
// lets check if the checkPath is a directory
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
const done = plugins.smartpromise.defer<boolean>();
stream.on('data', (dataArg) => {
stream.destroy(); // Stop the stream early if we find at least one object
if (dataArg.prefix.startsWith(checkPath + '/')) {
done.resolve(true);
}
});
stream.on('end', () => {
done.resolve(false);
});
stream.on('error', (err) => {
done.reject(err);
});
return done.promise;
};
public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
// lets check if the checkPath is a directory
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
const done = plugins.smartpromise.defer<boolean>();
stream.on('data', (dataArg) => {
stream.destroy(); // Stop the stream early if we find at least one object
if (dataArg.prefix === checkPath) {
done.resolve(true);
}
});
stream.on('end', () => {
done.resolve(false);
});
stream.on('error', (err) => {
done.reject(err);
});
return done.promise;
2024-05-20 23:22:21 +00:00
}
2019-10-15 12:16:28 +00:00
}