fix(core): update
This commit is contained in:
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.0.19',
|
||||
version: '3.0.20',
|
||||
description: 'simplifies access to node streams'
|
||||
}
|
||||
|
@@ -15,6 +15,8 @@ export interface IStreamFinalFunction<rT> {
|
||||
}
|
||||
|
||||
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||
debug?: boolean;
|
||||
name?: string;
|
||||
handleBackpressure?: boolean;
|
||||
readFunction?: () => Promise<void>;
|
||||
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
||||
@@ -33,141 +35,71 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
return smartDuplex;
|
||||
}
|
||||
|
||||
static fromObservable(
|
||||
observable: plugins.smartrx.rxjs.Observable<any>,
|
||||
options?: ISmartDuplexOptions<any, any>
|
||||
): SmartDuplex {
|
||||
const smartStream = new SmartDuplex(options);
|
||||
smartStream.observableSubscription = observable.subscribe({
|
||||
next: (data) => {
|
||||
if (!smartStream.push(data) && smartStream.handleBackpressure) {
|
||||
// Pause the observable if the stream buffer is full
|
||||
smartStream.observableSubscription?.unsubscribe();
|
||||
smartStream.once('drain', () => {
|
||||
// Resume the observable when the stream buffer is drained
|
||||
smartStream.observableSubscription?.unsubscribe();
|
||||
smartStream.observableSubscription = observable.subscribe((data) => {
|
||||
smartStream.push(data);
|
||||
});
|
||||
});
|
||||
}
|
||||
},
|
||||
error: (err) => {
|
||||
smartStream.emit('error', err);
|
||||
},
|
||||
complete: () => {
|
||||
smartStream.push(null); // Signal the end of the data
|
||||
},
|
||||
});
|
||||
|
||||
return smartStream;
|
||||
}
|
||||
|
||||
static fromReplaySubject(
|
||||
replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
|
||||
options?: DuplexOptions
|
||||
): SmartDuplex {
|
||||
const smartStream = new SmartDuplex(options);
|
||||
let isBackpressured = false;
|
||||
|
||||
// Subscribe to the ReplaySubject
|
||||
const subscription = replaySubject.subscribe({
|
||||
next: (data) => {
|
||||
const canPush = smartStream.push(data);
|
||||
if (!canPush) {
|
||||
// If push returns false, pause the subscription because of backpressure
|
||||
isBackpressured = true;
|
||||
subscription.unsubscribe();
|
||||
}
|
||||
},
|
||||
error: (err) => {
|
||||
smartStream.emit('error', err);
|
||||
},
|
||||
complete: () => {
|
||||
smartStream.push(null); // End the stream when the ReplaySubject completes
|
||||
},
|
||||
});
|
||||
|
||||
// Listen for 'drain' event to resume the subscription if it was paused
|
||||
smartStream.on('drain', () => {
|
||||
if (isBackpressured) {
|
||||
isBackpressured = false;
|
||||
// Resubscribe to the ReplaySubject since we previously paused
|
||||
smartStream.observableSubscription = replaySubject.subscribe({
|
||||
next: (data) => {
|
||||
if (!smartStream.push(data)) {
|
||||
smartStream.observableSubscription?.unsubscribe();
|
||||
isBackpressured = true;
|
||||
}
|
||||
},
|
||||
// No need to repeat error and complete handling here because it's already set up above
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
return smartStream;
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
private readFunction?: () => Promise<void>;
|
||||
private handleBackpressure: boolean;
|
||||
private writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
||||
private finalFunction?: IStreamFinalFunction<TOutput>;
|
||||
private backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>();
|
||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||
private debugLog(messageArg: string) {
|
||||
if (this.options.debug) {
|
||||
console.log(messageArg);
|
||||
}
|
||||
}
|
||||
|
||||
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
||||
super(optionsArg);
|
||||
this.readFunction = optionsArg?.readFunction;
|
||||
this.writeFunction = optionsArg?.writeFunction;
|
||||
this.finalFunction = optionsArg?.finalFunction;
|
||||
this.handleBackpressure = optionsArg?.handleBackpressure ?? true;
|
||||
this.options = optionsArg;
|
||||
}
|
||||
|
||||
public async _read(size: number): Promise<void> {
|
||||
if (this.readFunction) {
|
||||
await this.readFunction();
|
||||
await this.backpressuredArray.waitForItems();
|
||||
this.debugLog(`${this.options.name}: read was called`);
|
||||
if (this.options.readFunction) {
|
||||
await this.options.readFunction();
|
||||
}
|
||||
let canPushMore = true;
|
||||
while(this.backpressuredArray.data.length > 0 && canPushMore) {
|
||||
const nextChunk = this.backpressuredArray.shift();
|
||||
if (nextChunk) {
|
||||
canPushMore = this.push(nextChunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
||||
|
||||
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
||||
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||
if (!this.writeFunction) {
|
||||
if (!this.options.writeFunction) {
|
||||
return callback(new Error('No stream function provided'));
|
||||
}
|
||||
|
||||
let isTruncated = false;
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => {
|
||||
this.push(null);
|
||||
isTruncated = true;
|
||||
callback();
|
||||
},
|
||||
push: (pushArg: TOutput) => this.push(pushArg),
|
||||
push: (pushArg: TOutput) => {
|
||||
this.backpressuredArray.push(pushArg);
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
const writeDeferred = plugins.smartpromise.defer();
|
||||
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
|
||||
const modifiedChunk = await this.writeFunction(chunk, tools);
|
||||
if (modifiedChunk) {
|
||||
const drainDeferred = plugins.smartpromise.defer();
|
||||
this.once('drain', () => {
|
||||
drainDeferred.resolve();
|
||||
});
|
||||
const canPushMore = this.push(modifiedChunk);
|
||||
if (!canPushMore) {
|
||||
await drainDeferred.promise;
|
||||
console.log('jojojo');
|
||||
callback();
|
||||
writeDeferred.resolve();
|
||||
} else {
|
||||
callback();
|
||||
writeDeferred.resolve();
|
||||
}
|
||||
} else {
|
||||
callback();
|
||||
writeDeferred.resolve();
|
||||
const modifiedChunk = await this.options.writeFunction(chunk, tools);
|
||||
if (isTruncated) {
|
||||
return;
|
||||
}
|
||||
if (modifiedChunk) {
|
||||
const canPushMore = this.backpressuredArray.push(modifiedChunk);
|
||||
if (!canPushMore) {
|
||||
this.debugLog(`${this.options.name}: cannot push more`);
|
||||
await this.backpressuredArray.waitForSpace();
|
||||
this.debugLog(`${this.options.name}: can push more again`);
|
||||
}
|
||||
}
|
||||
callback();
|
||||
writeDeferred.resolve();
|
||||
writeDeferred.promise.then(() => {
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
@@ -179,14 +111,14 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
|
||||
public async _final(callback: (error?: Error | null) => void) {
|
||||
await Promise.all(this.asyncWritePromiseObjectmap.getArray());
|
||||
if (this.finalFunction) {
|
||||
if (this.options.finalFunction) {
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => callback(),
|
||||
push: (pipeObject) => this.push(pipeObject),
|
||||
};
|
||||
|
||||
try {
|
||||
const finalChunk = await this.finalFunction(tools);
|
||||
const finalChunk = await this.options.finalFunction(tools);
|
||||
if (finalChunk) {
|
||||
this.push(finalChunk);
|
||||
}
|
||||
|
Reference in New Issue
Block a user