A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.

@push.rocks/smartstream

Simplifies access to Node.js streams.

Install

Install @push.rocks/smartstream with npm or yarn:

npm install @push.rocks/smartstream --save
# OR
yarn add @push.rocks/smartstream

This will add @push.rocks/smartstream to your project's dependencies.

Usage

The @push.rocks/smartstream module simplifies working with Node.js streams by providing a set of utilities for creating and manipulating them. It is written in TypeScript, and all examples below use ESM syntax.

Importing the Module

Start by importing the module into your TypeScript file:

import * as smartstream from '@push.rocks/smartstream';

Creating Basic Transform Streams

The module provides utilities for creating transform streams. For example, the createTransformFunction utility creates a transform stream from an async function that receives each chunk and returns the modified chunk:

import { createTransformFunction } from '@push.rocks/smartstream';

const upperCaseTransform = createTransformFunction<string, string>(async (chunk) => {
  return chunk.toUpperCase();
});

// Usage with pipe (readableStream and writableStream are assumed to be existing streams)
readableStream
  .pipe(upperCaseTransform)
  .pipe(writableStream);
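
For comparison, here is a minimal sketch of the same upper-casing step written against Node's built-in stream API only. It does not use smartstream and makes no claim about how createTransformFunction is implemented; createUpperCaseTransform and collectUpperCased are hypothetical helper names introduced for illustration.

```typescript
import { Readable, Transform } from 'node:stream';

// Hypothetical helper: builds a fresh transform that upper-cases every chunk.
// A new instance is created per call because a stream cannot be reused once it has ended.
function createUpperCaseTransform(): Transform {
  return new Transform({
    transform(chunk, _encoding, callback) {
      // Chunks arrive as Buffers by default, so convert them to strings first.
      callback(null, chunk.toString().toUpperCase());
    },
  });
}

// Hypothetical helper: pipes the given strings through the transform
// and concatenates everything that comes out the other side.
async function collectUpperCased(parts: string[]): Promise<string> {
  let result = '';
  for await (const chunk of Readable.from(parts).pipe(createUpperCaseTransform())) {
    result += chunk.toString();
  }
  return result;
}
```

Calling collectUpperCased(['hello, ', 'world']) resolves to 'HELLO, WORLD'. The callback-based transform signature is the boilerplate that an async-function wrapper such as createTransformFunction lets you avoid.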

Handling Backpressure with SmartDuplex

SmartDuplex is the part of the smartstream module designed to handle backpressure. The following example creates a SmartDuplex stream that processes each chunk asynchronously and respects the consumer's pace:

import { SmartDuplex } from '@push.rocks/smartstream';

const processDataDuplex = new SmartDuplex({
  async writeFunction(chunk, { push }) {
    const processedChunk = await processChunk(chunk); // Assume this is a defined asynchronous function
    push(processedChunk);
  }
});

sourceStream.pipe(processDataDuplex).pipe(destinationStream);
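
To make the term concrete, the sketch below shows what respecting backpressure means in plain Node.js, without smartstream: write() returns false when the destination's buffer is full, and the producer waits for the 'drain' event before continuing. The function name writeAllRespectingBackpressure is hypothetical, and this is not a description of SmartDuplex internals.

```typescript
import { once } from 'node:events';
import { Writable } from 'node:stream';

// Hypothetical helper: writes every item to the destination, pausing whenever
// the destination signals that its internal buffer is full.
// Resolves with the number of times the producer had to wait.
async function writeAllRespectingBackpressure(
  destination: Writable,
  items: string[],
): Promise<number> {
  let pauses = 0;
  for (const item of items) {
    // write() returns false once the buffered data reaches highWaterMark.
    if (!destination.write(item)) {
      pauses += 1;
      // Resume only after the destination has flushed its buffer.
      await once(destination, 'drain');
    }
  }
  destination.end();
  // 'finish' fires once all written data has been handed to the destination.
  await once(destination, 'finish');
  return pauses;
}
```

A producer that ignores the return value of write() keeps buffering data in memory when the consumer is slow; waiting for 'drain' keeps the buffer bounded.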

Stream Combiners

smartstream can combine multiple streams into a single pipeline and handles errors and cleanup automatically. The following example chains a source, two transformations, and a destination:

import { StreamWrapper } from '@push.rocks/smartstream';

const combinedStream = new StreamWrapper([
  readStream,       // Source stream
  transformStream1, // Transformation
  transformStream2, // Another transformation
  writeStream       // Destination stream
]);

combinedStream.run()
  .then(() => console.log('Processing completed.'))
  .catch(err => console.error('An error occurred:', err));
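
For readers familiar with Node's built-in API, the closest standard counterpart is pipeline from node:stream/promises, which also chains streams and rejects if any stage fails. The sketch below uses only built-in modules; runPipeline and its two transforms are hypothetical stand-ins for the streams named above, and it is not meant to describe how StreamWrapper works internally.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: trims and upper-cases each word, collecting the results.
async function runPipeline(words: string[]): Promise<string[]> {
  const collected: string[] = [];

  await pipeline(
    // Source stream: emits each array element as one chunk (object mode).
    Readable.from(words),
    // First transformation: strip surrounding whitespace.
    new Transform({
      objectMode: true,
      transform(word: string, _encoding, callback) {
        callback(null, word.trim());
      },
    }),
    // Second transformation: convert to upper case.
    new Transform({
      objectMode: true,
      transform(word: string, _encoding, callback) {
        callback(null, word.toUpperCase());
      },
    }),
    // Destination stream: store each processed word.
    new Writable({
      objectMode: true,
      write(word: string, _encoding, callback) {
        collected.push(word);
        callback();
      },
    }),
  );

  return collected;
}
```

If any stage emits an error, pipeline destroys the remaining streams and the returned promise rejects, which mirrors the then/catch handling shown for combinedStream.run().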

Working with StreamIntake

StreamIntake allows for more dynamic control of the reading process, facilitating scenarios where data is not continuously available:

import { StreamIntake } from '@push.rocks/smartstream';

const streamIntake = new StreamIntake<string>();

// Dynamically push data into the intake
streamIntake.pushData('Hello, World!');
streamIntake.pushData('Another message');

// Signal end when no more data is to be pushed
streamIntake.signalEnd();
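
The example above shows only the producing side. As a rough analogy in plain Node.js (not the StreamIntake implementation), an object-mode PassThrough behaves similarly: data is written whenever it becomes available, and a consumer that is already waiting receives it. demoIntake is a hypothetical name, and the 10 ms delay merely simulates data that arrives late.

```typescript
import { PassThrough } from 'node:stream';

// Hypothetical demo: a consumer waits on a stream while data arrives sporadically.
async function demoIntake(): Promise<string[]> {
  const intake = new PassThrough({ objectMode: true });

  // The consumer starts reading before any data exists.
  const consumed = (async () => {
    const seen: string[] = [];
    for await (const message of intake) {
      seen.push(message);
    }
    return seen;
  })();

  // Data is pushed in whenever it becomes available.
  intake.write('Hello, World!');
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulated gap
  intake.write('Another message');

  // Ending the stream lets the consumer's loop finish.
  intake.end();

  return consumed;
}
```

The consumer's loop only completes after end() is called, which is the role signalEnd() plays in the StreamIntake example.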

Real-world Scenario: Processing Large Files

Consider a scenario where you need to process a large CSV file, transform the data row by row, and then write the results to a database or another file. With smartstream, you can build a pipeline that reads the CSV, processes each row, and handles backpressure, so the file never has to be held in memory all at once.

import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
import fs from 'fs';
import csvParser from 'csv-parser'; // Assume this is a CSV parsing library

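const csvReadTransform = createTransformFunction<any, any>(async (row) => {
  // Process the row; the shallow copy below is a placeholder for real processing logic
  const processedRow = { ...row };
  return processedRow;
});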

fs.createReadStream('path/to/largeFile.csv')
  .pipe(csvParser())
  .pipe(csvReadTransform)
  .pipe(new SmartDuplex({
    async writeFunction(chunk, { push }) {
      await writeToDatabase(chunk); // Assume this writes to a database
    }
  }))
  .on('finish', () => console.log('File processed successfully.'));

This example reads a large CSV file, transforms each row with createTransformFunction, and uses a SmartDuplex as the final stage to write the processed rows while respecting backpressure.

Conclusion

@push.rocks/smartstream offers a set of tools for creating, manipulating, and combining Node.js streams. Built with TypeScript and ESM syntax, it helps developers write more maintainable and type-safe stream-based code.

For more detailed examples and documentation, visit the GitLab Repository or the GitHub Mirror.

This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the license file within this repository.

Please note: The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

Trademarks

This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.

Company Information

Task Venture Capital GmbH
Registered at District court Bremen HRB 35230 HB, Germany

For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.

By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.