fix(core): update
This commit is contained in:
		@@ -3,6 +3,6 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
export const commitinfo = {
 | 
					export const commitinfo = {
 | 
				
			||||||
  name: '@push.rocks/smartstream',
 | 
					  name: '@push.rocks/smartstream',
 | 
				
			||||||
  version: '3.0.10',
 | 
					  version: '3.0.11',
 | 
				
			||||||
  description: 'simplifies access to node streams'
 | 
					  description: 'simplifies access to node streams'
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -109,15 +109,15 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  // INSTANCE
 | 
					  // INSTANCE
 | 
				
			||||||
  private readFunction?: () => Promise<void>;
 | 
					  private readFunction?: () => Promise<void>;
 | 
				
			||||||
  private writeAndTransformFunction?: IWriteAndTransformFunction<TInput, TOutput>;
 | 
					  private writeFunction?: IWriteAndTransformFunction<TInput, TOutput>;
 | 
				
			||||||
  private streamEndFunction?: IStreamEndFunction<TOutput>;
 | 
					  private finalFunction?: IStreamEndFunction<TOutput>;
 | 
				
			||||||
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;
 | 
					  private observableSubscription?: plugins.smartrx.rxjs.Subscription;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
 | 
					  constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
 | 
				
			||||||
    super(optionsArg);
 | 
					    super(optionsArg);
 | 
				
			||||||
    this.readFunction = optionsArg?.readFunction;
 | 
					    this.readFunction = optionsArg?.readFunction;
 | 
				
			||||||
    this.writeAndTransformFunction = optionsArg?.writeAndTransformFunction;
 | 
					    this.writeFunction = optionsArg?.writeAndTransformFunction;
 | 
				
			||||||
    this.streamEndFunction = optionsArg?.finalFunction;
 | 
					    this.finalFunction = optionsArg?.finalFunction;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async _read(size: number): Promise<void> {
 | 
					  public async _read(size: number): Promise<void> {
 | 
				
			||||||
@@ -128,7 +128,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  // Ensure the _write method types the chunk as TInput and encodes TOutput
 | 
					  // Ensure the _write method types the chunk as TInput and encodes TOutput
 | 
				
			||||||
  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
 | 
					  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
 | 
				
			||||||
    if (!this.writeAndTransformFunction) {
 | 
					    if (!this.writeFunction) {
 | 
				
			||||||
      return callback(new Error('No stream function provided'));
 | 
					      return callback(new Error('No stream function provided'));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -141,7 +141,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
 | 
				
			|||||||
    };
 | 
					    };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try {
 | 
					    try {
 | 
				
			||||||
      const modifiedChunk = await this.writeAndTransformFunction(chunk, tools);
 | 
					      const modifiedChunk = await this.writeFunction(chunk, tools);
 | 
				
			||||||
      if (modifiedChunk) {
 | 
					      if (modifiedChunk) {
 | 
				
			||||||
        if (!this.push(modifiedChunk)) {
 | 
					        if (!this.push(modifiedChunk)) {
 | 
				
			||||||
          // Handle backpressure if necessary
 | 
					          // Handle backpressure if necessary
 | 
				
			||||||
@@ -154,24 +154,24 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  public async _final(callback: (error?: Error | null) => void) {
 | 
					  public async _final(callback: (error?: Error | null) => void) {
 | 
				
			||||||
    if (this.streamEndFunction) {
 | 
					    if (this.finalFunction) {
 | 
				
			||||||
      const tools: IStreamTools = {
 | 
					      const tools: IStreamTools = {
 | 
				
			||||||
        truncate: () => callback(),
 | 
					        truncate: () => callback(),
 | 
				
			||||||
        push: (pipeObject) => this.push(pipeObject),
 | 
					        push: (pipeObject) => this.push(pipeObject),
 | 
				
			||||||
      };
 | 
					      };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      try {
 | 
					      try {
 | 
				
			||||||
        const finalChunk = await this.streamEndFunction(tools);
 | 
					        const finalChunk = await this.finalFunction(tools);
 | 
				
			||||||
        if (finalChunk) {
 | 
					        if (finalChunk) {
 | 
				
			||||||
          this.push(finalChunk);
 | 
					          this.push(finalChunk);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        callback();
 | 
					 | 
				
			||||||
      } catch (err) {
 | 
					      } catch (err) {
 | 
				
			||||||
        callback(err);
 | 
					        callback(err);
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      callback();
 | 
					      // nothing here
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    this.push(null);
 | 
					    this.push(null);
 | 
				
			||||||
 | 
					    callback();
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user