fix(core): update
This commit is contained in:
		@@ -48,4 +48,3 @@ hi+wow
 | 
			
		||||
hi+wow
 | 
			
		||||
hi+wow
 | 
			
		||||
hi+wow
 | 
			
		||||
noice
 | 
			
		||||
@@ -1,11 +1,11 @@
 | 
			
		||||
import { expect, tap } from '@push.rocks/tapbundle';
 | 
			
		||||
import { SmartStream } from '../ts/smartstream.classes.smartstream.js'; // Adjust the import to your file structure
 | 
			
		||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
 | 
			
		||||
import * as smartrx from '@push.rocks/smartrx';
 | 
			
		||||
import * as fs from 'fs';
 | 
			
		||||
 | 
			
		||||
tap.test('should create a SmartStream from a Buffer', async () => {
 | 
			
		||||
  const bufferData = Buffer.from('This is a test buffer');
 | 
			
		||||
  const smartStream = SmartStream.fromBuffer(bufferData);
 | 
			
		||||
  const smartStream = SmartDuplex.fromBuffer(bufferData);
 | 
			
		||||
  
 | 
			
		||||
  let receivedData = Buffer.alloc(0);
 | 
			
		||||
  
 | 
			
		||||
@@ -25,7 +25,7 @@ tap.test('should create a SmartStream from an Observable', async () => {
 | 
			
		||||
  const observableData = 'Observable test data';
 | 
			
		||||
  const testObservable = smartrx.rxjs.of(Buffer.from(observableData));
 | 
			
		||||
  
 | 
			
		||||
  const smartStream = SmartStream.fromObservable(testObservable);
 | 
			
		||||
  const smartStream = SmartDuplex.fromObservable(testObservable);
 | 
			
		||||
  
 | 
			
		||||
  let receivedData = Buffer.alloc(0);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -7,43 +7,43 @@ let testIntake: smartstream.StreamIntake<string>;
 | 
			
		||||
 | 
			
		||||
tap.test('should handle a read stream', async (tools) => {
 | 
			
		||||
  const counter = 0;
 | 
			
		||||
  const testSmartstream = new smartstream.StreamWrapper([
 | 
			
		||||
  const streamWrapper = new smartstream.StreamWrapper([
 | 
			
		||||
    smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
 | 
			
		||||
    smartstream.createDuplexStream<Buffer, Buffer>(
 | 
			
		||||
      async (chunkStringArg: Buffer, streamTools) => {
 | 
			
		||||
    new smartstream.SmartDuplex({
 | 
			
		||||
      writeAndTransformFunction: async (chunkStringArg: Buffer, streamTools) => {
 | 
			
		||||
        // do something with the stream here
 | 
			
		||||
        const result = chunkStringArg.toString().substr(0, 100);
 | 
			
		||||
        streamTools.pipeMore('wow =========== \n');
 | 
			
		||||
        streamTools.push('wow =========== \n');
 | 
			
		||||
        return Buffer.from(result);
 | 
			
		||||
      },
 | 
			
		||||
      async (tools) => {
 | 
			
		||||
        // tools.pipeMore('hey, this is the end')
 | 
			
		||||
      streamEndFunction: async (tools) => {
 | 
			
		||||
        return Buffer.from('this is the end');
 | 
			
		||||
      },
 | 
			
		||||
      { objectMode: false }
 | 
			
		||||
    ),
 | 
			
		||||
    smartstream.createDuplexStream<Buffer, string>(async (chunkStringArg) => {
 | 
			
		||||
      console.log(chunkStringArg.toString());
 | 
			
		||||
      return null;
 | 
			
		||||
    }),
 | 
			
		||||
    new smartstream.SmartDuplex({
 | 
			
		||||
      writeAndTransformFunction: async (chunkStringArg) => {
 | 
			
		||||
        console.log(chunkStringArg.toString());
 | 
			
		||||
      },
 | 
			
		||||
      streamEndFunction: async (tools) => {
 | 
			
		||||
        tools.push(null);
 | 
			
		||||
      },
 | 
			
		||||
    }),
 | 
			
		||||
    smartstream.cleanPipe(),
 | 
			
		||||
  ]);
 | 
			
		||||
  await testSmartstream.run();
 | 
			
		||||
  // await streamWrapper.run();
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
tap.test('should create a valid Intake', async (tools) => {
 | 
			
		||||
  testIntake = new smartstream.StreamIntake<string>();
 | 
			
		||||
  testIntake.pipe(
 | 
			
		||||
      smartstream.createDuplexStream<string, string>(
 | 
			
		||||
        async (chunkString) => {
 | 
			
		||||
      new smartstream.SmartDuplex({
 | 
			
		||||
        objectMode: true,
 | 
			
		||||
        writeAndTransformFunction: async (chunkStringArg: string, streamTools) => {
 | 
			
		||||
          await tools.delayFor(100);
 | 
			
		||||
          console.log(chunkString);
 | 
			
		||||
          return chunkString;
 | 
			
		||||
        },
 | 
			
		||||
        async () => {
 | 
			
		||||
          return 'noice';
 | 
			
		||||
          console.log(chunkStringArg);
 | 
			
		||||
          return chunkStringArg;
 | 
			
		||||
        }
 | 
			
		||||
      )
 | 
			
		||||
      })
 | 
			
		||||
    )
 | 
			
		||||
    .pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'));
 | 
			
		||||
  const testFinished = tools.defer();
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,6 @@
 | 
			
		||||
 */
 | 
			
		||||
export const commitinfo = {
 | 
			
		||||
  name: '@push.rocks/smartstream',
 | 
			
		||||
  version: '3.0.0',
 | 
			
		||||
  version: '3.0.1',
 | 
			
		||||
  description: 'simplifies access to node streams'
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,4 @@
 | 
			
		||||
export * from './smartstream.classes.passthrough.js';
 | 
			
		||||
export * from './smartstream.classes.smartstream.js';
 | 
			
		||||
export * from './smartstream.classes.smartduplex.js';
 | 
			
		||||
export * from './smartstream.classes.streamwrapper.js';
 | 
			
		||||
export * from './smartstream.classes.streamintake.js';
 | 
			
		||||
export * from './smartstream.duplex.js';
 | 
			
		||||
@@ -1,39 +1,30 @@
 | 
			
		||||
import * as plugins from './smartstream.plugins.js';
 | 
			
		||||
import { Duplex, type DuplexOptions } from 'stream';
 | 
			
		||||
export interface SmartStreamOptions<TInput, TOutput> extends DuplexOptions {
 | 
			
		||||
  // You can add more custom options relevant to TInput and TOutput if necessary
 | 
			
		||||
 | 
			
		||||
export interface IStreamTools {
 | 
			
		||||
  truncate: () => void;
 | 
			
		||||
  push: (pipeObject: any) => void;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;
 | 
			
		||||
  private asyncChunkModifier?: (chunk: TInput) => Promise<TOutput>;
 | 
			
		||||
export interface IWriteAndTransformFunction<T, rT> {
 | 
			
		||||
  (chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
  constructor(options?: SmartStreamOptions<TInput, TOutput>, asyncChunkModifierArg?: (chunk: TInput) => Promise<TOutput>) {
 | 
			
		||||
    super(options);
 | 
			
		||||
    this.asyncChunkModifier = asyncChunkModifierArg;
 | 
			
		||||
  }
 | 
			
		||||
export interface IStreamEndFunction<rT> {
 | 
			
		||||
  (toolsArg: IStreamTools): Promise<rT>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
  // Ensure the _write method types the chunk as TInput and encodes TOutput
 | 
			
		||||
  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
 | 
			
		||||
    try {
 | 
			
		||||
      if (this.asyncChunkModifier) {
 | 
			
		||||
        const modifiedChunk = await this.asyncChunkModifier(chunk);
 | 
			
		||||
        if (!this.push(modifiedChunk)) {
 | 
			
		||||
          // Handle backpressure here if necessary
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        if (!this.push(chunk as unknown as TOutput)) {
 | 
			
		||||
          // Handle backpressure here if necessary
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      callback();
 | 
			
		||||
    } catch (err) {
 | 
			
		||||
      callback(err);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
export interface SmartStreamOptions<TInput, TOutput> extends DuplexOptions {
 | 
			
		||||
  readFunction?: () => Promise<void>;
 | 
			
		||||
  writeAndTransformFunction?: IWriteAndTransformFunction<TInput, TOutput>;
 | 
			
		||||
  streamEndFunction?: IStreamEndFunction<TOutput>;
 | 
			
		||||
  // Add other custom options if necessary
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
  static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartStream {
 | 
			
		||||
    const smartStream = new SmartStream(options);
 | 
			
		||||
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
  // STATIC
 | 
			
		||||
  static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex {
 | 
			
		||||
    const smartStream = new SmartDuplex(options);
 | 
			
		||||
    process.nextTick(() => {
 | 
			
		||||
      smartStream.push(buffer);
 | 
			
		||||
      smartStream.push(null); // Signal the end of the data
 | 
			
		||||
@@ -41,8 +32,11 @@ export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
    return smartStream;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static fromObservable(observable: plugins.smartrx.rxjs.Observable<any>, options?: DuplexOptions): SmartStream {
 | 
			
		||||
    const smartStream = new SmartStream(options);
 | 
			
		||||
  static fromObservable(
 | 
			
		||||
    observable: plugins.smartrx.rxjs.Observable<any>,
 | 
			
		||||
    options?: DuplexOptions
 | 
			
		||||
  ): SmartDuplex {
 | 
			
		||||
    const smartStream = new SmartDuplex(options);
 | 
			
		||||
    smartStream.observableSubscription = observable.subscribe({
 | 
			
		||||
      next: (data) => {
 | 
			
		||||
        if (!smartStream.push(data)) {
 | 
			
		||||
@@ -51,7 +45,7 @@ export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
          smartStream.once('drain', () => {
 | 
			
		||||
            // Resume the observable when the stream buffer is drained
 | 
			
		||||
            smartStream.observableSubscription?.unsubscribe();
 | 
			
		||||
            smartStream.observableSubscription = observable.subscribe(data => {
 | 
			
		||||
            smartStream.observableSubscription = observable.subscribe((data) => {
 | 
			
		||||
              smartStream.push(data);
 | 
			
		||||
            });
 | 
			
		||||
          });
 | 
			
		||||
@@ -62,14 +56,17 @@ export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
      },
 | 
			
		||||
      complete: () => {
 | 
			
		||||
        smartStream.push(null); // Signal the end of the data
 | 
			
		||||
      }
 | 
			
		||||
      },
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    return smartStream;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  static fromReplaySubject(replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>, options?: DuplexOptions): SmartStream {
 | 
			
		||||
    const smartStream = new SmartStream(options);
 | 
			
		||||
  static fromReplaySubject(
 | 
			
		||||
    replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
 | 
			
		||||
    options?: DuplexOptions
 | 
			
		||||
  ): SmartDuplex {
 | 
			
		||||
    const smartStream = new SmartDuplex(options);
 | 
			
		||||
    let isBackpressured = false;
 | 
			
		||||
 | 
			
		||||
    // Subscribe to the ReplaySubject
 | 
			
		||||
@@ -87,7 +84,7 @@ export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
      },
 | 
			
		||||
      complete: () => {
 | 
			
		||||
        smartStream.push(null); // End the stream when the ReplaySubject completes
 | 
			
		||||
      }
 | 
			
		||||
      },
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    // Listen for 'drain' event to resume the subscription if it was paused
 | 
			
		||||
@@ -109,4 +106,72 @@ export class SmartStream<TInput = any, TOutput = any> extends Duplex {
 | 
			
		||||
 | 
			
		||||
    return smartStream;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // INSTANCE
 | 
			
		||||
  private readFunction?: () => Promise<void>;
 | 
			
		||||
  private writeAndTransformFunction?: IWriteAndTransformFunction<TInput, TOutput>;
 | 
			
		||||
  private streamEndFunction?: IStreamEndFunction<TOutput>;
 | 
			
		||||
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;
 | 
			
		||||
 | 
			
		||||
  constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
 | 
			
		||||
    super(optionsArg);
 | 
			
		||||
    this.readFunction = optionsArg?.readFunction;
 | 
			
		||||
    this.writeAndTransformFunction = optionsArg?.writeAndTransformFunction;
 | 
			
		||||
    this.streamEndFunction = optionsArg?.streamEndFunction;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async _read(size: number): Promise<void> {
 | 
			
		||||
    if (this.readFunction) {
 | 
			
		||||
      await this.readFunction();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // Ensure the _write method types the chunk as TInput and encodes TOutput
 | 
			
		||||
  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
 | 
			
		||||
    if (!this.writeAndTransformFunction) {
 | 
			
		||||
      return callback(new Error('No stream function provided'));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const tools: IStreamTools = {
 | 
			
		||||
      truncate: () => {
 | 
			
		||||
        this.push(null);
 | 
			
		||||
        callback();
 | 
			
		||||
      },
 | 
			
		||||
      push: (pushArg: TOutput) => this.push(pushArg),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      const modifiedChunk = await this.writeAndTransformFunction(chunk, tools);
 | 
			
		||||
      if (modifiedChunk) {
 | 
			
		||||
        if (!this.push(modifiedChunk)) {
 | 
			
		||||
          // Handle backpressure if necessary
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      callback();
 | 
			
		||||
    } catch (err) {
 | 
			
		||||
      callback(err);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public async _final(callback: (error?: Error | null) => void) {
 | 
			
		||||
    if (this.streamEndFunction) {
 | 
			
		||||
      const tools: IStreamTools = {
 | 
			
		||||
        truncate: () => callback(),
 | 
			
		||||
        push: (pipeObject) => this.push(pipeObject),
 | 
			
		||||
      };
 | 
			
		||||
 | 
			
		||||
      try {
 | 
			
		||||
        const finalChunk = await this.streamEndFunction(tools);
 | 
			
		||||
        if (finalChunk) {
 | 
			
		||||
          this.push(finalChunk);
 | 
			
		||||
        }
 | 
			
		||||
        callback();
 | 
			
		||||
      } catch (err) {
 | 
			
		||||
        callback(err);
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      this.push(null),
 | 
			
		||||
      callback();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -1,83 +0,0 @@
 | 
			
		||||
import * as plugins from './smartstream.plugins.js';
 | 
			
		||||
 | 
			
		||||
export interface ITruncateFunc {
 | 
			
		||||
  (): void;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IPipeMoreFunc {
 | 
			
		||||
  (pipeObject: any): void;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IStreamTools {
 | 
			
		||||
  truncate: ITruncateFunc;
 | 
			
		||||
  pipeMore: IPipeMoreFunc;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IStreamFunction<T, rT> {
 | 
			
		||||
  (chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IStreamEndFunction<rT> {
 | 
			
		||||
  (toolsArg: IStreamTools): Promise<rT>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IStreamOptions {
 | 
			
		||||
  objectMode?: boolean;
 | 
			
		||||
  readableObjectMode?: boolean;
 | 
			
		||||
  writableObjectMode?: boolean;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export let createDuplexStream = <T, rT>(
 | 
			
		||||
  funcArg: IStreamFunction<T, rT>,
 | 
			
		||||
  endFuncArg?: IStreamEndFunction<rT>,
 | 
			
		||||
  optionsArg: IStreamOptions = {
 | 
			
		||||
    objectMode: false,
 | 
			
		||||
    readableObjectMode: true,
 | 
			
		||||
    writableObjectMode: true,
 | 
			
		||||
  }
 | 
			
		||||
) => {
 | 
			
		||||
  return plugins.through2(
 | 
			
		||||
    optionsArg,
 | 
			
		||||
    function (chunk, enc, cb) {
 | 
			
		||||
      let truncated = false;
 | 
			
		||||
      const tools: IStreamTools = {
 | 
			
		||||
        truncate: () => {
 | 
			
		||||
          truncated = true;
 | 
			
		||||
          cb(null, null);
 | 
			
		||||
        },
 | 
			
		||||
        pipeMore: (pipeObject) => {
 | 
			
		||||
          this.push(pipeObject);
 | 
			
		||||
        },
 | 
			
		||||
      };
 | 
			
		||||
      const asyncWrapper = async () => {
 | 
			
		||||
        const resultChunk: rT = await funcArg(chunk, tools);
 | 
			
		||||
        if (!truncated) {
 | 
			
		||||
          cb(null, resultChunk);
 | 
			
		||||
        }
 | 
			
		||||
      };
 | 
			
		||||
      asyncWrapper().catch((err) => {
 | 
			
		||||
        console.log(err);
 | 
			
		||||
      });
 | 
			
		||||
    },
 | 
			
		||||
    function (cb) {
 | 
			
		||||
      const tools: IStreamTools = {
 | 
			
		||||
        truncate: () => {
 | 
			
		||||
          cb();
 | 
			
		||||
        },
 | 
			
		||||
        pipeMore: (pushArg) => {
 | 
			
		||||
          this.push(pushArg);
 | 
			
		||||
        },
 | 
			
		||||
      };
 | 
			
		||||
      const asyncWrapper = async () => {
 | 
			
		||||
        if (endFuncArg) {
 | 
			
		||||
          const result = await endFuncArg(tools);
 | 
			
		||||
          this.push(result);
 | 
			
		||||
        }
 | 
			
		||||
        cb();
 | 
			
		||||
      };
 | 
			
		||||
      asyncWrapper().catch((err) => {
 | 
			
		||||
        console.log(err);
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  );
 | 
			
		||||
};
 | 
			
		||||
		Reference in New Issue
	
	Block a user