|
|
|
@ -4,7 +4,13 @@ import * as interfaces from './interfaces.js';
|
|
|
|
|
import { SmartBucket } from './classes.smartbucket.js';
|
|
|
|
|
import { Directory } from './classes.directory.js';
|
|
|
|
|
import { File } from './classes.file.js';
|
|
|
|
|
import { Trash } from './classes.trash.js';
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The bucket class exposes the basc functionality of a bucket.
|
|
|
|
|
* The functions of the bucket alone are enough to
|
|
|
|
|
* operate in s3 basic fashion on blobs of data.
|
|
|
|
|
*/
|
|
|
|
|
export class Bucket {
|
|
|
|
|
public static async getBucketByName(smartbucketRef: SmartBucket, bucketNameArg: string) {
|
|
|
|
|
const buckets = await smartbucketRef.minioClient.listBuckets();
|
|
|
|
@ -45,7 +51,17 @@ export class Bucket {
|
|
|
|
|
return new Directory(this, null, '');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async getDirectoryFromPath(pathDescriptorArg: interfaces.IPathDecriptor): Promise<Directory> {
|
|
|
|
|
/**
|
|
|
|
|
* gets the trash directory
|
|
|
|
|
*/
|
|
|
|
|
public async getTrash(): Promise<Trash> {
|
|
|
|
|
const trash = new Trash(this);
|
|
|
|
|
return trash;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async getDirectoryFromPath(
|
|
|
|
|
pathDescriptorArg: interfaces.IPathDecriptor
|
|
|
|
|
): Promise<Directory> {
|
|
|
|
|
if (!pathDescriptorArg.path && !pathDescriptorArg.directory) {
|
|
|
|
|
return this.getBaseDirectory();
|
|
|
|
|
}
|
|
|
|
@ -61,34 +77,37 @@ export class Bucket {
|
|
|
|
|
/**
|
|
|
|
|
* store file
|
|
|
|
|
*/
|
|
|
|
|
public async fastPut(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
public async fastPut(optionsArg: interfaces.IPathDecriptor & {
|
|
|
|
|
contents: string | Buffer;
|
|
|
|
|
overwrite?: boolean;
|
|
|
|
|
}): Promise<File> {
|
|
|
|
|
try {
|
|
|
|
|
const reducedPath = await helpers.reducePathDescriptorToPath({
|
|
|
|
|
path: optionsArg.path,
|
|
|
|
|
})
|
|
|
|
|
const reducedPath = await helpers.reducePathDescriptorToPath(optionsArg);
|
|
|
|
|
// Check if the object already exists
|
|
|
|
|
const exists = await this.fastExists({ path: reducedPath });
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (exists && !optionsArg.overwrite) {
|
|
|
|
|
console.error(`Object already exists at path '${reducedPath}' in bucket '${this.name}'.`);
|
|
|
|
|
return;
|
|
|
|
|
} else if (exists && optionsArg.overwrite) {
|
|
|
|
|
console.log(`Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`);
|
|
|
|
|
console.log(
|
|
|
|
|
`Overwriting existing object at path '${reducedPath}' in bucket '${this.name}'.`
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
console.log(`Creating new object at path '${reducedPath}' in bucket '${this.name}'.`);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Proceed with putting the object
|
|
|
|
|
const streamIntake = new plugins.smartstream.StreamIntake();
|
|
|
|
|
const putPromise = this.smartbucketRef.minioClient.putObject(this.name, reducedPath, streamIntake);
|
|
|
|
|
const putPromise = this.smartbucketRef.minioClient.putObject(
|
|
|
|
|
this.name,
|
|
|
|
|
reducedPath,
|
|
|
|
|
streamIntake
|
|
|
|
|
);
|
|
|
|
|
streamIntake.pushData(optionsArg.contents);
|
|
|
|
|
streamIntake.signalEnd();
|
|
|
|
|
await putPromise;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
console.log(`Object '${reducedPath}' has been successfully stored in bucket '${this.name}'.`);
|
|
|
|
|
const parsedPath = plugins.path.parse(reducedPath);
|
|
|
|
|
return new File({
|
|
|
|
@ -98,18 +117,18 @@ export class Bucket {
|
|
|
|
|
fileName: parsedPath.base,
|
|
|
|
|
});
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error(`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error);
|
|
|
|
|
console.error(
|
|
|
|
|
`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
|
|
|
|
|
error
|
|
|
|
|
);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* get file
|
|
|
|
|
*/
|
|
|
|
|
public async fastGet(optionsArg: {
|
|
|
|
|
path: string
|
|
|
|
|
}): Promise<Buffer> {
|
|
|
|
|
public async fastGet(optionsArg: { path: string }): Promise<Buffer> {
|
|
|
|
|
const done = plugins.smartpromise.defer();
|
|
|
|
|
let completeFile: Buffer;
|
|
|
|
|
const replaySubject = await this.fastGetReplaySubject(optionsArg);
|
|
|
|
@ -137,7 +156,7 @@ export class Bucket {
|
|
|
|
|
* good when time to first byte is important
|
|
|
|
|
* and multiple subscribers are expected
|
|
|
|
|
* @param optionsArg
|
|
|
|
|
* @returns
|
|
|
|
|
* @returns
|
|
|
|
|
*/
|
|
|
|
|
public async fastGetReplaySubject(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
@ -154,34 +173,40 @@ export class Bucket {
|
|
|
|
|
finalFunction: async (cb) => {
|
|
|
|
|
replaySubject.complete();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (!fileStream) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const smartstream = new plugins.smartstream.StreamWrapper([
|
|
|
|
|
fileStream,
|
|
|
|
|
duplexStream,
|
|
|
|
|
]);
|
|
|
|
|
const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
|
|
|
|
|
smartstream.run();
|
|
|
|
|
return replaySubject;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public fastGetStream(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
}, typeArg: 'webstream'): Promise<ReadableStream>
|
|
|
|
|
public async fastGetStream(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
}, typeArg: 'nodestream'): Promise<plugins.stream.Readable>
|
|
|
|
|
public fastGetStream(
|
|
|
|
|
optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
},
|
|
|
|
|
typeArg: 'webstream'
|
|
|
|
|
): Promise<ReadableStream>;
|
|
|
|
|
public async fastGetStream(
|
|
|
|
|
optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
},
|
|
|
|
|
typeArg: 'nodestream'
|
|
|
|
|
): Promise<plugins.stream.Readable>;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* fastGetStream
|
|
|
|
|
* @param optionsArg
|
|
|
|
|
* @returns
|
|
|
|
|
* @returns
|
|
|
|
|
*/
|
|
|
|
|
public async fastGetStream(optionsArg: { path: string; }, typeArg: 'webstream' | 'nodestream' = 'nodestream'): Promise<ReadableStream | plugins.stream.Readable>{
|
|
|
|
|
public async fastGetStream(
|
|
|
|
|
optionsArg: { path: string },
|
|
|
|
|
typeArg: 'webstream' | 'nodestream' = 'nodestream'
|
|
|
|
|
): Promise<ReadableStream | plugins.stream.Readable> {
|
|
|
|
|
const fileStream = await this.smartbucketRef.minioClient
|
|
|
|
|
.getObject(this.name, optionsArg.path)
|
|
|
|
|
.catch((e) => console.log(e));
|
|
|
|
@ -191,21 +216,18 @@ export class Bucket {
|
|
|
|
|
},
|
|
|
|
|
finalFunction: async (cb) => {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (!fileStream) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const smartstream = new plugins.smartstream.StreamWrapper([
|
|
|
|
|
fileStream,
|
|
|
|
|
duplexStream,
|
|
|
|
|
]);
|
|
|
|
|
const smartstream = new plugins.smartstream.StreamWrapper([fileStream, duplexStream]);
|
|
|
|
|
smartstream.run();
|
|
|
|
|
if (typeArg === 'nodestream') {
|
|
|
|
|
return duplexStream;
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
if (typeArg === 'webstream') {
|
|
|
|
|
return (await duplexStream.getWebStreams()).readable;
|
|
|
|
|
}
|
|
|
|
@ -223,34 +245,44 @@ export class Bucket {
|
|
|
|
|
try {
|
|
|
|
|
// Check if the object already exists
|
|
|
|
|
const exists = await this.fastExists({ path: optionsArg.path });
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (exists && !optionsArg.overwrite) {
|
|
|
|
|
console.error(`Object already exists at path '${optionsArg.path}' in bucket '${this.name}'.`);
|
|
|
|
|
console.error(
|
|
|
|
|
`Object already exists at path '${optionsArg.path}' in bucket '${this.name}'.`
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
} else if (exists && optionsArg.overwrite) {
|
|
|
|
|
console.log(`Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`);
|
|
|
|
|
console.log(
|
|
|
|
|
`Overwriting existing object at path '${optionsArg.path}' in bucket '${this.name}'.`
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const streamIntake = await plugins.smartstream.StreamIntake.fromStream<Uint8Array>(optionsArg.readableStream);
|
|
|
|
|
|
|
|
|
|
const streamIntake = await plugins.smartstream.StreamIntake.fromStream<Uint8Array>(
|
|
|
|
|
optionsArg.readableStream
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Proceed with putting the object
|
|
|
|
|
await this.smartbucketRef.minioClient.putObject(
|
|
|
|
|
this.name,
|
|
|
|
|
optionsArg.path,
|
|
|
|
|
streamIntake,
|
|
|
|
|
null,
|
|
|
|
|
null, // TODO: Add support for custom metadata once proper support is in minio.
|
|
|
|
|
null // TODO: Add support for custom metadata once proper support is in minio.
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
console.log(
|
|
|
|
|
`Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
console.log(`Object '${optionsArg.path}' has been successfully stored in bucket '${this.name}'.`);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error(`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`, error);
|
|
|
|
|
console.error(
|
|
|
|
|
`Error storing object at path '${optionsArg.path}' in bucket '${this.name}':`,
|
|
|
|
|
error
|
|
|
|
|
);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public async fastCopy(optionsArg: {
|
|
|
|
|
sourcePath: string;
|
|
|
|
@ -291,60 +323,66 @@ export class Bucket {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
/**
|
|
|
|
|
* Move object from one path to another within the same bucket or to another bucket
|
|
|
|
|
*/
|
|
|
|
|
public async fastMove(optionsArg: {
|
|
|
|
|
sourcePath: string;
|
|
|
|
|
destinationPath: string;
|
|
|
|
|
targetBucket?: Bucket;
|
|
|
|
|
overwrite?: boolean;
|
|
|
|
|
}): Promise<void> {
|
|
|
|
|
try {
|
|
|
|
|
// Check if the destination object already exists
|
|
|
|
|
const destinationBucket = optionsArg.targetBucket || this;
|
|
|
|
|
const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath });
|
|
|
|
|
|
|
|
|
|
if (exists && !optionsArg.overwrite) {
|
|
|
|
|
console.error(`Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
|
|
|
|
|
return;
|
|
|
|
|
} else if (exists && optionsArg.overwrite) {
|
|
|
|
|
console.log(`Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
|
|
|
|
|
} else {
|
|
|
|
|
console.log(`Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Proceed with copying the object to the new path
|
|
|
|
|
await this.fastCopy(optionsArg);
|
|
|
|
|
|
|
|
|
|
// Remove the original object after successful copy
|
|
|
|
|
await this.fastRemove({ path: optionsArg.sourcePath });
|
|
|
|
|
|
|
|
|
|
console.log(`Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error(`Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`, error);
|
|
|
|
|
throw error;
|
|
|
|
|
public async fastMove(optionsArg: {
|
|
|
|
|
sourcePath: string;
|
|
|
|
|
destinationPath: string;
|
|
|
|
|
targetBucket?: Bucket;
|
|
|
|
|
overwrite?: boolean;
|
|
|
|
|
}): Promise<void> {
|
|
|
|
|
try {
|
|
|
|
|
// Check if the destination object already exists
|
|
|
|
|
const destinationBucket = optionsArg.targetBucket || this;
|
|
|
|
|
const exists = await destinationBucket.fastExists({ path: optionsArg.destinationPath });
|
|
|
|
|
|
|
|
|
|
if (exists && !optionsArg.overwrite) {
|
|
|
|
|
console.error(
|
|
|
|
|
`Object already exists at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
} else if (exists && optionsArg.overwrite) {
|
|
|
|
|
console.log(
|
|
|
|
|
`Overwriting existing object at destination path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
|
|
|
|
|
);
|
|
|
|
|
} else {
|
|
|
|
|
console.log(
|
|
|
|
|
`Moving object to path '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Proceed with copying the object to the new path
|
|
|
|
|
await this.fastCopy(optionsArg);
|
|
|
|
|
|
|
|
|
|
// Remove the original object after successful copy
|
|
|
|
|
await this.fastRemove({ path: optionsArg.sourcePath });
|
|
|
|
|
|
|
|
|
|
console.log(
|
|
|
|
|
`Object '${optionsArg.sourcePath}' has been successfully moved to '${optionsArg.destinationPath}' in bucket '${destinationBucket.name}'.`
|
|
|
|
|
);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error(
|
|
|
|
|
`Error moving object from '${optionsArg.sourcePath}' to '${optionsArg.destinationPath}':`,
|
|
|
|
|
error
|
|
|
|
|
);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* removeObject
|
|
|
|
|
*/
|
|
|
|
|
public async fastRemove(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
}) {
|
|
|
|
|
public async fastRemove(optionsArg: { path: string }) {
|
|
|
|
|
await this.smartbucketRef.minioClient.removeObject(this.name, optionsArg.path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* check wether file exists
|
|
|
|
|
* @param optionsArg
|
|
|
|
|
* @returns
|
|
|
|
|
* @returns
|
|
|
|
|
*/
|
|
|
|
|
public async fastExists(optionsArg: {
|
|
|
|
|
path: string;
|
|
|
|
|
}): Promise<boolean> {
|
|
|
|
|
public async fastExists(optionsArg: { path: string }): Promise<boolean> {
|
|
|
|
|
try {
|
|
|
|
|
await this.smartbucketRef.minioClient.statObject(this.name, optionsArg.path);
|
|
|
|
|
console.log(`Object '${optionsArg.path}' exists in bucket '${this.name}'.`);
|
|
|
|
@ -374,7 +412,7 @@ export class Bucket {
|
|
|
|
|
|
|
|
|
|
public async isDirectory(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
|
|
|
|
|
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// lets check if the checkPath is a directory
|
|
|
|
|
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
|
|
|
|
|
const done = plugins.smartpromise.defer<boolean>();
|
|
|
|
@ -384,21 +422,21 @@ export class Bucket {
|
|
|
|
|
done.resolve(true);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.on('end', () => {
|
|
|
|
|
done.resolve(false);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.on('error', (err) => {
|
|
|
|
|
done.reject(err);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return done.promise;
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async isFile(pathDescriptor: interfaces.IPathDecriptor): Promise<boolean> {
|
|
|
|
|
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// lets check if the checkPath is a directory
|
|
|
|
|
const stream = this.smartbucketRef.minioClient.listObjectsV2(this.name, checkPath, true);
|
|
|
|
|
const done = plugins.smartpromise.defer<boolean>();
|
|
|
|
@ -408,11 +446,11 @@ export class Bucket {
|
|
|
|
|
done.resolve(true);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.on('end', () => {
|
|
|
|
|
done.resolve(false);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.on('error', (err) => {
|
|
|
|
|
done.reject(err);
|
|
|
|
|
});
|
|
|
|
|