Compare commits

...

81 Commits

Author SHA1 Message Date
3e062103f8 3.0.40 2024-06-02 23:40:52 +02:00
6451e93c12 fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API 2024-06-02 23:40:52 +02:00
70cf93595c 3.0.39 2024-06-02 16:42:42 +02:00
17e03e9790 fix(core): update 2024-06-02 16:42:42 +02:00
e52ce7af61 update description 2024-05-29 14:16:38 +02:00
f548f4b6cb 3.0.38 2024-05-17 19:21:34 +02:00
23a7a77a73 fix(core): update 2024-05-17 19:21:33 +02:00
13d2fc78b8 3.0.37 2024-05-17 18:40:33 +02:00
898cc0407d fix(core): update 2024-05-17 18:40:32 +02:00
8a3f43a11a 3.0.36 2024-05-17 18:13:52 +02:00
da2191bb96 fix(core): update 2024-05-17 18:13:51 +02:00
f13db1e422 3.0.35 2024-05-05 18:30:05 +02:00
42a90e804a fix(core): update 2024-05-05 18:30:05 +02:00
413e2af717 update tsconfig 2024-04-14 18:25:32 +02:00
267a76af13 update tsconfig 2024-04-01 21:41:26 +02:00
7834b7e6d2 update npmextra.json: githost 2024-04-01 19:59:50 +02:00
ae643708e7 update npmextra.json: githost 2024-03-30 21:48:51 +01:00
d9d96b8bb7 3.0.34 2024-03-16 18:29:45 +01:00
a961eea431 fix(core): update 2024-03-16 18:29:44 +01:00
edb58ade28 3.0.33 2024-02-29 12:15:01 +01:00
753a481765 fix(core): update 2024-02-29 12:15:00 +01:00
bbbd1b73b9 3.0.32 2024-02-25 20:14:33 +01:00
271d0be106 fix(core): update 2024-02-25 20:14:33 +01:00
0ceeacd5a0 3.0.31 2024-02-25 20:14:20 +01:00
287695e445 fix(core): update 2024-02-25 20:14:19 +01:00
60f9e541a5 3.0.30 2023-11-14 10:51:23 +01:00
96ea67e135 fix(core): update 2023-11-14 10:51:23 +01:00
ba0a2023ad 3.0.29 2023-11-14 10:43:18 +01:00
a09c359847 fix(core): update 2023-11-14 10:43:17 +01:00
e2b4d772b3 3.0.28 2023-11-14 10:29:44 +01:00
0f46b62b2d fix(core): update 2023-11-14 10:29:44 +01:00
9bf37469c6 3.0.27 2023-11-13 21:38:13 +01:00
12bb125bdc fix(core): update 2023-11-13 21:38:12 +01:00
703dc11c6c 3.0.26 2023-11-13 20:34:22 +01:00
28725d1723 fix(core): update 2023-11-13 20:34:21 +01:00
c77e0f2ba6 3.0.25 2023-11-13 19:12:24 +01:00
196fb6d396 fix(core): update 2023-11-13 19:12:23 +01:00
df0ddf04b3 3.0.24 2023-11-13 19:06:02 +01:00
2e1aa4a8ff fix(core): update 2023-11-13 19:06:02 +01:00
bc09033af0 3.0.23 2023-11-13 18:41:05 +01:00
22df9dfd94 fix(core): update 2023-11-13 18:41:04 +01:00
d48ef6eb43 3.0.22 2023-11-13 18:19:11 +01:00
9421c652a2 fix(core): update 2023-11-13 18:19:11 +01:00
a6ab15bf1d 3.0.21 2023-11-13 17:52:12 +01:00
00d1455367 fix(core): update 2023-11-13 17:52:11 +01:00
116a281c6c 3.0.20 2023-11-13 17:43:15 +01:00
9bf6f251c4 fix(core): update 2023-11-13 17:43:15 +01:00
e3427c2498 3.0.19 2023-11-12 22:34:56 +01:00
a400a0a04c fix(core): update 2023-11-12 22:34:55 +01:00
91392e8bd5 3.0.18 2023-11-11 20:56:46 +01:00
d161d6613a fix(core): update 2023-11-11 20:56:46 +01:00
7a14e67f4f 3.0.17 2023-11-11 20:44:01 +01:00
465ccfec40 fix(core): update 2023-11-11 20:44:00 +01:00
3adb16d1f8 3.0.16 2023-11-11 20:30:43 +01:00
a9230ca790 fix(core): update 2023-11-11 20:30:42 +01:00
788f2665c2 3.0.15 2023-11-11 19:47:21 +01:00
7b678cc856 fix(core): update 2023-11-11 19:47:20 +01:00
12c9d8cc9d 3.0.14 2023-11-11 18:53:39 +01:00
3a2dc1c37e fix(core): update 2023-11-11 18:53:38 +01:00
1f67bc0e1e 3.0.13 2023-11-09 15:59:28 +01:00
b15ddd987c fix(core): update 2023-11-09 15:59:28 +01:00
cc43080513 3.0.12 2023-11-07 21:46:47 +01:00
49d235411f fix(core): update 2023-11-07 21:46:46 +01:00
d238662bea 3.0.11 2023-11-06 22:10:21 +01:00
8efb2b1093 fix(core): update 2023-11-06 22:10:20 +01:00
4926f57d83 3.0.10 2023-11-06 21:59:26 +01:00
86552f2b1b fix(core): update 2023-11-06 21:59:25 +01:00
353a8ecde6 3.0.9 2023-11-06 21:03:45 +01:00
3e03b81a43 fix(core): update 2023-11-06 21:03:44 +01:00
5e4ec5b837 3.0.8 2023-11-06 20:48:33 +01:00
62796f7151 fix(core): update 2023-11-06 20:48:32 +01:00
2c1d9f05ce 3.0.7 2023-11-04 00:17:04 +01:00
34cbf28972 fix(core): update 2023-11-04 00:17:03 +01:00
1b6e38c040 3.0.6 2023-11-03 23:25:01 +01:00
b135e6023a fix(core): update 2023-11-03 23:25:00 +01:00
91d01f3689 3.0.5 2023-11-03 22:26:16 +01:00
e8e067ea77 fix(core): update 2023-11-03 22:26:15 +01:00
2cb490cd2a 3.0.4 2023-11-03 21:47:30 +01:00
98397bb85e fix(core): update 2023-11-03 21:47:29 +01:00
f52b0de21f 3.0.3 2023-11-03 21:36:10 +01:00
1c0e5f264d fix(core): update 2023-11-03 21:36:10 +01:00
24 changed files with 5908 additions and 4165 deletions


@ -1,140 +0,0 @@
# gitzone ci_default
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
cache:
paths:
- .npmci_cache/
key: '$CI_BUILD_STAGE'
stages:
- security
- test
- release
- metadata
before_script:
- npm install -g @shipzone/npmci
# ====================
# security stage
# ====================
mirror:
stage: security
script:
- npmci git mirror
only:
- tags
tags:
- lossless
- docker
- notpriv
auditProductionDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --production --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=prod --production
tags:
- docker
allow_failure: true
auditDevDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=dev
tags:
- docker
allow_failure: true
# ====================
# test stage
# ====================
testStable:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
testBuild:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci command npm run build
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
release:
stage: release
script:
- npmci node install stable
- npmci npm publish
only:
- tags
tags:
- lossless
- docker
- notpriv
# ====================
# metadata stage
# ====================
codequality:
stage: metadata
allow_failure: true
only:
- tags
script:
- npmci command npm install -g typescript
- npmci npm prepare
- npmci npm install
tags:
- lossless
- docker
- priv
trigger:
stage: metadata
script:
- npmci trigger
only:
- tags
tags:
- lossless
- docker
- notpriv
pages:
stage: metadata
script:
- npmci node install lts
- npmci command npm install -g @git.zone/tsdoc
- npmci npm prepare
- npmci npm install
- npmci command tsdoc
tags:
- lossless
- docker
- notpriv
only:
- tags
artifacts:
expire_in: 1 week
paths:
- public
allow_failure: true


@ -6,12 +6,34 @@
"gitzone": { "gitzone": {
"projectType": "npm", "projectType": "npm",
"module": { "module": {
"githost": "gitlab.com", "githost": "code.foss.global",
"gitscope": "push.rocks", "gitscope": "push.rocks",
"gitrepo": "smartstream", "gitrepo": "smartstream",
"description": "simplifies access to node streams", "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"npmPackagename": "@push.rocks/smartstream", "npmPackagename": "@push.rocks/smartstream",
"license": "MIT" "license": "MIT",
"keywords": [
"stream",
"node.js",
"typescript",
"stream manipulation",
"data processing",
"pipeline",
"async transformation",
"event handling",
"backpressure",
"readable stream",
"writable stream",
"duplex stream",
"transform stream",
"file streaming",
"buffer",
"stream utilities",
"esm"
]
} }
},
"tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
} }
} }


@ -1,40 +1,40 @@
{
"name": "@push.rocks/smartstream",
- "version": "3.0.2",
+ "version": "3.0.40",
"private": false,
- "description": "simplifies access to node streams",
+ "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
- "main": "dist_ts/index.js",
- "typings": "dist_ts/index.d.ts",
"type": "module",
+ "exports": {
+ ".": "./dist_ts/index.js",
+ "./web": "./dist_ts_web/index.js"
+ },
"scripts": {
"test": "(tstest test/)",
- "build": "(tsbuild)",
- "buildDocs": "tsdoc"
+ "build": "(tsbuild tsfolders --web --allowimplicitany)"
},
"repository": {
"type": "git",
- "url": "git+https://gitlab.com/push.rocks/smartstream.git"
+ "url": "https://code.foss.global/push.rocks/smartstream.git"
},
"author": "Lossless GmbH",
"license": "MIT",
"bugs": {
"url": "https://gitlab.com/push.rocks/smartstream/issues"
},
- "homepage": "https://gitlab.com/push.rocks/smartstream#readme",
+ "homepage": "https://code.foss.global/push.rocks/smartstream",
"devDependencies": {
- "@git.zone/tsbuild": "^2.1.66",
+ "@git.zone/tsbuild": "^2.1.80",
"@git.zone/tsrun": "^1.2.44",
- "@git.zone/tstest": "^1.0.77",
+ "@git.zone/tstest": "^1.0.90",
- "@push.rocks/smartfile": "^10.0.33",
+ "@push.rocks/smartfile": "^11.0.15",
- "@push.rocks/tapbundle": "^5.0.15"
+ "@push.rocks/tapbundle": "^5.0.23",
+ "@types/node": "^20.12.12"
},
"dependencies": {
+ "@push.rocks/lik": "^6.0.15",
+ "@push.rocks/smartenv": "^5.0.12",
"@push.rocks/smartpromise": "^4.0.3",
- "@push.rocks/smartrx": "^3.0.7",
+ "@push.rocks/smartrx": "^3.0.7"
- "@types/from2": "^2.3.4",
- "@types/through2": "^2.0.40",
- "from2": "^2.3.0",
- "through2": "^4.0.2"
},
"browserslist": [
"last 1 chrome versions"
@ -50,5 +50,24 @@
"cli.js",
"npmextra.json",
"readme.md"
- ]
+ ],
+ "keywords": [
+ "stream",
+ "node.js",
+ "typescript",
+ "stream manipulation",
+ "data processing",
+ "pipeline",
+ "async transformation",
+ "event handling",
+ "backpressure",
+ "readable stream",
+ "writable stream",
+ "duplex stream",
+ "transform stream",
+ "file streaming",
+ "buffer",
+ "stream utilities",
+ "esm"
+ ]
}
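The new `exports` block above adds a `./web` subpath next to the main entry point. A hedged sketch of how a consumer might use it, assuming the built `dist_ts_web` bundle re-exports `WebDuplexStream` the same way `ts_web/index.ts` does further down in this diff:

```typescript
// Main entry point stays '@push.rocks/smartstream' (Node.js stream classes).
// The new "./web" subpath is assumed to expose the web-streams classes.
import { WebDuplexStream } from '@push.rocks/smartstream/web';

// A small web-stream transform that trims incoming strings.
const trimStream = new WebDuplexStream<string, string>({
  writeFunction: async (chunk, { push }) => {
    push(chunk.trim());
  },
});
```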

pnpm-lock.yaml generated

File diff suppressed because it is too large

readme.hints.md Normal file

@ -0,0 +1 @@
- make sure to respect backpressure handling.
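A minimal sketch of what this hint means with the `SmartDuplex` API introduced in this changeset: `tools.push` now resolves asynchronously, so awaiting it inside `writeFunction` pauses the producer until the consumer has drained (the `processChunk` helper is a hypothetical placeholder):

```typescript
import { SmartDuplex } from '@push.rocks/smartstream';

// Hypothetical async processing step, for illustration only.
const processChunk = async (chunk: string) => chunk.toUpperCase();

const backpressureAwareStream = new SmartDuplex<string, string>({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    const result = await processChunk(chunk);
    // Awaiting the push keeps this writer paused while the downstream
    // side is still full, instead of buffering without bound.
    await tools.push(result);
  },
});
```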

readme.md

@ -1,54 +1,375 @@
# @pushrocks/smartstream
simplifies access to node streams
## Availabililty and Links
* [npmjs.org (npm package)](https://www.npmjs.com/package/@pushrocks/smartstream)
* [gitlab.com (source)](https://gitlab.com/pushrocks/smartstream)
* [github.com (source mirror)](https://github.com/pushrocks/smartstream)
* [docs (typedoc)](https://pushrocks.gitlab.io/smartstream/)
## Status for master
Status Category | Status Badge
-- | --
GitLab Pipelines | [![pipeline status](https://gitlab.com/pushrocks/smartstream/badges/master/pipeline.svg)](https://lossless.cloud)
GitLab Pipline Test Coverage | [![coverage report](https://gitlab.com/pushrocks/smartstream/badges/master/coverage.svg)](https://lossless.cloud)
npm | [![npm downloads per month](https://badgen.net/npm/dy/@pushrocks/smartstream)](https://lossless.cloud)
Snyk | [![Known Vulnerabilities](https://badgen.net/snyk/pushrocks/smartstream)](https://lossless.cloud)
TypeScript Support | [![TypeScript](https://badgen.net/badge/TypeScript/>=%203.x/blue?icon=typescript)](https://lossless.cloud)
node Support | [![node](https://img.shields.io/badge/node->=%2010.x.x-blue.svg)](https://nodejs.org/dist/latest-v10.x/docs/api/)
Code Style | [![Code Style](https://badgen.net/badge/style/prettier/purple)](https://lossless.cloud)
PackagePhobia (total standalone install weight) | [![PackagePhobia](https://badgen.net/packagephobia/install/@pushrocks/smartstream)](https://lossless.cloud)
PackagePhobia (package size on registry) | [![PackagePhobia](https://badgen.net/packagephobia/publish/@pushrocks/smartstream)](https://lossless.cloud)
BundlePhobia (total size when bundled) | [![BundlePhobia](https://badgen.net/bundlephobia/minzip/@pushrocks/smartstream)](https://lossless.cloud)
Platform support | [![Supports Windows 10](https://badgen.net/badge/supports%20Windows%2010/yes/green?icon=windows)](https://lossless.cloud) [![Supports Mac OS X](https://badgen.net/badge/supports%20Mac%20OS%20X/yes/green?icon=apple)](https://lossless.cloud)
## Usage
Use TypeScript for best in class instellisense.
```typescript
import { Smartstream } from 'smartstream'
import * as gUglify from 'gulp-uglify'
let mySmartstream = new Smartstream([
gulp.src(['./file1.js','./file2.js']),
gUglify(),
gulp.dest('./some/output/path')
])
mySmartstream.onError((err) => { /* handle error */ }) // handles all errors in stream
myStream.onCustomEvent('myeventname', (args...) => { /* Do something */ }) // emit an custom event anywhere in your stream
mySmartstream.run().then(() => {/* do something when stream is finished */})
```
## Contribution
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
For further information read the linked docs at the top of this readme.
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)
| By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy)
[![repo-footer](https://lossless.gitlab.io/publicrelations/repofooter.svg)](https://maintainedby.lossless.com)

```markdown
# @push.rocks/smartstream
A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively.
## Install
To install `@push.rocks/smartstream`, you can use npm or yarn as follows:
```bash
npm install @push.rocks/smartstream --save
# OR
yarn add @push.rocks/smartstream
```
This will add `@push.rocks/smartstream` to your project's dependencies.
## Usage
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
### Importing the Module
Start by importing the module into your TypeScript file:
```typescript
import * as smartstream from '@push.rocks/smartstream';
```
For a more specific import, you may do the following:
```typescript
import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream';
```
### Creating Basic Transform Streams
The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility:
```typescript
import { createTransformFunction } from '@push.rocks/smartstream';
const upperCaseTransform = createTransformFunction<string, string>(async (chunk) => {
return chunk.toUpperCase();
});
// Usage with pipe
readableStream
.pipe(upperCaseTransform)
.pipe(writableStream);
```
### Handling Backpressure with SmartDuplex
`SmartDuplex` is a powerful part of the `smartstream` module designed to handle backpressure effectively. Here's an example of how to create a `SmartDuplex` stream that processes data and respects the consumer's pace:
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
const processDataDuplex = new SmartDuplex({
async writeFunction(chunk, { push }) {
const processedChunk = await processChunk(chunk); // Assume this is a defined asynchronous function
push(processedChunk);
}
});
sourceStream.pipe(processDataDuplex).pipe(destinationStream);
```
### Combining Multiple Streams
`Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:
```typescript
import { StreamWrapper } from '@push.rocks/smartstream';
const combinedStream = new StreamWrapper([
readStream, // Source stream
transformStream1, // Transformation
transformStream2, // Another transformation
writeStream // Destination stream
]);
combinedStream.run()
.then(() => console.log('Processing completed.'))
.catch(err => console.error('An error occurred:', err));
```
### Working with StreamIntake
`StreamIntake` allows for more dynamic control of the reading process, facilitating scenarios where data is not continuously available:
```typescript
import { StreamIntake } from '@push.rocks/smartstream';
const streamIntake = new StreamIntake<string>();
// Dynamically push data into the intake
streamIntake.pushData('Hello, World!');
streamIntake.pushData('Another message');
// Signal end when no more data is to be pushed
streamIntake.signalEnd();
```
### Real-world Scenario: Processing Large Files
Consider a scenario where you need to process a large CSV file, transform the data row-by-row, and then write the results to a database or another file. With `smartstream`, you could create a pipe that reads the CSV, processes each row, and handles backpressure, ensuring efficient use of resources.
```typescript
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
import fs from 'fs';
import csvParser from 'csv-parser';
const csvReadTransform = createTransformFunction<any, any>(async (row) => {
// Process row
return processedRow;
});
fs.createReadStream('path/to/largeFile.csv')
.pipe(csvParser())
.pipe(csvReadTransform)
.pipe(new SmartDuplex({
async writeFunction(chunk, { push }) {
await writeToDatabase(chunk); // Assume this writes to a database
}
}))
.on('finish', () => console.log('File processed successfully.'));
```
This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.
### Advanced Use Case: Backpressure Handling
Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Heres a comprehensive example that demonstrates handling backpressure in a pipeline with multiple `SmartDuplex` instances:
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
// Define the first SmartDuplex, which writes data slowly to simulate backpressure
const slowProcessingStream = new SmartDuplex({
name: 'SlowProcessor',
objectMode: true,
writeFunction: async (chunk, { push }) => {
await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay
console.log('Processed chunk:', chunk);
push(chunk);
}
});
// Define the second SmartDuplex as a fast processor
const fastProcessingStream = new SmartDuplex({
name: 'FastProcessor',
objectMode: true,
writeFunction: async (chunk, { push }) => {
console.log('Fast processing chunk:', chunk);
push(chunk);
}
});
// Create a StreamIntake to dynamically handle incoming data
const streamIntake = new StreamIntake<string>();
// Chain the streams together and handle the backpressure scenario
streamIntake
.pipe(fastProcessingStream)
.pipe(slowProcessingStream)
.pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling
.on('data', data => console.log('Final output:', data))
.on('error', error => console.error('Stream encountered an error:', error));
// Simulate data pushing with intervals to observe backpressure handling
let counter = 0;
const interval = setInterval(() => {
if (counter >= 10) {
streamIntake.signalEnd();
clearInterval(interval);
} else {
streamIntake.pushData(`Chunk ${counter}`);
counter++;
}
}, 50);
```
In this advanced use case, a `SlowProcessor` and `FastProcessor` are created using `SmartDuplex`, simulating a situation where one stream is slower than another. The `StreamIntake` dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions.
### Transform Streams in Parallel
For scenarios where you need to process data in parallel:
```typescript
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
const parallelTransform = createTransformFunction<any, any>(async (chunk) => {
// Parallel Processing
const results = await Promise.all(chunk.map(async item => await processItem(item)));
return results;
});
const streamIntake = new StreamIntake<any[]>();
streamIntake
.pipe(parallelTransform)
.pipe(new SmartDuplex({
async writeFunction(chunk, { push }) {
console.log('Processed parallel chunk:', chunk);
push(chunk);
}
}))
.on('finish', () => console.log('Parallel processing completed.'));
// Simulate data pushing
streamIntake.pushData([1, 2, 3, 4]);
streamIntake.pushData([5, 6, 7, 8]);
streamIntake.signalEnd();
```
### Error Handling in Stream Pipelines
Error handling is an essential part of working with streams. The `StreamWrapper` assists in combining multiple streams while managing errors seamlessly:
```typescript
import { StreamWrapper } from '@push.rocks/smartstream';
const faultyStream = new SmartDuplex({
async writeFunction(chunk, { push }) {
if (chunk === 'bad data') {
throw new Error('Faulty data encountered');
}
push(chunk);
}
});
const readStream = new StreamIntake<string>();
const writeStream = new SmartDuplex({
async writeFunction(chunk) {
console.log('Written chunk:', chunk);
}
});
const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]);
combinedStream.run()
.then(() => console.log('Stream processing completed.'))
.catch(err => console.error('Stream error:', err.message));
// Push Data
readStream.pushData('good data');
readStream.pushData('bad data'); // This will throw an error
readStream.pushData('more good data');
readStream.signalEnd();
```
### Testing Streams
Here's an example test case using the `tap` testing framework to verify the integrity of the `SmartDuplex` from a buffer:
```typescript
import { expect, tap } from '@push.rocks/tapbundle';
import { SmartDuplex } from '@push.rocks/smartstream';
tap.test('should create a SmartStream from a Buffer', async () => {
const bufferData = Buffer.from('This is a test buffer');
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
smartStream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
smartStream.on('end', () => {
expect(receivedData.toString()).toEqual(bufferData.toString());
resolve();
});
});
});
tap.start();
```
### Working with Files and Buffers
You can easily stream files and buffers with `smartstream`. Heres a test illustrating reading and writing with file streams using `smartfile` combined with `smartstream` utilities:
```typescript
import { tap } from '@push.rocks/tapbundle';
import * as smartfile from '@push.rocks/smartfile';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
tap.test('should handle file read and write streams', async () => {
const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt');
const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt');
const transformStream = new SmartDuplex({
async writeFunction(chunk, { push }) {
const transformedChunk = chunk.toString().toUpperCase();
push(transformedChunk);
}
});
const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]);
await streamWrapper.run();
const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8');
console.log('Output Content:', outputContent);
});
tap.start();
```
### Modular and Scoped Transformations
Creating modular and scoped transformations is straightforward with `SmartDuplex`:
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
type DataChunk = {
id: number;
data: string;
};
const transformationStream1 = new SmartDuplex<DataChunk, DataChunk>({
async writeFunction(chunk, { push }) {
chunk.data = chunk.data.toUpperCase();
push(chunk);
}
})
const transformationStream2 = new SmartDuplex<DataChunk, DataChunk>({
async writeFunction(chunk, { push }) {
chunk.data = `${chunk.data} processed with transformation 2`;
push(chunk);
}
});
const initialData: DataChunk[] = [
{ id: 1, data: 'first' },
{ id: 2, data: 'second' }
];
const intakeStream = new StreamIntake<DataChunk>();
intakeStream
.pipe(transformationStream1)
.pipe(transformationStream2)
.on('data', data => console.log('Transformed Data:', data));
initialData.forEach(item => intakeStream.pushData(item));
intakeStream.signalEnd();
```
By leveraging `SmartDuplex`, `StreamWrapper`, and `StreamIntake`, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach.
```
## License and Legal Information
This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
### Company Information
Task Venture Capital GmbH
Registered at District court Bremen HRB 35230 HB, Germany
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

readme_instructions.md Normal file


@ -0,0 +1,68 @@
import { tap, expect } from '@push.rocks/tapbundle';
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
tap.test('should run backpressure test', async (toolsArg) => {
const done = toolsArg.defer();
async function testBackpressure() {
const stream1 = new SmartDuplex({
name: 'stream1',
objectMode: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
console.log(`processed chunk ${chunk} in stream 1`);
return chunk; // Fast processing
},
});
const stream2 = new SmartDuplex({
name: 'stream2',
objectMode: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
console.log(`processed chunk ${chunk} in stream 2`);
await tools.push(chunk);
// return chunk, optionally return ;
},
}); // This stream processes data more slowly
const stream3 = new SmartDuplex({
objectMode: true,
name: 'stream3',
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
console.log(`processed chunk ${chunk} in stream 3`);
},
});
stream1.pipe(stream2).pipe(stream3);
let backpressured = false;
for (let i = 0; i < 200; i++) {
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
if (!canContinue) {
backpressured = true;
console.log(`Backpressure at chunk ${i}`);
}
}
stream1.end();
stream1.on('finish', () => {
console.log('Stream 1 finished processing.');
});
stream2.on('finish', () => {
console.log('Stream 2 finished processing.');
});
stream3.on('finish', () => {
console.log('Stream 3 finished processing.');
if (!backpressured) {
throw new Error('No backpressure was observed.');
} else {
done.resolve();
}
});
}
testBackpressure();
await done.promise;
});
await tap.start();


@ -5,7 +5,7 @@ import * as fs from 'fs';
tap.test('should create a SmartStream from a Buffer', async () => {
const bufferData = Buffer.from('This is a test buffer');
- const smartStream = SmartDuplex.fromBuffer(bufferData);
+ const smartStream = SmartDuplex.fromBuffer(bufferData, {});
let receivedData = Buffer.alloc(0);
@ -21,24 +21,4 @@ tap.test('should create a SmartStream from a Buffer', async () => {
});
});
tap.test('should create a SmartStream from an Observable', async () => {
const observableData = 'Observable test data';
const testObservable = smartrx.rxjs.of(Buffer.from(observableData));
const smartStream = SmartDuplex.fromObservable(testObservable);
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
smartStream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
smartStream.on('end', () => {
expect(receivedData.toString()).toEqual(observableData);
resolve();
});
});
});
tap.start();


@ -10,27 +10,26 @@ tap.test('should handle a read stream', async (tools) => {
const streamWrapper = new smartstream.StreamWrapper([
smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
new smartstream.SmartDuplex({
- writeAndTransformFunction: async (chunkStringArg: Buffer, streamTools) => {
+ writeFunction: async (chunkStringArg: Buffer, streamTools) => {
// do something with the stream here
const result = chunkStringArg.toString().substr(0, 100);
streamTools.push('wow =========== \n');
return Buffer.from(result);
},
- streamEndFunction: async (tools) => {
+ finalFunction: async (tools) => {
return Buffer.from('this is the end');
},
}),
new smartstream.SmartDuplex({
- writeAndTransformFunction: async (chunkStringArg) => {
+ writeFunction: async (chunkStringArg) => {
console.log(chunkStringArg.toString());
},
- streamEndFunction: async (tools) => {
+ finalFunction: async (tools) => {
tools.push(null);
},
- }),
+ })
- smartstream.cleanPipe(),
]);
- // await streamWrapper.run();
+ await streamWrapper.run();
});
tap.test('should create a valid Intake', async (tools) => {
@ -38,7 +37,7 @@ tap.test('should create a valid Intake', async (tools) => {
testIntake.pipe(
new smartstream.SmartDuplex({
objectMode: true,
- writeAndTransformFunction: async (chunkStringArg: string, streamTools) => {
+ writeFunction: async (chunkStringArg: string, streamTools) => {
await tools.delayFor(100);
console.log(chunkStringArg);
return chunkStringArg;

test/test.ts_web.both.ts Normal file

@ -0,0 +1,70 @@
import { expect, expectAsync, tap } from '@push.rocks/tapbundle';
import * as webstream from '../ts_web/index.js';
tap.test('WebDuplexStream', async (toolsArg) => {
const testDone = toolsArg.defer(); // Create a deferred object to control test completion.
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
const reader = stream.readable.getReader();
let readUint8Array = new Uint8Array();
reader.read().then(function processText({ done, value }) {
if (done) {
expect(readUint8Array).toEqual(inputUint8Array);
testDone.resolve(); // Correctly signal that the test is done.
return;
}
readUint8Array = new Uint8Array([...readUint8Array, ...value]);
return reader.read().then(processText);
});
return testDone.promise; // Return the promise to properly wait for the test to complete.
});
tap.test('should handle transform with a write function', async (toolsArg) => {
const testDone = toolsArg.defer();
const input = [1, 2, 3, 4, 5];
const expectedOutput = [2, 4, 6, 8, 10];
const transformStream = new webstream.WebDuplexStream<number, number>({
writeFunction: (chunk, { push }) => {
push(chunk * 2); // Push the doubled number into the stream
return Promise.resolve(); // Resolve the promise immediately
},
});
const writableStream = transformStream.writable.getWriter();
const readableStream = transformStream.readable.getReader();
const output: number[] = [];
// Process the text and resolve the test once done.
const processText = async ({ done, value }) => {
if (done) {
expect(output).toEqual(expectedOutput);
testDone.resolve(); // Resolve the deferred test once all values have been read.
return;
}
if (value !== undefined) {
output.push(value);
}
// Continue reading and processing.
await readableStream.read().then(processText);
};
// Start the read process before writing to the stream.
readableStream.read().then(processText);
// Sequentially write to the stream and close when done.
for (const num of input) {
await writableStream.write(num);
}
await writableStream.close();
return testDone.promise; // This will wait until the testDone is resolved before completing the test.
});
tap.start();


@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
- version: '3.0.2',
+ version: '3.0.40',
- description: 'simplifies access to node streams'
+ description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}


@ -1,4 +1,8 @@
- export * from './smartstream.classes.passthrough.js';
export * from './smartstream.classes.smartduplex.js';
export * from './smartstream.classes.streamwrapper.js';
export * from './smartstream.classes.streamintake.js';
+ export * from './smartstream.functions.js';
+ import * as plugins from './smartstream.plugins.js';
+ export const webstream = plugins.webstream;


@ -1,19 +0,0 @@
import * as plugins from './smartstream.plugins.js';
export class PassThrough extends plugins.stream.Duplex {
constructor(options?: plugins.stream.DuplexOptions) {
super(options);
}
_read(size: number): void {
// No-op: Data written will be automatically available for reading.
}
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
if (this.push(chunk, encoding)) {
callback();
} else {
this.once('drain', callback);
}
}
}


@ -3,175 +3,207 @@ import { Duplex, type DuplexOptions } from 'stream';
export interface IStreamTools {
truncate: () => void;
- push: (pipeObject: any) => void;
+ push: (pipeObject: any) => Promise<boolean>;
}
- export interface IWriteAndTransformFunction<T, rT> {
+ export interface IStreamWriteFunction<T, rT> {
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
}
- export interface IStreamEndFunction<rT> {
+ export interface IStreamFinalFunction<rT> {
(toolsArg: IStreamTools): Promise<rT>;
}
- export interface SmartStreamOptions<TInput, TOutput> extends DuplexOptions {
+ export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
+ /**
+ * wether to print debug logs
+ */
+ debug?: boolean;
+ /**
+ * the name of the stream
+ */
+ name?: string;
+ /**
+ * a function that is being called to read more stuff from whereever to be processed by the stream
+ * @returns
+ */
readFunction?: () => Promise<void>;
- writeAndTransformFunction?: IWriteAndTransformFunction<TInput, TOutput>;
- streamEndFunction?: IStreamEndFunction<TOutput>;
- // Add other custom options if necessary
+ /**
+ * the write function is called for every chunk that is being written to the stream
+ * it can push or return chunks (but does not have to) to be written to the readable side of the stream
+ */
+ writeFunction?: IStreamWriteFunction<TInput, TOutput>;
+ /**
+ * a final function that is run at the end of the stream
+ */
+ finalFunction?: IStreamFinalFunction<TOutput>;
}
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
// STATIC
- static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex {
+ static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions<any, any>): SmartDuplex {
- const smartStream = new SmartDuplex(options);
+ const smartDuplex = new SmartDuplex(options);
process.nextTick(() => {
- smartStream.push(buffer);
+ smartDuplex.push(buffer);
- smartStream.push(null); // Signal the end of the data
+ smartDuplex.push(null); // Signal the end of the data
});
- return smartStream;
+ return smartDuplex;
}
static fromObservable(
observable: plugins.smartrx.rxjs.Observable<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
smartStream.observableSubscription = observable.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
// Pause the observable if the stream buffer is full
smartStream.observableSubscription?.unsubscribe();
smartStream.once('drain', () => {
// Resume the observable when the stream buffer is drained
smartStream.observableSubscription?.unsubscribe();
smartStream.observableSubscription = observable.subscribe((data) => {
smartStream.push(data);
});
});
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // Signal the end of the data
},
});
return smartStream;
}
static fromReplaySubject(
replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
let isBackpressured = false;
// Subscribe to the ReplaySubject
const subscription = replaySubject.subscribe({
next: (data) => {
const canPush = smartStream.push(data);
if (!canPush) {
// If push returns false, pause the subscription because of backpressure
isBackpressured = true;
subscription.unsubscribe();
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // End the stream when the ReplaySubject completes
},
});
// Listen for 'drain' event to resume the subscription if it was paused
smartStream.on('drain', () => {
if (isBackpressured) {
isBackpressured = false;
// Resubscribe to the ReplaySubject since we previously paused
smartStream.observableSubscription = replaySubject.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
smartStream.observableSubscription?.unsubscribe();
isBackpressured = true;
}
},
// No need to repeat error and complete handling here because it's already set up above
});
}
});
return smartStream;
} }
// INSTANCE
- private readFunction?: () => Promise<void>;
- private writeAndTransformFunction?: IWriteAndTransformFunction<TInput, TOutput>;
- private streamEndFunction?: IStreamEndFunction<TOutput>;
+ private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
+ public options: ISmartDuplexOptions<TInput, TOutput>;
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
+ private debugLog(messageArg: string) {
+ if (this.options.debug) {
+ console.log(messageArg);
+ }
+ }
- constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
+ constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
- super(optionsArg);
- this.readFunction = optionsArg?.readFunction;
- this.writeAndTransformFunction = optionsArg?.writeAndTransformFunction;
- this.streamEndFunction = optionsArg?.streamEndFunction;
+ super(Object.assign({
+ highWaterMark: 1,
+ }, optionsArg));
+ this.options = optionsArg;
+ this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(this.options.highWaterMark || 1)
}
public async _read(size: number): Promise<void> {
- if (this.readFunction) {
- await this.readFunction();
- }
+ this.debugLog(`${this.options.name}: read was called`);
+ await this.backpressuredArray.waitForItems();
+ this.debugLog(`${this.options.name}: successfully waited for items.`);
+ if (this.options.readFunction) {
+ await this.options.readFunction();
+ }
+ let canPushMore = true;
+ while(this.backpressuredArray.data.length > 0 && canPushMore) {
+ const nextChunk = this.backpressuredArray.shift();
+ canPushMore = this.push(nextChunk);
+ }
}
public async backpressuredPush (pushArg: TOutput) {
const canPushMore = this.backpressuredArray.push(pushArg);
if (!canPushMore) {
this.debugLog(`${this.options.name}: cannot push more`);
await this.backpressuredArray.waitForSpace();
this.debugLog(`${this.options.name}: can push more again`);
}
return canPushMore;
};
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
// Ensure the _write method types the chunk as TInput and encodes TOutput
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
- if (!this.writeAndTransformFunction) {
+ if (!this.options.writeFunction) {
return callback(new Error('No stream function provided'));
}
+ let isTruncated = false;
const tools: IStreamTools = {
truncate: () => {
this.push(null);
+ isTruncated = true;
callback();
},
- push: (pushArg: TOutput) => this.push(pushArg),
+ push: async (pushArg: TOutput) => {
+ return await this.backpressuredPush(pushArg);
+ }
};
try {
- const modifiedChunk = await this.writeAndTransformFunction(chunk, tools);
- if (modifiedChunk) {
- if (!this.push(modifiedChunk)) {
- // Handle backpressure if necessary
- return;
- }
- }
+ const writeDeferred = plugins.smartpromise.defer();
+ this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
+ const modifiedChunk = await this.options.writeFunction(chunk, tools);
+ if (isTruncated) {
+ return;
+ }
+ if (modifiedChunk) {
+ await tools.push(modifiedChunk);
+ }
callback();
+ writeDeferred.resolve();
+ writeDeferred.promise.then(() => {
+ this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
+ });
} catch (err) {
callback(err);
}
}
public async _final(callback: (error?: Error | null) => void) {
- if (this.streamEndFunction) {
+ await Promise.all(this.asyncWritePromiseObjectmap.getArray());
+ if (this.options.finalFunction) {
const tools: IStreamTools = {
truncate: () => callback(),
- push: (pipeObject) => this.push(pipeObject),
+ push: async (pipeObject) => {
+ return this.backpressuredArray.push(pipeObject);
+ },
};
try {
- const finalChunk = await this.streamEndFunction(tools);
+ const finalChunk = await this.options.finalFunction(tools);
if (finalChunk) {
- this.push(finalChunk);
+ this.backpressuredArray.push(finalChunk);
}
- callback();
} catch (err) {
+ this.backpressuredArray.push(null);
callback(err);
+ return;
}
- } else {
- this.push(null),
- callback();
- }
+ }
+ this.backpressuredArray.push(null);
+ callback();
}
public getWebStreams(): { readable: ReadableStream, writable: WritableStream } {
const duplex = this;
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
} }
}
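For orientation, a minimal usage sketch of the new `getWebStreams()` bridge added above; the uppercase transform and the literal input are illustrative assumptions, not part of the change itself:

```typescript
import { SmartDuplex } from '@push.rocks/smartstream';

const duplex = new SmartDuplex<string, string>({
  objectMode: true,
  writeFunction: async (chunk, tools) => {
    await tools.push(chunk.toUpperCase());
  },
});

// Expose the Node.js duplex as a pair of web streams.
const { readable, writable } = duplex.getWebStreams();

const writer = writable.getWriter();
await writer.write('hello');
await writer.close();

const reader = readable.getReader();
const { value } = await reader.read();
console.log(value); // expected: 'HELLO'
```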


@ -14,7 +14,7 @@ export class StreamIntake<T> extends plugins.stream.Readable {
_read(size: number): void {
// console.log('get next');
const pushChunk = (): void => {
- if (this.chunkStore.length > 0) {
+ while (this.chunkStore.length > 0) {
// If push returns false, then we should stop reading
if (!this.push(this.chunkStore.shift())) {
return;


@ -94,14 +94,3 @@ export class StreamWrapper {
return done.promise;
}
}
export let cleanPipe = () => {
return plugins.through2.obj(
(file, enc, cb) => {
cb();
},
(cb) => {
cb();
}
);
};


@ -0,0 +1,30 @@
import { Transform, type TransformCallback, type TransformOptions } from 'stream';
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
export interface AsyncTransformFunction<TInput, TOutput> {
(chunkArg: TInput): Promise<TOutput>;
}
export function createTransformFunction<TInput, TOutput>(
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
options?: TransformOptions
): SmartDuplex {
const smartDuplexStream = new SmartDuplex({
...options,
writeFunction: async (chunkArg, toolsArg) => {
const result = await asyncFunction(chunkArg);
return result;
}
});
return smartDuplexStream;
}
export const createPassThrough = () => {
return new SmartDuplex({
objectMode: true,
writeFunction: async (chunkArg, toolsArg) => {
return chunkArg;
}
})
}


@ -4,13 +4,10 @@ import * as stream from 'stream';
export { stream };
// pushrocks scope
+ import * as lik from '@push.rocks/lik';
import * as smartpromise from '@push.rocks/smartpromise';
import * as smartrx from '@push.rocks/smartrx';
+ import * as webstream from '../dist_ts_web/index.js';
- export { smartpromise, smartrx };
+ export { lik, smartpromise, smartrx, webstream };
- // thirdparty
- import from2 from 'from2';
- import through2 from 'through2';
- export { from2, through2 };


@ -0,0 +1,8 @@
/**
* autocreated commitinfo by @pushrocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.40',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}


@ -0,0 +1,156 @@
import * as plugins from './plugins.js';
// ========================================
// READ
// ========================================
export interface IStreamToolsRead<TInput, TOutput> {
done: () => void;
write: (writeArg: TInput) => void;
}
/**
* the read function is called anytime
* -> the WebDuplexStream is being read from
* and at the same time if nothing is enqueued
*/
export interface IStreamReadFunction<TInput, TOutput> {
(toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}
// ========================================
// WRITE
// ========================================
export interface IStreamToolsWrite<TInput, TOutput> {
truncate: () => void;
push: (pushArg: TOutput) => void;
}
/**
* the write function can return something.
* It is called anytime a chunk is written to the stream.
*/
export interface IStreamWriteFunction<TInput, TOutput> {
(chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}
export interface IStreamFinalFunction<TInput, TOutput> {
(toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput>;
}
export interface WebDuplexStreamOptions<TInput, TOutput> {
readFunction?: IStreamReadFunction<TInput, TOutput>;
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
writeFunction: async (chunk, { push }) => {
push(chunk); // Directly push the chunk as is
return null;
}
});
const writer = stream.writable.getWriter();
writer.write(uint8Array).then(() => writer.close());
return stream;
}
// INSTANCE
options: WebDuplexStreamOptions<TInput, TOutput>;
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
super({
async transform(chunk, controller) {
// Transformation logic remains unchanged
if (optionsArg?.writeFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg: TOutput) => controller.enqueue(pushArg),
};
optionsArg.writeFunction(chunk, tools)
.then(writeReturnChunk => {
// the write return chunk is optional
// just in case the write function returns something other than void.
if (writeReturnChunk) {
controller.enqueue(writeReturnChunk);
}
})
.catch(err => controller.error(err));
} else {
controller.error(new Error('No write function provided'));
}
},
async flush(controller) {
// Flush logic remains unchanged
if (optionsArg?.finalFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pipeObject) => controller.enqueue(pipeObject),
};
optionsArg.finalFunction(tools)
.then(finalChunk => {
if (finalChunk) {
controller.enqueue(finalChunk);
}
})
.catch(err => controller.error(err))
.finally(() => controller.terminate());
} else {
controller.terminate();
}
}
});
this.options = optionsArg;
}
// Method to create a custom readable stream that integrates the readFunction
// readFunction is executed whenever the stream is being read from and nothing is enqueued
getCustomReadableStream() {
const readableStream = this.readable;
const options = this.options;
const customReadable = new ReadableStream({
async pull(controller) {
const reader = readableStream.getReader();
// Check the current state of the original stream
const { value, done } = await reader.read();
reader.releaseLock();
if (done) {
// If the original stream is done, close the custom readable stream
controller.close();
} else {
if (value) {
// If there is data in the original stream, enqueue it and do not execute the readFunction
controller.enqueue(value);
} else if (options.readFunction) {
// If the original stream is empty, execute the readFunction and read again
await options.readFunction({
done: () => controller.close(),
write: (writeArg) => controller.enqueue(writeArg),
});
const newReader = readableStream.getReader();
const { value: newValue, done: newDone } = await newReader.read();
newReader.releaseLock();
if (newDone) {
controller.close();
} else {
controller.enqueue(newValue);
}
}
}
}
});
return customReadable;
}
}

ts_web/index.ts Normal file

@ -0,0 +1,2 @@
import './plugins.js';
export * from './classes.webduplexstream.js';

ts_web/plugins.ts Normal file

@ -0,0 +1,15 @@
// @push.rocks scope
import * as smartenv from '@push.rocks/smartenv';
export {
smartenv,
}
// lets setup dependencies
const smartenvInstance = new smartenv.Smartenv();
await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
globalThis.ReadableStream = moduleArg.ReadableStream;
globalThis.WritableStream = moduleArg.WritableStream;
globalThis.TransformStream = moduleArg.TransformStream;
})