fix(core): update
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
// classes.bucket.ts
|
||||
|
||||
import * as plugins from './plugins.js';
|
||||
import * as helpers from './helpers.js';
|
||||
import * as interfaces from './interfaces.js';
|
||||
@@ -7,16 +9,15 @@ import { File } from './classes.file.js';
|
||||
import { Trash } from './classes.trash.js';
|
||||
|
||||
/**
|
||||
* The bucket class exposes the basc functionality of a bucket.
|
||||
* The bucket class exposes the basic functionality of a bucket.
|
||||
* The functions of the bucket alone are enough to
|
||||
* operate in s3 basic fashion on blobs of data.
|
||||
* operate in S3 basic fashion on blobs of data.
|
||||
*/
|
||||
export class Bucket {
|
||||
public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string) {
|
||||
const buckets = await smartbucketRef.minioClient.listBuckets();
|
||||
const foundBucket = buckets.find((bucket) => {
|
||||
return bucket.name === bucketNameArg;
|
||||
});
|
||||
const command = new plugins.s3.ListBucketsCommand({});
|
||||
const buckets = await smartbucketRef.s3Client.send(command);
|
||||
const foundBucket = buckets.Buckets.find((bucket) => bucket.Name === bucketNameArg);
|
||||
|
||||
if (foundBucket) {
|
||||
console.log(`bucket with name ${bucketNameArg} exists.`);
|
||||
@@ -28,12 +29,14 @@ export class Bucket {
|
||||
}
|
||||
|
||||
public static async createBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
|
||||
await smartbucketRef.minioClient.makeBucket(bucketName, 'ams3').catch((e) => console.log(e));
|
||||
const command = new plugins.s3.CreateBucketCommand({ Bucket: bucketName });
|
||||
await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
|
||||
return new Bucket(smartbucketRef, bucketName);
|
||||
}
|
||||
|
||||
public static async removeBucketByName(smartbucketRef: SmartBucket, bucketName: string) {
|
||||
await smartbucketRef.minioClient.removeBucket(bucketName).catch((e) => console.log(e));
|
||||
const command = new plugins.s3.DeleteBucketCommand({ Bucket: bucketName });
|
||||
await smartbucketRef.s3Client.send(command).catch((e) => console.log(e));
|
||||
}
|
||||
|
||||
public smartbucketRef: SmartBucket;
|
||||
@@ -65,7 +68,7 @@ export class Bucket {
|
||||
if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
|
||||
return this.getBaseDirectory();
|
||||
}
|
||||
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
|
||||
const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptorArg);
|
||||
const baseDirectory = await this.getBaseDirectory();
|
||||
return await baseDirectory.getSubDirectoryByName(checkPath);
|
||||
}
|
||||
@@ -77,13 +80,14 @@ export class Bucket {
|
||||
/**
|
||||
* store file
|
||||
*/
|
||||
public async fastPut(optionsArg: interfaces.IPathDecriptor & {
|
||||
contents: string | Buffer;
|
||||
overwrite?: boolean;
|
||||
}): Promise<File> {
|
||||
public async fastPut(
|
||||
optionsArg: interfaces.IPathDecriptor & {
|
||||
contents: string | Buffer;
|
||||
overwrite?: boolean;
|
||||
}
|
||||
): Promise<File> {
|
||||
try {
|
||||
const reducedPath = await helpers.reducePathDescriptorToPath(optionsArg);
|
||||
// Check if the object already exists
|
||||
const exists = await this.fastExists({ path: reducedPath });
|
||||
|
||||
if (exists && !optionsArg.overwrite) {
|
||||
@@ -97,16 +101,12 @@ export class Bucket {
|
||||
console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
|
||||
}
|
||||
|
||||
// Proceed with putting the object
|
||||
const streamIntake = new plugins.smartstream.StreamIntake();
|
||||
const putPromise = this.smartbucketRef.minioClient.putObject(
|
||||
this.name,
|
||||
reducedPath,
|
||||
streamIntake
|
||||
);
|
||||
streamIntake.pushData(optionsArg.contents);
|
||||
streamIntake.signalEnd();
|
||||
await putPromise;
|
||||
const command = new plugins.s3.PutObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: reducedPath,
|
||||
Body: optionsArg.contents,
|
||||
});
|
||||
await this.smartbucketRef.s3Client.send(command);
|
||||
|
||||
console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`);
|
||||
const parsedPath = plugins.path.parse(reducedPath);
|
||||
@@ -161,27 +161,30 @@ export class Bucket {
|
||||
public async fastGetReplaySubject(optionsArg: {
|
||||
path: string;
|
||||
}): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
|
||||
const fileStream = await this.smartbucketRef.minioClient
|
||||
.getObject(this.name, optionsArg.path)
|
||||
.catch((e) => console.log(e));
|
||||
const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();
|
||||
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, void>({
|
||||
writeFunction: async (chunk) => {
|
||||
replaySubject.next(chunk);
|
||||
return;
|
||||
},
|
||||
finalFunction: async (cb) => {
|
||||
replaySubject.complete();
|
||||
return;
|
||||
},
|
||||
const command = new plugins.s3.GetObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.path,
|
||||
});
|
||||
const response = await this.smartbucketRef.s3Client.send(command);
|
||||
const replaySubject = new plugins.smartrx.rxjs.ReplaySubject<Buffer>();
|
||||
|
||||
if (!fileStream) {
|
||||
return null;
|
||||
// Convert the stream to a format that supports piping
|
||||
const stream = response.Body as any; // SdkStreamMixin includes readable stream
|
||||
if (typeof stream.pipe === 'function') {
|
||||
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, void>({
|
||||
writeFunction: async (chunk) => {
|
||||
replaySubject.next(chunk);
|
||||
return;
|
||||
},
|
||||
finalFunction: async (cb) => {
|
||||
replaySubject.complete();
|
||||
return;
|
||||
},
|
||||
});
|
||||
|
||||
stream.pipe(duplexStream);
|
||||
}
|
||||
|
||||
const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
|
||||
smartstream.run();
|
||||
return replaySubject;
|
||||
}
|
||||
|
||||
@@ -198,18 +201,17 @@ export class Bucket {
|
||||
typeArg: 'nodestream'
|
||||
): Promise<plugins.stream.Readable>;
|
||||
|
||||
/**
|
||||
* fastGetStream
|
||||
* @param optionsArg
|
||||
* @returns
|
||||
*/
|
||||
public async fastGetStream(
|
||||
optionsArg: { path: string },
|
||||
typeArg: 'webstream' | 'nodestream' = 'nodestream'
|
||||
): Promise<ReadableStream | plugins.stream.Readable> {
|
||||
const fileStream = await this.smartbucketRef.minioClient
|
||||
.getObject(this.name, optionsArg.path)
|
||||
.catch((e) => console.log(e));
|
||||
const command = new plugins.s3.GetObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.path,
|
||||
});
|
||||
const response = await this.smartbucketRef.s3Client.send(command);
|
||||
const stream = response.Body as any; // SdkStreamMixin includes readable stream
|
||||
|
||||
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk;
|
||||
@@ -219,12 +221,10 @@ export class Bucket {
|
||||
},
|
||||
});
|
||||
|
||||
if (!fileStream) {
|
||||
return null;
|
||||
if (typeof stream.pipe === 'function') {
|
||||
stream.pipe(duplexStream);
|
||||
}
|
||||
|
||||
const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
|
||||
smartstream.run();
|
||||
if (typeArg === 'nodestream') {
|
||||
return duplexStream;
|
||||
}
|
||||
@@ -243,7 +243,6 @@ export class Bucket {
|
||||
overwrite?: boolean;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
// Check if the object already exists
|
||||
const exists = await this.fastExists({ path: optionsArg.path });
|
||||
|
||||
if (exists && !optionsArg.overwrite) {
|
||||
@@ -259,18 +258,13 @@ export class Bucket {
|
||||
console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
|
||||
}
|
||||
|
||||
const streamIntake = await plugins.smartstream.StreamIntake.fromStream<Uint8Array>(
|
||||
optionsArg.readableStream
|
||||
);
|
||||
|
||||
// Proceed with putting the object
|
||||
await this.smartbucketRef.minioClient.putObject(
|
||||
this.name,
|
||||
optionsArg.path,
|
||||
streamIntake,
|
||||
null,
|
||||
null // TODO: Add support for custom metadata once proper support is in minio.
|
||||
);
|
||||
const command = new plugins.s3.PutObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.path,
|
||||
Body: optionsArg.readableStream,
|
||||
Metadata: optionsArg.nativeMetadata,
|
||||
});
|
||||
await this.smartbucketRef.s3Client.send(command);
|
||||
|
||||
console.log(
|
||||
`Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`
|
||||
@@ -295,28 +289,29 @@ export class Bucket {
|
||||
const targetBucketName = optionsArg.targetBucket ? optionsArg.targetBucket.name : this.name;
|
||||
|
||||
// Retrieve current object information to use in copy conditions
|
||||
const currentObjInfo = await this.smartbucketRef.minioClient.statObject(
|
||||
targetBucketName,
|
||||
optionsArg.sourcePath
|
||||
const currentObjInfo = await this.smartbucketRef.s3Client.send(
|
||||
new plugins.s3.HeadObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.sourcePath,
|
||||
})
|
||||
);
|
||||
|
||||
// Setting up copy conditions
|
||||
const copyConditions = new plugins.minio.CopyConditions();
|
||||
|
||||
// Prepare new metadata
|
||||
const newNativeMetadata = {
|
||||
...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.metaData),
|
||||
...(optionsArg.deleteExistingNativeMetadata ? {} : currentObjInfo.Metadata),
|
||||
...optionsArg.nativeMetadata,
|
||||
};
|
||||
|
||||
// Define the copy operation as a Promise
|
||||
// TODO: check on issue here: https://github.com/minio/minio-js/issues/1286
|
||||
await this.smartbucketRef.minioClient.copyObject(
|
||||
this.name,
|
||||
optionsArg.sourcePath,
|
||||
`/${targetBucketName}/${optionsArg.destinationPath || optionsArg.sourcePath}`,
|
||||
copyConditions
|
||||
);
|
||||
// Define the copy operation
|
||||
const copySource = `${this.name}/${optionsArg.sourcePath}`;
|
||||
const command = new plugins.s3.CopyObjectCommand({
|
||||
Bucket: targetBucketName,
|
||||
CopySource: copySource,
|
||||
Key: optionsArg.destinationPath || optionsArg.sourcePath,
|
||||
Metadata: newNativeMetadata,
|
||||
MetadataDirective: optionsArg.deleteExistingNativeMetadata ? 'REPLACE' : 'COPY',
|
||||
});
|
||||
await this.smartbucketRef.s3Client.send(command);
|
||||
} catch (err) {
|
||||
console.error('Error updating metadata:', err);
|
||||
throw err; // rethrow to allow caller to handle
|
||||
@@ -333,7 +328,6 @@ export class Bucket {
|
||||
overwrite?: boolean;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
// Check if the destination object already exists
|
||||
const destinationBucket = optionsArg.targetBucket || this;
|
||||
const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath });
|
||||
|
||||
@@ -352,10 +346,7 @@ export class Bucket {
|
||||
);
|
||||
}
|
||||
|
||||
// Proceed with copying the object to the new path
|
||||
await this.fastCopy(optionsArg);
|
||||
|
||||
// Remove the original object after successful copy
|
||||
await this.fastRemove({ path: optionsArg.sourcePath });
|
||||
|
||||
console.log(
|
||||
@@ -374,21 +365,29 @@ export class Bucket {
|
||||
* removeObject
|
||||
*/
|
||||
public async fastRemove(optionsArg: { path: string }) {
|
||||
await this.smartbucketRef.minioClient.removeObject(this.name, optionsArg.path);
|
||||
const command = new plugins.s3.DeleteObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.path,
|
||||
});
|
||||
await this.smartbucketRef.s3Client.send(command);
|
||||
}
|
||||
|
||||
/**
|
||||
* check wether file exists
|
||||
* check whether file exists
|
||||
* @param optionsArg
|
||||
* @returns
|
||||
*/
|
||||
public async fastExists(optionsArg: { path: string }): Promise<boolean> {
|
||||
try {
|
||||
await this.smartbucketRef.minioClient.statObject(this.name, optionsArg.path);
|
||||
const command = new plugins.s3.HeadObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: optionsArg.path,
|
||||
});
|
||||
await this.smartbucketRef.s3Client.send(command);
|
||||
console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (error.code === 'NotFound') {
|
||||
if (error.name === 'NotFound') {
|
||||
console.log(`Object '${optionsArg.path}' does not exist in bucket '${this.name}'.`);
|
||||
return false;
|
||||
} else {
|
||||
@@ -402,59 +401,39 @@ export class Bucket {
|
||||
* deletes this bucket
|
||||
*/
|
||||
public async delete() {
|
||||
await this.smartbucketRef.minioClient.removeBucket(this.name);
|
||||
await this.smartbucketRef.s3Client.send(
|
||||
new plugins.s3.DeleteBucketCommand({ Bucket: this.name })
|
||||
);
|
||||
}
|
||||
|
||||
public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
|
||||
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
return this.smartbucketRef.minioClient.statObject(this.name, checkPath);
|
||||
const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
const command = new plugins.s3.HeadObjectCommand({
|
||||
Bucket: this.name,
|
||||
Key: checkPath,
|
||||
});
|
||||
return this.smartbucketRef.s3Client.send(command);
|
||||
}
|
||||
|
||||
public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
|
||||
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
|
||||
// lets check if the checkPath is a directory
|
||||
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
|
||||
const done = plugins.smartpromise.defer<boolean>();
|
||||
stream.on('data', (dataArg) => {
|
||||
stream.destroy(); // Stop the stream early if we find at least one object
|
||||
if (dataArg.prefix.startsWith(checkPath + '/')) {
|
||||
done.resolve(true);
|
||||
}
|
||||
const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
const command = new plugins.s3.ListObjectsV2Command({
|
||||
Bucket: this.name,
|
||||
Prefix: checkPath,
|
||||
Delimiter: '/',
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
done.resolve(false);
|
||||
});
|
||||
|
||||
stream.on('error', (err) => {
|
||||
done.reject(err);
|
||||
});
|
||||
|
||||
return done.promise;
|
||||
const response = await this.smartbucketRef.s3Client.send(command);
|
||||
return response.CommonPrefixes.length > 0;
|
||||
}
|
||||
|
||||
public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
|
||||
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
|
||||
// lets check if the checkPath is a directory
|
||||
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
|
||||
const done = plugins.smartpromise.defer<boolean>();
|
||||
stream.on('data', (dataArg) => {
|
||||
stream.destroy(); // Stop the stream early if we find at least one object
|
||||
if (dataArg.prefix === checkPath) {
|
||||
done.resolve(true);
|
||||
}
|
||||
const checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
const command = new plugins.s3.ListObjectsV2Command({
|
||||
Bucket: this.name,
|
||||
Prefix: checkPath,
|
||||
Delimiter: '/',
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
done.resolve(false);
|
||||
});
|
||||
|
||||
stream.on('error', (err) => {
|
||||
done.reject(err);
|
||||
});
|
||||
|
||||
return done.promise;
|
||||
const response = await this.smartbucketRef.s3Client.send(command);
|
||||
return response.Contents.length > 0;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user