fix(core): update
This commit is contained in:
@@ -107,10 +107,12 @@ export class Bucket {
|
||||
/**
|
||||
* get file
|
||||
*/
|
||||
public async fastGet(optionsArg: Parameters<typeof this.fastGetStream>[0]): Promise<Buffer> {
|
||||
public async fastGet(optionsArg: {
|
||||
path: string
|
||||
}): Promise<Buffer> {
|
||||
const done = plugins.smartpromise.defer();
|
||||
let completeFile: Buffer;
|
||||
const replaySubject = await this.fastGetStream(optionsArg);
|
||||
const replaySubject = await this.fastGetReplaySubject(optionsArg);
|
||||
const subscription = replaySubject.subscribe({
|
||||
next: (chunk) => {
|
||||
if (completeFile) {
|
||||
@@ -131,7 +133,13 @@ export class Bucket {
|
||||
return completeFile;
|
||||
}
|
||||
|
||||
public async fastGetStream(optionsArg: {
|
||||
/**
|
||||
* good when time to first byte is important
|
||||
* and multiple subscribers are expected
|
||||
* @param optionsArg
|
||||
* @returns
|
||||
*/
|
||||
public async fastGetReplaySubject(optionsArg: {
|
||||
path: string;
|
||||
}): Promise<plugins.smartrx.rxjs.ReplaySubject<Buffer>> {
|
||||
const fileStream = await this.smartbucketRef.minioClient
|
||||
@@ -161,12 +169,54 @@ export class Bucket {
|
||||
return replaySubject;
|
||||
}
|
||||
|
||||
public fastGetStream(optionsArg: {
|
||||
path: string;
|
||||
}, typeArg: 'webstream'): Promise<ReadableStream>
|
||||
public async fastGetStream(optionsArg: {
|
||||
path: string;
|
||||
}, typeArg: 'nodestream'): Promise<plugins.stream.Readable>
|
||||
|
||||
/**
|
||||
* fastGetStream
|
||||
* @param optionsArg
|
||||
* @returns
|
||||
*/
|
||||
public async fastGetStream(optionsArg: { path: string; }, typeArg: 'webstream' | 'nodestream' = 'nodestream'): Promise<ReadableStream | plugins.stream.Readable>{
|
||||
const fileStream = await this.smartbucketRef.minioClient
|
||||
.getObject(this.name, optionsArg.path)
|
||||
.catch((e) => console.log(e));
|
||||
const duplexStream = new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk;
|
||||
},
|
||||
finalFunction: async (cb) => {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
if (!fileStream) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const smartstream = new plugins.smartstream.StreamWrapper([
|
||||
fileStream,
|
||||
duplexStream,
|
||||
]);
|
||||
smartstream.run();
|
||||
if (typeArg === 'nodestream') {
|
||||
return duplexStream;
|
||||
};
|
||||
if (typeArg === 'webstream') {
|
||||
return (await duplexStream.getWebStreams()).readable;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* store file as stream
|
||||
*/
|
||||
public async fastPutStream(optionsArg: {
|
||||
path: string;
|
||||
dataStream: plugins.stream.Readable;
|
||||
dataStream: plugins.stream.Readable | ReadableStream;
|
||||
nativeMetadata?: { [key: string]: string };
|
||||
overwrite?: boolean;
|
||||
}): Promise<void> {
|
||||
@@ -182,12 +232,14 @@ export class Bucket {
|
||||
} else {
|
||||
console.log(`Creating new object at path '${optionsArg.path}' in bucket '${this.name}'.`);
|
||||
}
|
||||
|
||||
const streamIntake = await plugins.smartstream.StreamIntake.fromStream<Uint8Array>(optionsArg.dataStream);
|
||||
|
||||
// Proceed with putting the object
|
||||
await this.smartbucketRef.minioClient.putObject(
|
||||
this.name,
|
||||
optionsArg.path,
|
||||
optionsArg.dataStream,
|
||||
streamIntake,
|
||||
null,
|
||||
...(optionsArg.nativeMetadata
|
||||
? (() => {
|
||||
@@ -313,6 +365,13 @@ export class Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* deletes this bucket
|
||||
*/
|
||||
public async delete() {
|
||||
await this.smartbucketRef.minioClient.removeBucket(this.name);
|
||||
}
|
||||
|
||||
public async fastStat(pathDescriptor: interfaces.IPathDecriptor) {
|
||||
let checkPath = await helpers.reducePathDescriptorToPath(pathDescriptor);
|
||||
return this.smartbucketRef.minioClient.statObject(this.name, checkPath);
|
||||
|
Reference in New Issue
Block a user