From da2191bb96c48f7860d4cd32c06e31186bcdd733 Mon Sep 17 00:00:00 2001 From: Philipp Kunz Date: Fri, 17 May 2024 18:13:51 +0200 Subject: [PATCH] fix(core): update --- npmextra.json | 22 ++-- package.json | 24 ++-- readme.hints.md | 2 +- readme.md | 242 +++++++++++++++++++++++++++++++++++++-- ts/00_commitinfo_data.ts | 4 +- 5 files changed, 261 insertions(+), 33 deletions(-) diff --git a/npmextra.json b/npmextra.json index 78e9bd8..7f79e91 100644 --- a/npmextra.json +++ b/npmextra.json @@ -9,27 +9,27 @@ "githost": "code.foss.global", "gitscope": "push.rocks", "gitrepo": "smartstream", - "description": "simplifies access to node streams", + "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.", "npmPackagename": "@push.rocks/smartstream", "license": "MIT", "keywords": [ + "stream", "node.js", - "streams", + "typescript", "stream manipulation", - "pipeline", "data processing", + "pipeline", "async transformation", "event handling", - "backpressure management", - "readable streams", - "writable streams", - "duplex streams", - "transform streams", + "backpressure", + "readable stream", + "writable stream", + "duplex stream", + "transform stream", "file streaming", - "buffer streams", + "buffer", "stream utilities", - "stream intake", - "stream output" + "esm" ] } }, diff --git a/package.json b/package.json index ad9d77c..ce8d0f0 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "@push.rocks/smartstream", "version": "3.0.35", "private": false, - "description": "simplifies access to node streams", + "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.", "main": "dist_ts/index.js", "typings": "dist_ts/index.d.ts", "type": "module", @@ -51,22 +51,22 @@ "readme.md" ], "keywords": [ + "stream", "node.js", - "streams", + "typescript", "stream manipulation", - "pipeline", "data processing", + "pipeline", "async transformation", "event handling", - "backpressure management", - "readable streams", - "writable streams", - "duplex streams", - "transform streams", + "backpressure", + "readable stream", + "writable stream", + "duplex stream", + "transform stream", "file streaming", - "buffer streams", + "buffer", "stream utilities", - "stream intake", - "stream output" + "esm" ] -} +} \ No newline at end of file diff --git a/readme.hints.md b/readme.hints.md index 0519ecb..cb19b34 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -1 +1 @@ - \ No newline at end of file + - make sure to respect backpressure handling. \ No newline at end of file diff --git a/readme.md b/readme.md index f8d3e47..1fc6cab 100644 --- a/readme.md +++ b/readme.md @@ -1,5 +1,6 @@ +```markdown # @push.rocks/smartstream -simplifies access to node streams +A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively. ## Install To install `@push.rocks/smartstream`, you can use npm or yarn as follows: @@ -14,7 +15,7 @@ This will add `@push.rocks/smartstream` to your project's dependencies. ## Usage -The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes heavy use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples. +The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples. ### Importing the Module @@ -24,6 +25,12 @@ Start by importing the module into your TypeScript file: import * as smartstream from '@push.rocks/smartstream'; ``` +For a more specific import, you may do the following: + +```typescript +import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream'; +``` + ### Creating Basic Transform Streams The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility: @@ -58,7 +65,7 @@ const processDataDuplex = new SmartDuplex({ sourceStream.pipe(processDataDuplex).pipe(destinationStream); ``` -### Stream Combiners +### Combining Multiple Streams `Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams: @@ -101,7 +108,7 @@ Consider a scenario where you need to process a large CSV file, transform the da ```typescript import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream'; import fs from 'fs'; -import csvParser from 'csv-parser'; // Assume this is a CSV parsing library +import csvParser from 'csv-parser'; const csvReadTransform = createTransformFunction(async (row) => { // Process row @@ -121,11 +128,232 @@ fs.createReadStream('path/to/largeFile.csv') This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues. -### Conclusion +### Advanced Use Case: Backpressure Handling -`@push.rocks/smartstream` offers a robust set of tools for working with Node.js streams, providing a more intuitive and reliable way to create, manipulate, and combine streams. By leveraging TypeScript and ESM syntax, `smartstream` enables developers to build more maintainable and type-safe stream-based solutions. +Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Here’s a comprehensive example that demonstrates handling backpressure in a pipeline with multiple `SmartDuplex` instances: + +```typescript +import { SmartDuplex } from '@push.rocks/smartstream'; + +// Define the first SmartDuplex, which writes data slowly to simulate backpressure +const slowProcessingStream = new SmartDuplex({ + name: 'SlowProcessor', + objectMode: true, + writeFunction: async (chunk, { push }) => { + await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay + console.log('Processed chunk:', chunk); + push(chunk); + } +}); + +// Define the second SmartDuplex as a fast processor +const fastProcessingStream = new SmartDuplex({ + name: 'FastProcessor', + objectMode: true, + writeFunction: async (chunk, { push }) => { + console.log('Fast processing chunk:', chunk); + push(chunk); + } +}); + +// Create a StreamIntake to dynamically handle incoming data +const streamIntake = new StreamIntake(); + +// Chain the streams together and handle the backpressure scenario +streamIntake + .pipe(fastProcessingStream) + .pipe(slowProcessingStream) + .pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling + .on('data', data => console.log('Final output:', data)) + .on('error', error => console.error('Stream encountered an error:', error)); + +// Simulate data pushing with intervals to observe backpressure handling +let counter = 0; +const interval = setInterval(() => { + if (counter >= 10) { + streamIntake.signalEnd(); + clearInterval(interval); + } else { + streamIntake.pushData(`Chunk ${counter}`); + counter++; + } +}, 50); +``` + +In this advanced use case, a `SlowProcessor` and `FastProcessor` are created using `SmartDuplex`, simulating a situation where one stream is slower than another. The `StreamIntake` dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions. + +### Transform Streams in Parallel + +For scenarios where you need to process data in parallel: + +```typescript +import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream'; + +const parallelTransform = createTransformFunction(async (chunk) => { + // Parallel Processing + const results = await Promise.all(chunk.map(async item => await processItem(item))); + return results; +}); + +const streamIntake = new StreamIntake(); + +streamIntake + .pipe(parallelTransform) + .pipe(new SmartDuplex({ + async writeFunction(chunk, { push }) { + console.log('Processed parallel chunk:', chunk); + push(chunk); + } + })) + .on('finish', () => console.log('Parallel processing completed.')); + +// Simulate data pushing +streamIntake.pushData([1, 2, 3, 4]); +streamIntake.pushData([5, 6, 7, 8]); +streamIntake.signalEnd(); +``` + +### Error Handling in Stream Pipelines + +Error handling is an essential part of working with streams. The `StreamWrapper` assists in combining multiple streams while managing errors seamlessly: + +```typescript +import { StreamWrapper } from '@push.rocks/smartstream'; + +const faultyStream = new SmartDuplex({ + async writeFunction(chunk, { push }) { + if (chunk === 'bad data') { + throw new Error('Faulty data encountered'); + } + push(chunk); + } +}); + +const readStream = new StreamIntake(); +const writeStream = new SmartDuplex({ + async writeFunction(chunk) { + console.log('Written chunk:', chunk); + } +}); + +const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]); + +combinedStream.run() + .then(() => console.log('Stream processing completed.')) + .catch(err => console.error('Stream error:', err.message)); + +// Push Data +readStream.pushData('good data'); +readStream.pushData('bad data'); // This will throw an error +readStream.pushData('more good data'); +readStream.signalEnd(); +``` + +### Testing Streams + +Here's an example test case using the `tap` testing framework to verify the integrity of the `SmartDuplex` from a buffer: + +```typescript +import { expect, tap } from '@push.rocks/tapbundle'; +import { SmartDuplex } from '@push.rocks/smartstream'; + +tap.test('should create a SmartStream from a Buffer', async () => { + const bufferData = Buffer.from('This is a test buffer'); + const smartStream = SmartDuplex.fromBuffer(bufferData, {}); + + let receivedData = Buffer.alloc(0); + + return new Promise((resolve) => { + smartStream.on('data', (chunk: Buffer) => { + receivedData = Buffer.concat([receivedData, chunk]); + }); + + smartStream.on('end', () => { + expect(receivedData.toString()).toEqual(bufferData.toString()); + resolve(); + }); + }); +}); + +tap.start(); +``` + +### Working with Files and Buffers + +You can easily stream files and buffers with `smartstream`. Here’s a test illustrating reading and writing with file streams using `smartfile` combined with `smartstream` utilities: + +```typescript +import { tap } from '@push.rocks/tapbundle'; +import * as smartfile from '@push.rocks/smartfile'; +import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream'; + +tap.test('should handle file read and write streams', async () => { + const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'); + const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'); + + const transformStream = new SmartDuplex({ + async writeFunction(chunk, { push }) { + const transformedChunk = chunk.toString().toUpperCase(); + push(transformedChunk); + } + }); + + const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]); + + await streamWrapper.run(); + + const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8'); + console.log('Output Content:', outputContent); +}); + +tap.start(); +``` + +### Modular and Scoped Transformations + +Creating modular and scoped transformations is straightforward with `SmartDuplex`: + +```typescript +import { SmartDuplex } from '@push.rocks/smartstream'; + +type DataChunk = { + id: number; + data: string; +}; + +const transformationStream1 = new SmartDuplex({ + async writeFunction(chunk, { push }) { + chunk.data = chunk.data.toUpperCase(); + push(chunk); + } +}) + +const transformationStream2 = new SmartDuplex({ + async writeFunction(chunk, { push }) { + chunk.data = `${chunk.data} processed with transformation 2`; + push(chunk); + } +}); + +const initialData: DataChunk[] = [ + { id: 1, data: 'first' }, + { id: 2, data: 'second' } +]; + +const intakeStream = new StreamIntake(); + +intakeStream + .pipe(transformationStream1) + .pipe(transformationStream2) + .on('data', data => console.log('Transformed Data:', data)); + +initialData.forEach(item => intakeStream.pushData(item)); +intakeStream.signalEnd(); +``` + +By leveraging `SmartDuplex`, `StreamWrapper`, and `StreamIntake`, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach. +``` -For more detailed examples and documentation, visit the [GitLab Repository](https://gitlab.com/push.rocks/smartstream) or the [GitHub Mirror](https://github.com/pushrocks/smartstream). ## License and Legal Information diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 68676da..942be9f 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.35', - description: 'simplifies access to node streams' + version: '3.0.36', + description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' }