Compare commits

...

97 Commits

Author SHA1 Message Date
jkunz a765363371 v3.4.2 2026-04-30 12:04:51 +00:00
jkunz fccd0f86ad fix(streams): tighten stream typings and guard optional runtime paths for duplex and wrapper utilities 2026-04-30 12:04:51 +00:00
jkunz f78469a299 v3.4.1 2026-03-02 07:01:47 +00:00
jkunz 04d2f3223a fix(readme): improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference 2026-03-02 07:01:47 +00:00
jkunz 77046acac7 v3.4.0 2026-03-02 06:55:11 +00:00
jkunz 2acf1972a2 feat(smartduplex): improve backpressure handling and web/node stream interoperability 2026-03-02 06:55:11 +00:00
jkunz 1262c48fe9 v3.3.0 2026-02-28 08:48:58 +00:00
jkunz 9b9b1be62b feat(smartstream): bump dependencies, update build/publish config, refactor tests, and overhaul documentation 2026-02-28 08:48:58 +00:00
philkunz 3d13cb76f6 3.2.5 2024-11-19 09:51:06 +01:00
philkunz 9e3fd28c4a fix(nodewebhelpers): Fix import and use correct module structure for Node.js streams in smartstream.nodewebhelpers.ts 2024-11-19 09:51:05 +01:00
philkunz 673f5c86fb 3.2.4 2024-10-16 02:28:32 +02:00
philkunz a225188e24 fix(SmartDuplex): Fix stream termination when reading from a web readable stream 2024-10-16 02:28:31 +02:00
philkunz 4fc82d0dc6 3.2.3 2024-10-16 02:03:57 +02:00
philkunz 3d58a01b29 fix(smartduplex): Enhance documentation for read function in SmartDuplex 2024-10-16 02:03:56 +02:00
philkunz f7e9636bf6 3.2.2 2024-10-16 02:02:48 +02:00
philkunz f211cc8ddd fix(SmartDuplex): Fix issue with SmartDuplex fromWebReadableStream method 2024-10-16 02:02:48 +02:00
philkunz 60c8824f33 3.2.1 2024-10-16 01:54:39 +02:00
philkunz 40e8e06ff1 fix(core): Fix the order of operations in SmartDuplex _read method to ensure proper waiting for items. 2024-10-16 01:54:39 +02:00
philkunz 30f2facd59 3.2.0 2024-10-16 01:02:47 +02:00
philkunz ddb7d4af03 feat(SmartDuplex): Added method to create SmartDuplex from a WebReadableStream. 2024-10-16 01:02:46 +02:00
philkunz 22d93b4c07 3.1.2 2024-10-14 13:55:15 +02:00
philkunz e138bca39d fix(WebDuplexStream): Fix variable naming inconsistency in WebDuplexStream test 2024-10-14 13:55:14 +02:00
philkunz 6a2ef1b152 3.1.1 2024-10-13 20:20:32 +02:00
philkunz 7b1d2199e9 fix(WebDuplexStream): Improved read/write interface and error handling in WebDuplexStream 2024-10-13 20:20:31 +02:00
philkunz 04c22f73df 3.1.0 2024-10-13 13:49:14 +02:00
philkunz c8dc791c83 feat(core): Add support for creating Web ReadableStream from a file 2024-10-13 13:49:13 +02:00
philkunz 9c30e5bab1 3.0.46 2024-10-13 11:16:46 +02:00
philkunz 5f2c5f9380 fix(WebDuplexStream): Fix errors in WebDuplexStream transformation and test logic 2024-10-13 11:16:46 +02:00
philkunz f9b8bf33b0 3.0.45 2024-10-13 00:02:01 +02:00
philkunz a55b2548d7 fix(ts): Fixed formatting issues in SmartDuplex class 2024-10-13 00:02:01 +02:00
philkunz c8465b82be 3.0.44 2024-06-04 18:58:08 +02:00
philkunz b593e3a32c fix(core): update 2024-06-04 18:58:08 +02:00
philkunz a490f521ab 3.0.43 2024-06-03 15:29:15 +02:00
philkunz 59027782dc fix(core): update 2024-06-03 15:29:14 +02:00
philkunz 8c7dd7970c 3.0.42 2024-06-03 14:59:41 +02:00
philkunz 22d18dc21f fix(core): update 2024-06-03 14:59:40 +02:00
philkunz 1cb6f727af 3.0.41 2024-06-03 10:27:08 +02:00
philkunz 824c44d165 fix(core): update 2024-06-03 10:27:07 +02:00
philkunz 3e062103f8 3.0.40 2024-06-02 23:40:52 +02:00
philkunz 6451e93c12 fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API 2024-06-02 23:40:52 +02:00
philkunz 70cf93595c 3.0.39 2024-06-02 16:42:42 +02:00
philkunz 17e03e9790 fix(core): update 2024-06-02 16:42:42 +02:00
philkunz e52ce7af61 update description 2024-05-29 14:16:38 +02:00
philkunz f548f4b6cb 3.0.38 2024-05-17 19:21:34 +02:00
philkunz 23a7a77a73 fix(core): update 2024-05-17 19:21:33 +02:00
philkunz 13d2fc78b8 3.0.37 2024-05-17 18:40:33 +02:00
philkunz 898cc0407d fix(core): update 2024-05-17 18:40:32 +02:00
philkunz 8a3f43a11a 3.0.36 2024-05-17 18:13:52 +02:00
philkunz da2191bb96 fix(core): update 2024-05-17 18:13:51 +02:00
philkunz f13db1e422 3.0.35 2024-05-05 18:30:05 +02:00
philkunz 42a90e804a fix(core): update 2024-05-05 18:30:05 +02:00
philkunz 413e2af717 update tsconfig 2024-04-14 18:25:32 +02:00
philkunz 267a76af13 update tsconfig 2024-04-01 21:41:26 +02:00
philkunz 7834b7e6d2 update npmextra.json: githost 2024-04-01 19:59:50 +02:00
philkunz ae643708e7 update npmextra.json: githost 2024-03-30 21:48:51 +01:00
philkunz d9d96b8bb7 3.0.34 2024-03-16 18:29:45 +01:00
philkunz a961eea431 fix(core): update 2024-03-16 18:29:44 +01:00
philkunz edb58ade28 3.0.33 2024-02-29 12:15:01 +01:00
philkunz 753a481765 fix(core): update 2024-02-29 12:15:00 +01:00
philkunz bbbd1b73b9 3.0.32 2024-02-25 20:14:33 +01:00
philkunz 271d0be106 fix(core): update 2024-02-25 20:14:33 +01:00
philkunz 0ceeacd5a0 3.0.31 2024-02-25 20:14:20 +01:00
philkunz 287695e445 fix(core): update 2024-02-25 20:14:19 +01:00
philkunz 60f9e541a5 3.0.30 2023-11-14 10:51:23 +01:00
philkunz 96ea67e135 fix(core): update 2023-11-14 10:51:23 +01:00
philkunz ba0a2023ad 3.0.29 2023-11-14 10:43:18 +01:00
philkunz a09c359847 fix(core): update 2023-11-14 10:43:17 +01:00
philkunz e2b4d772b3 3.0.28 2023-11-14 10:29:44 +01:00
philkunz 0f46b62b2d fix(core): update 2023-11-14 10:29:44 +01:00
philkunz 9bf37469c6 3.0.27 2023-11-13 21:38:13 +01:00
philkunz 12bb125bdc fix(core): update 2023-11-13 21:38:12 +01:00
philkunz 703dc11c6c 3.0.26 2023-11-13 20:34:22 +01:00
philkunz 28725d1723 fix(core): update 2023-11-13 20:34:21 +01:00
philkunz c77e0f2ba6 3.0.25 2023-11-13 19:12:24 +01:00
philkunz 196fb6d396 fix(core): update 2023-11-13 19:12:23 +01:00
philkunz df0ddf04b3 3.0.24 2023-11-13 19:06:02 +01:00
philkunz 2e1aa4a8ff fix(core): update 2023-11-13 19:06:02 +01:00
philkunz bc09033af0 3.0.23 2023-11-13 18:41:05 +01:00
philkunz 22df9dfd94 fix(core): update 2023-11-13 18:41:04 +01:00
philkunz d48ef6eb43 3.0.22 2023-11-13 18:19:11 +01:00
philkunz 9421c652a2 fix(core): update 2023-11-13 18:19:11 +01:00
philkunz a6ab15bf1d 3.0.21 2023-11-13 17:52:12 +01:00
philkunz 00d1455367 fix(core): update 2023-11-13 17:52:11 +01:00
philkunz 116a281c6c 3.0.20 2023-11-13 17:43:15 +01:00
philkunz 9bf6f251c4 fix(core): update 2023-11-13 17:43:15 +01:00
philkunz e3427c2498 3.0.19 2023-11-12 22:34:56 +01:00
philkunz a400a0a04c fix(core): update 2023-11-12 22:34:55 +01:00
philkunz 91392e8bd5 3.0.18 2023-11-11 20:56:46 +01:00
philkunz d161d6613a fix(core): update 2023-11-11 20:56:46 +01:00
philkunz 7a14e67f4f 3.0.17 2023-11-11 20:44:01 +01:00
philkunz 465ccfec40 fix(core): update 2023-11-11 20:44:00 +01:00
philkunz 3adb16d1f8 3.0.16 2023-11-11 20:30:43 +01:00
philkunz a9230ca790 fix(core): update 2023-11-11 20:30:42 +01:00
philkunz 788f2665c2 3.0.15 2023-11-11 19:47:21 +01:00
philkunz 7b678cc856 fix(core): update 2023-11-11 19:47:20 +01:00
philkunz 12c9d8cc9d 3.0.14 2023-11-11 18:53:39 +01:00
philkunz 3a2dc1c37e fix(core): update 2023-11-11 18:53:38 +01:00
36 changed files with 9912 additions and 5382 deletions
-140
View File
@@ -1,140 +0,0 @@
# gitzone ci_default
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
cache:
paths:
- .npmci_cache/
key: '$CI_BUILD_STAGE'
stages:
- security
- test
- release
- metadata
before_script:
- npm install -g @shipzone/npmci
# ====================
# security stage
# ====================
mirror:
stage: security
script:
- npmci git mirror
only:
- tags
tags:
- lossless
- docker
- notpriv
auditProductionDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --production --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=prod --production
tags:
- docker
allow_failure: true
auditDevDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=dev
tags:
- docker
allow_failure: true
# ====================
# test stage
# ====================
testStable:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
testBuild:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci command npm run build
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
release:
stage: release
script:
- npmci node install stable
- npmci npm publish
only:
- tags
tags:
- lossless
- docker
- notpriv
# ====================
# metadata stage
# ====================
codequality:
stage: metadata
allow_failure: true
only:
- tags
script:
- npmci command npm install -g typescript
- npmci npm prepare
- npmci npm install
tags:
- lossless
- docker
- priv
trigger:
stage: metadata
script:
- npmci trigger
only:
- tags
tags:
- lossless
- docker
- notpriv
pages:
stage: metadata
script:
- npmci node install lts
- npmci command npm install -g @git.zone/tsdoc
- npmci npm prepare
- npmci npm install
- npmci command tsdoc
tags:
- lossless
- docker
- notpriv
only:
- tags
artifacts:
expire_in: 1 week
paths:
- public
allow_failure: true
+27
View File
@@ -0,0 +1,27 @@
{
"@git.zone/cli": {
"projectType": "npm",
"module": {
"githost": "code.foss.global",
"gitscope": "push.rocks",
"gitrepo": "smartstream",
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"npmPackagename": "@push.rocks/smartstream",
"license": "MIT",
"projectDomain": "push.rocks"
},
"release": {
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
],
"accessLevel": "public"
}
},
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}
+131
View File
@@ -0,0 +1,131 @@
# Changelog
## 2026-04-30 - 3.4.2 - fix(streams)
tighten stream typings and guard optional runtime paths for duplex and wrapper utilities
- allow SmartDuplex write and final handlers to return empty values in addition to transformed output
- prevent StreamWrapper from piping or starting when no executable stream chain is produced
- guard optional WebDuplexStream readFunction execution before invoking it
- update tests and TypeScript configuration to satisfy stricter noImplicitAny checks and newer tstest tooling
## 2026-03-02 - 3.4.1 - fix(readme)
improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference
- Clarified package entry points and import paths so consumers can choose Node or Web builds.
- Expanded examples: SmartDuplex finalFunction pushing multiple chunks, converting SmartDuplex to Web streams (reader/writer example), WebDuplexStream finalFunction, and a Node↔Web round-trip conversion example.
- Added guidance and examples for concurrency and backpressure (TransformStream read/write concurrency tip, reading concurrently with writes, and backpressure notes for Node↔Web converters).
- Documented StreamWrapper.streamStarted() and onCustomEvent() usage with examples showing awaiting stream startup.
- Added a new API reference section documenting SmartDuplex, WebDuplexStream, StreamWrapper, StreamIntake, nodewebhelpers, and utility functions.
- Various README wording, formatting, and example clarifications (tips, headings, and minor cosmetic fixes).
## 2026-03-02 - 3.4.0 - feat(smartduplex)
improve backpressure handling and web/node stream interoperability
- Refactored SmartDuplex to use synchronous _read/_write/_final (avoids async pitfalls), added internal backpressured buffer draining and consumer signaling
- Implemented pull-based backpressure for Node <-> Web stream conversions (nodewebhelpers and convertNodeReadableToWebReadable/convertWebReadableToNodeReadable)
- StreamIntake.fromStream now reads from 'readable' and drains properly; StreamWrapper resolves safely on end/close/finish
- Improved getWebStreams / WebDuplexStream behavior (safer enqueue/close/abort handling, final/readFunction improvements)
- Added many new unit tests covering backpressure, web/node helpers, StreamIntake, StreamWrapper, WebDuplexStream; bumped @push.rocks/lik and @types/node versions
## 2026-02-28 - 3.3.0 - feat(smartstream)
bump dependencies, update build/publish config, refactor tests, and overhaul documentation
- Upgrade devDependencies (e.g. @git.zone/tsbuild -> ^4.1.2, @git.zone/tsrun -> ^2.0.1, @git.zone/tstest -> ^3.1.8, @types/node -> ^25.3.2) and runtime deps (e.g. @push.rocks/lik -> ^6.2.2, @push.rocks/smartenv -> ^6.0.0, @push.rocks/smartpromise -> ^4.2.3, @push.rocks/smartrx -> ^3.0.10).
- Refactor tests to use Node's native fs streams instead of @push.rocks/smartfile.fsStream and export default tap.start() to support ESM test runner patterns.
- Adjust build/publish: remove --web flag from build script, add pnpm override for agentkeepalive, add tspublish.json files for publish order, and add release registries/access in npmextra.json (verdaccio + npm).
- Rework project metadata in npmextra.json (namespaced @git.zone keys, tsdoc entry changes) and minor TypeScript/web fix: cast stream/web constructors to any in ts_web/plugins.ts.
- Large README rewrite: improved installation (pnpm), clearer Node vs Web entrypoints, expanded examples, and updated legal/license wording.
## 2024-11-19 - 3.2.5 - fix(nodewebhelpers)
Fix import and use correct module structure for Node.js streams in smartstream.nodewebhelpers.ts
- Corrected the import statement for the fs module.
- Updated the use of the fs.createReadStream method.
## 2024-10-16 - 3.2.4 - fix(SmartDuplex)
Fix stream termination when reading from a web readable stream
- Resolved an issue in SmartDuplex where the stream did not properly terminate after reaching the end of a web readable stream.
## 2024-10-16 - 3.2.3 - fix(smartduplex)
Enhance documentation for read function in SmartDuplex
- Added inline comments to clarify the behavior and importance of unlocking the reader in the readFunction of SmartDuplex.fromWebReadableStream.
## 2024-10-16 - 3.2.2 - fix(SmartDuplex)
Fix issue with SmartDuplex fromWebReadableStream method
- Resolved a potential unhandled promise rejection in fromWebReadableStream method
- Ensured proper release of stream reader lock in case of read completion
## 2024-10-16 - 3.2.1 - fix(core)
Fix the order of operations in SmartDuplex _read method to ensure proper waiting for items.
- Adjusted the order of reading function execution and waiting for items in the SmartDuplex _read method.
- Fixed potential issues with stream data processing timing.
## 2024-10-16 - 3.2.0 - feat(SmartDuplex)
Added method to create SmartDuplex from a WebReadableStream.
- Implemented a static method in SmartDuplex to allow creating an instance from a WebReadableStream.
- This addition enhances the capability of SmartDuplex to integrate with web streams, facilitating seamless stream manipulation across environments.
## 2024-10-14 - 3.1.2 - fix(WebDuplexStream)
Fix variable naming inconsistency in WebDuplexStream test
- Changed variable names from 'transformStream' to 'webDuplexStream' for consistency.
- Renamed 'writableStream' and 'readableStream' to 'writer' and 'reader' respectively.
## 2024-10-13 - 3.1.1 - fix(WebDuplexStream)
Improved read/write interface and error handling in WebDuplexStream
- Enhanced the IStreamToolsRead and IStreamToolsWrite interfaces for better Promise handling
- Refined readFunction and writeFunction handling to accommodate asynchronous data processing and error propagation
- Added internal _startReading method to facilitate initial data handling if readFunction is present
- Maintained backward compatibility while ensuring data continuity when no writeFunction is specified
## 2024-10-13 - 3.1.0 - feat(core)
Add support for creating Web ReadableStream from a file
- Introduced a new helper function `createWebReadableStreamFromFile` that allows for creating a Web ReadableStream from a file path.
- Updated exports in `ts/index.ts` to include `nodewebhelpers` which provides the new web stream feature.
## 2024-10-13 - 3.0.46 - fix(WebDuplexStream)
Fix errors in WebDuplexStream transformation and test logic
- Corrected async handling in WebDuplexStream write function
- Fixed `WebDuplexStream` tests to properly handle asynchronous reading and writing
## 2024-10-13 - 3.0.45 - fix(ts)
Fixed formatting issues in SmartDuplex class
- Resolved inconsistent spacing in SmartDuplex class methods and constructor.
- Ensured consistent formatting in the getWebStreams method.
## 2024-06-02 - 3.0.39 - smartduplex
Add .getWebStreams method
- Introduced a new `.getWebStreams` method in the smartduplex module, providing compatibility with the web streams API.
## 2024-03-16 - 3.0.34 - configuration
Update project configuration files
- Updated `tsconfig` for optimization.
- Modified `npmextra.json` to set the `githost` attribute.
## 2023-11-03 - 3.0.0 to 3.0.8 - core
Transition to major version 3.x
- Implemented breaking changes in the core system for better performance and feature set.
- Continuous core updates to improve stability and performance across minor version increments.
## 2023-11-02 - 2.0.4 to 2.0.8 - core
Core updates and a major fix
- Implemented core updates addressing minor bugs and enhancements.
- A significant breaking change update transitioning from 2.0.x to 3.0.0.
## 2022-03-31 - 2.0.0 - core
Major esm transition
- Implemented a breaking change by switching the core to ESM (ECMAScript Module) format for modernized module handling.
+21
View File
@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2016 Push.Rocks
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+36 -8
View File
@@ -1,17 +1,45 @@
{
"npmci": {
"npmGlobalTools": [],
"npmAccessLevel": "public"
},
"gitzone": {
"@git.zone/cli": {
"projectType": "npm",
"module": {
"githost": "gitlab.com",
"githost": "code.foss.global",
"gitscope": "push.rocks",
"gitrepo": "smartstream",
"description": "simplifies access to node streams",
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"npmPackagename": "@push.rocks/smartstream",
"license": "MIT"
"license": "MIT",
"keywords": [
"stream",
"node.js",
"typescript",
"stream manipulation",
"data processing",
"pipeline",
"async transformation",
"event handling",
"backpressure",
"readable stream",
"writable stream",
"duplex stream",
"transform stream",
"file streaming",
"buffer",
"stream utilities",
"esm"
]
},
"release": {
"registries": [
"https://verdaccio.lossless.digital",
"https://registry.npmjs.org"
],
"accessLevel": "public"
}
},
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}
+46 -18
View File
@@ -1,37 +1,38 @@
{
"name": "@push.rocks/smartstream",
"version": "3.0.13",
"version": "3.4.2",
"private": false,
"description": "simplifies access to node streams",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"type": "module",
"exports": {
".": "./dist_ts/index.js",
"./web": "./dist_ts_web/index.js"
},
"scripts": {
"test": "(tstest test/)",
"build": "(tsbuild)",
"buildDocs": "tsdoc"
"test": "(tstest test/ --verbose --logfile --timeout 60)",
"build": "(tsbuild tsfolders)"
},
"repository": {
"type": "git",
"url": "git+https://gitlab.com/push.rocks/smartstream.git"
"url": "https://code.foss.global/push.rocks/smartstream.git"
},
"author": "Lossless GmbH",
"license": "MIT",
"bugs": {
"url": "https://gitlab.com/push.rocks/smartstream/issues"
},
"homepage": "https://gitlab.com/push.rocks/smartstream#readme",
"homepage": "https://code.foss.global/push.rocks/smartstream",
"devDependencies": {
"@git.zone/tsbuild": "^2.1.66",
"@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.77",
"@push.rocks/smartfile": "^10.0.37",
"@push.rocks/tapbundle": "^5.0.15",
"@types/node": "^20.8.10"
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tstest": "^3.6.3",
"@types/node": "^25.6.0"
},
"dependencies": {
"@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/smartrx": "^3.0.7"
"@push.rocks/lik": "^6.4.1",
"@push.rocks/smartenv": "^6.0.0",
"@push.rocks/smartpromise": "^4.2.3",
"@push.rocks/smartrx": "^3.0.10"
},
"browserslist": [
"last 1 chrome versions"
@@ -45,7 +46,34 @@
"dist_ts_web/**/*",
"assets/**/*",
"cli.js",
".smartconfig.json",
"license",
"npmextra.json",
"readme.md"
]
],
"pnpm": {
"overrides": {
"agentkeepalive": "^4.6.0"
}
},
"keywords": [
"stream",
"node.js",
"typescript",
"stream manipulation",
"data processing",
"pipeline",
"async transformation",
"event handling",
"backpressure",
"readable stream",
"writable stream",
"duplex stream",
"transform stream",
"file streaming",
"buffer",
"stream utilities",
"esm"
],
"packageManager": "pnpm@10.28.2"
}
+7238 -4808
View File
File diff suppressed because it is too large Load Diff
+1
View File
@@ -0,0 +1 @@
- make sure to respect backpressure handling.
+587 -39
View File
@@ -1,54 +1,602 @@
# @pushrocks/smartstream
simplifies access to node streams
# @push.rocks/smartstream
## Availabililty and Links
* [npmjs.org (npm package)](https://www.npmjs.com/package/@pushrocks/smartstream)
* [gitlab.com (source)](https://gitlab.com/pushrocks/smartstream)
* [github.com (source mirror)](https://github.com/pushrocks/smartstream)
* [docs (typedoc)](https://pushrocks.gitlab.io/smartstream/)
A TypeScript-first library for creating and manipulating Node.js and Web streams with built-in backpressure handling, async transformations, and seamless Node.js ↔ Web stream interoperability.
## Status for master
## Issue Reporting and Security
Status Category | Status Badge
-- | --
GitLab Pipelines | [![pipeline status](https://gitlab.com/pushrocks/smartstream/badges/master/pipeline.svg)](https://lossless.cloud)
GitLab Pipline Test Coverage | [![coverage report](https://gitlab.com/pushrocks/smartstream/badges/master/coverage.svg)](https://lossless.cloud)
npm | [![npm downloads per month](https://badgen.net/npm/dy/@pushrocks/smartstream)](https://lossless.cloud)
Snyk | [![Known Vulnerabilities](https://badgen.net/snyk/pushrocks/smartstream)](https://lossless.cloud)
TypeScript Support | [![TypeScript](https://badgen.net/badge/TypeScript/>=%203.x/blue?icon=typescript)](https://lossless.cloud)
node Support | [![node](https://img.shields.io/badge/node->=%2010.x.x-blue.svg)](https://nodejs.org/dist/latest-v10.x/docs/api/)
Code Style | [![Code Style](https://badgen.net/badge/style/prettier/purple)](https://lossless.cloud)
PackagePhobia (total standalone install weight) | [![PackagePhobia](https://badgen.net/packagephobia/install/@pushrocks/smartstream)](https://lossless.cloud)
PackagePhobia (package size on registry) | [![PackagePhobia](https://badgen.net/packagephobia/publish/@pushrocks/smartstream)](https://lossless.cloud)
BundlePhobia (total size when bundled) | [![BundlePhobia](https://badgen.net/bundlephobia/minzip/@pushrocks/smartstream)](https://lossless.cloud)
Platform support | [![Supports Windows 10](https://badgen.net/badge/supports%20Windows%2010/yes/green?icon=windows)](https://lossless.cloud) [![Supports Mac OS X](https://badgen.net/badge/supports%20Mac%20OS%20X/yes/green?icon=apple)](https://lossless.cloud)
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
## Install
```bash
pnpm install @push.rocks/smartstream
```
The package ships with **two entry points** so you can pick exactly what you need:
| Entry Point | Import Path | Environment |
|---|---|---|
| **Node.js** (default) | `@push.rocks/smartstream` | Node.js — full stream utilities, duplex, intake, wrappers, and Node↔Web helpers |
| **Web** | `@push.rocks/smartstream/web` | Browser & Node.js — pure Web Streams API (`WebDuplexStream`) |
## Usage
Use TypeScript for best in class instellisense.
All examples use ESM / TypeScript syntax.
### 📦 Importing
```typescript
import { Smartstream } from 'smartstream'
import * as gUglify from 'gulp-uglify'
// Node.js — full API
import {
SmartDuplex,
StreamWrapper,
StreamIntake,
createTransformFunction,
createPassThrough,
nodewebhelpers,
} from '@push.rocks/smartstream';
let mySmartstream = new Smartstream([
gulp.src(['./file1.js','./file2.js']),
gUglify(),
gulp.dest('./some/output/path')
])
mySmartstream.onError((err) => { /* handle error */ }) // handles all errors in stream
myStream.onCustomEvent('myeventname', (args...) => { /* Do something */ }) // emit an custom event anywhere in your stream
mySmartstream.run().then(() => {/* do something when stream is finished */})
// Web — browser-safe, zero Node.js dependencies
import { WebDuplexStream } from '@push.rocks/smartstream/web';
```
## Contribution
---
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
### 🔄 SmartDuplex — The Core Stream Primitive
For further information read the linked docs at the top of this readme.
`SmartDuplex` extends Node.js `Duplex` with first-class async support, built-in backpressure management, and a clean functional API. Instead of overriding `_transform` or `_write` manually, you pass a `writeFunction` that receives each chunk along with a `tools` object.
> MIT licensed | **&copy;** [Lossless GmbH](https://lossless.gmbh)
| By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy)
#### Basic Transform
[![repo-footer](https://lossless.gitlab.io/publicrelations/repofooter.svg)](https://maintainedby.lossless.com)
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
const upperCaser = new SmartDuplex<Buffer, Buffer>({
writeFunction: async (chunk, tools) => {
// Return a value to push it downstream
return Buffer.from(chunk.toString().toUpperCase());
},
});
readableStream.pipe(upperCaser).pipe(writableStream);
```
#### Using `tools.push()` for Multiple Outputs
The `writeFunction` can emit multiple chunks per input via `tools.push()`:
```typescript
const splitter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, tools) => {
const words = chunk.split(' ');
for (const word of words) {
await tools.push(word);
}
// Returning nothing — output was already pushed
},
});
```
#### Final Function
Run cleanup or emit final data when the writable side ends:
```typescript
let runningTotal = 0;
const aggregator = new SmartDuplex<number, number>({
objectMode: true,
writeFunction: async (chunk, tools) => {
runningTotal += chunk;
// Don't emit anything per-chunk
},
finalFunction: async (tools) => {
return runningTotal; // Emitted as the last chunk
},
});
```
The `finalFunction` can also push multiple chunks via `tools.push()`, just like `writeFunction`.
#### Truncating a Stream Early
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
```typescript
const limiter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, tools) => {
if (chunk === 'STOP') {
tools.truncate();
return;
}
return chunk;
},
});
```
#### Creating from a Buffer
```typescript
const stream = SmartDuplex.fromBuffer(Buffer.from('hello world'));
stream.on('data', (chunk) => console.log(chunk.toString())); // "hello world"
```
#### Creating from a Web ReadableStream
Bridge the Web Streams API into a Node.js Duplex:
```typescript
const response = await fetch('https://example.com/data');
const nodeDuplex = SmartDuplex.fromWebReadableStream(response.body);
nodeDuplex.pipe(processTransform).pipe(outputStream);
```
#### Getting Web Streams from SmartDuplex
Convert a `SmartDuplex` into a Web `ReadableStream` + `WritableStream` pair:
```typescript
const duplex = new SmartDuplex({
objectMode: true,
writeFunction: async (chunk, tools) => {
return transform(chunk);
},
});
const { readable, writable } = await duplex.getWebStreams();
const writer = writable.getWriter();
const reader = readable.getReader();
// Read and write concurrently to avoid TransformStream backpressure
const readAll = async () => {
const results = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
results.push(value);
}
return results;
};
const readPromise = readAll();
await writer.write('hello');
await writer.close();
const results = await readPromise;
```
#### Debug Mode
Pass `debug: true` and `name` to get detailed internal logs:
```typescript
const stream = new SmartDuplex({
name: 'MyStream',
debug: true,
writeFunction: async (chunk, tools) => chunk,
});
```
---
### 🧩 StreamWrapper — Pipeline Composition
`StreamWrapper` takes an array of streams, pipes them together, attaches error listeners on all of them, and returns a `Promise` that resolves when the pipeline finishes:
```typescript
import { StreamWrapper } from '@push.rocks/smartstream';
import fs from 'fs';
const pipeline = new StreamWrapper([
fs.createReadStream('./input.txt'),
new SmartDuplex({
writeFunction: async (chunk) => Buffer.from(chunk.toString().toUpperCase()),
}),
fs.createWriteStream('./output.txt'),
]);
await pipeline.run();
console.log('Pipeline complete!');
```
Error handling is automatic — if any stream in the array errors, the returned promise rejects:
```typescript
pipeline.run()
.then(() => console.log('Done'))
.catch((err) => console.error('Pipeline failed:', err));
```
You can listen for custom events across all streams in the pipeline:
```typescript
pipeline.onCustomEvent('progress', () => {
console.log('Progress event fired');
});
```
You can also await `streamStarted()` to know when the pipeline has been wired up:
```typescript
const runPromise = pipeline.run();
await pipeline.streamStarted(); // Resolves once pipes are connected
await runPromise;
```
---
### 📥 StreamIntake — Dynamic Data Injection
`StreamIntake` is a `Readable` stream that lets you programmatically push data into a pipeline. It operates in object mode by default and provides a reactive observable (`pushNextObservable`) for demand-driven data production.
```typescript
import { StreamIntake, SmartDuplex } from '@push.rocks/smartstream';
const intake = new StreamIntake<string>();
// Pipe through a transform
intake
.pipe(new SmartDuplex({
objectMode: true,
writeFunction: async (chunk) => {
console.log('Processing:', chunk);
return chunk;
},
}))
.on('data', (data) => console.log('Output:', data));
// Push data whenever it's ready
intake.pushData('Hello');
intake.pushData('World');
intake.signalEnd(); // Signal end-of-stream
```
#### Demand-Driven Production with Observable
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
```typescript
const intake = new StreamIntake<number>();
let counter = 0;
intake.pushNextObservable.subscribe(() => {
if (counter < 100) {
intake.pushData(counter++);
} else {
intake.signalEnd();
}
});
intake.pipe(consumer);
```
#### Creating from Existing Streams
Wrap a Node.js `Readable` or a Web `ReadableStream`:
```typescript
// From Node.js Readable
const intake = await StreamIntake.fromStream<Buffer>(fs.createReadStream('./data.bin'));
// From Web ReadableStream
const response = await fetch('https://example.com/stream');
const intake = await StreamIntake.fromStream<Uint8Array>(response.body);
```
---
### ⚡ Utility Functions
#### `createTransformFunction`
Quickly create a `SmartDuplex` from a simple async mapping function:
```typescript
import { createTransformFunction } from '@push.rocks/smartstream';
const doubler = createTransformFunction<number, number>(
async (n) => n * 2,
{ objectMode: true }
);
intakeStream.pipe(doubler).pipe(outputStream);
```
#### `createPassThrough`
Create an object-mode passthrough stream (useful as an intermediary or tee point):
```typescript
import { createPassThrough } from '@push.rocks/smartstream';
const passThrough = createPassThrough();
source.pipe(passThrough).pipe(destination);
```
---
### 🌐 WebDuplexStream — Pure Web Streams API
`WebDuplexStream` extends `TransformStream` and works in both browsers and Node.js. Import it from the `/web` subpath for zero Node.js dependencies.
```typescript
import { WebDuplexStream } from '@push.rocks/smartstream/web';
const stream = new WebDuplexStream<number, number>({
writeFunction: async (chunk, { push }) => {
push(chunk * 2); // Push transformed data
},
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
// Always read concurrently with writes — TransformStream applies backpressure
const readPromise = (async () => {
const results = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
results.push(value);
}
return results;
})();
await writer.write(5);
await writer.write(10);
await writer.close();
const results = await readPromise; // [10, 20]
```
> 💡 **Tip:** Because `TransformStream` enforces backpressure between its writable and readable sides, you must start reading *before* or *concurrently with* writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain.
#### From a Uint8Array
```typescript
const stream = WebDuplexStream.fromUInt8Array(new Uint8Array([1, 2, 3]));
const reader = stream.readable.getReader();
const { value } = await reader.read(); // Uint8Array [1, 2, 3]
```
#### Data Production with `readFunction`
Supply data into the stream from any async source:
```typescript
const stream = new WebDuplexStream<string, string>({
readFunction: async (tools) => {
await tools.write('chunk 1');
await tools.write('chunk 2');
tools.done(); // Signal end
},
writeFunction: async (chunk, { push }) => {
push(chunk.toUpperCase());
},
});
const reader = stream.readable.getReader();
// reads "CHUNK 1", "CHUNK 2"
```
#### Final Function
Emit trailing data when the writable side is closed:
```typescript
const stream = new WebDuplexStream<string, string>({
writeFunction: async (chunk) => chunk,
finalFunction: async (tools) => {
tools.push('footer');
},
});
```
---
### 🔀 Node ↔ Web Stream Converters
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling:
```typescript
import { nodewebhelpers } from '@push.rocks/smartstream';
```
| Function | From | To |
|---|---|---|
| `createWebReadableStreamFromFile(path)` | File path | Web `ReadableStream<Uint8Array>` |
| `convertWebReadableToNodeReadable(webStream)` | Web `ReadableStream` | Node.js `Readable` |
| `convertNodeReadableToWebReadable(nodeStream)` | Node.js `Readable` | Web `ReadableStream` |
| `convertWebWritableToNodeWritable(webWritable)` | Web `WritableStream` | Node.js `Writable` |
| `convertNodeWritableToWebWritable(nodeWritable)` | Node.js `Writable` | Web `WritableStream` |
#### Example: Serve a File as a Web ReadableStream
```typescript
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./video.mp4');
// Use with fetch Response, service workers, etc.
return new Response(webStream, {
headers: { 'Content-Type': 'video/mp4' },
});
```
#### Example: Convert Between Stream Types
```typescript
import fs from 'fs';
import { nodewebhelpers } from '@push.rocks/smartstream';
// Node → Web
const nodeReadable = fs.createReadStream('./data.bin');
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
// Web → Node
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
```
#### Example: Round-Trip Conversion
```typescript
// Node → Web → Node (lossless round-trip)
const original = fs.createReadStream('./photo.jpg');
const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original);
const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream);
backToNode.pipe(fs.createWriteStream('./photo-copy.jpg'));
```
---
### 🏗️ Backpressure Handling
`SmartDuplex` uses a `BackpressuredArray` internally, bounded by `highWaterMark` (default: 1). When the downstream consumer is slow, the stream automatically pauses the upstream producer until space is available — no manual bookkeeping required.
```typescript
const slow = new SmartDuplex({
name: 'SlowConsumer',
objectMode: true,
highWaterMark: 1,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 200));
return chunk;
},
});
const fast = new SmartDuplex({
name: 'FastProducer',
objectMode: true,
writeFunction: async (chunk, tools) => {
return chunk; // Instant processing
},
});
// Backpressure is handled automatically between fast → slow
fast.pipe(slow).on('data', (d) => console.log(d));
for (let i = 0; i < 100; i++) {
fast.write(`chunk-${i}`);
}
fast.end();
```
---
### 🎯 Real-World Example: Log Processing Pipeline
```typescript
import fs from 'fs';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
// Read → Parse → Filter → Write
const pipeline = new StreamWrapper([
fs.createReadStream('./access.log'),
new SmartDuplex({
writeFunction: async (chunk) => {
return chunk.toString().split('\n');
},
}),
new SmartDuplex({
objectMode: true,
writeFunction: async (lines: string[], tools) => {
for (const line of lines) {
if (line.includes('ERROR')) {
await tools.push(line + '\n');
}
}
},
}),
fs.createWriteStream('./errors.log'),
]);
await pipeline.run();
console.log('Error extraction complete');
```
---
### 📋 API Reference
#### SmartDuplex
| Member | Type | Description |
|---|---|---|
| `new SmartDuplex(options?)` | Constructor | Create a new duplex stream |
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; return to push, or use `tools.push()` |
| `options.finalFunction` | `(tools) => Promise<T>` | Emit final data when writable ends |
| `options.readFunction` | `() => Promise<void>` | Supply data to the readable side |
| `options.debug` | `boolean` | Enable internal logging |
| `options.name` | `string` | Stream name for debug logs |
| `SmartDuplex.fromBuffer(buf)` | Static | Create a readable stream from a Buffer |
| `SmartDuplex.fromWebReadableStream(rs)` | Static | Bridge a Web ReadableStream to Node.js Duplex |
| `duplex.getWebStreams()` | Method | Get `{ readable, writable }` Web Streams pair |
#### WebDuplexStream
| Member | Type | Description |
|---|---|---|
| `new WebDuplexStream(options)` | Constructor | Create a new web transform stream |
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; use `tools.push()` or return |
| `options.finalFunction` | `(tools) => Promise<T>` | Emit data on flush |
| `options.readFunction` | `(tools) => Promise<void>` | Supply data via `tools.write()`, signal `tools.done()` |
| `WebDuplexStream.fromUInt8Array(arr)` | Static | Create a stream from a Uint8Array |
#### StreamWrapper
| Member | Type | Description |
|---|---|---|
| `new StreamWrapper(streams[])` | Constructor | Create a pipeline from an array of streams |
| `wrapper.run()` | Method | Pipe all streams and return a Promise |
| `wrapper.streamStarted()` | Method | Promise that resolves when pipes are connected |
| `wrapper.onCustomEvent(name, fn)` | Method | Listen for custom events across all streams |
#### StreamIntake
| Member | Type | Description |
|---|---|---|
| `new StreamIntake<T>()` | Constructor | Create a new intake stream (object mode) |
| `intake.pushData(data)` | Method | Push data into the stream |
| `intake.signalEnd()` | Method | Signal end of stream |
| `intake.pushNextObservable` | Property | Observable that emits when the stream wants more data |
| `StreamIntake.fromStream(stream)` | Static | Wrap a Node.js Readable or Web ReadableStream |
#### nodewebhelpers
| Function | Description |
|---|---|
| `createWebReadableStreamFromFile(path)` | File → Web ReadableStream (pull-based backpressure) |
| `convertWebReadableToNodeReadable(rs)` | Web ReadableStream → Node.js Readable |
| `convertNodeReadableToWebReadable(ns)` | Node.js Readable → Web ReadableStream (pull-based backpressure) |
| `convertWebWritableToNodeWritable(ws)` | Web WritableStream → Node.js Writable |
| `convertNodeWritableToWebWritable(nw)` | Node.js Writable → Web WritableStream |
#### Utility Functions
| Function | Description |
|---|---|
| `createTransformFunction(fn, opts?)` | Create a SmartDuplex from an async mapping function |
| `createPassThrough()` | Create an object-mode passthrough stream |
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
### Company Information
Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
+10 -50
View File
@@ -1,50 +1,10 @@
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
data
data
data
data
data
data
data
data
data
data
+56
View File
@@ -0,0 +1,56 @@
import { tap, expect } from '@git.zone/tstest/tapbundle';
import { SmartDuplex } from '../ts/index.js';
tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => {
const done = toolsArg.defer();
const stream1 = new SmartDuplex({
name: 'stream1',
objectMode: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 10));
return chunk;
},
});
const stream2 = new SmartDuplex({
name: 'stream2',
objectMode: true,
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 20));
await tools.push(chunk);
},
});
const stream3 = new SmartDuplex({
objectMode: true,
name: 'stream3',
writeFunction: async (chunk, tools) => {
await new Promise((resolve) => setTimeout(resolve, 200));
},
});
stream1.pipe(stream2).pipe(stream3);
let backpressured = false;
for (let i = 0; i < 200; i++) {
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
if (!canContinue) {
backpressured = true;
}
}
stream1.end();
stream3.on('finish', () => {
if (!backpressured) {
throw new Error('No backpressure was observed.');
} else {
done.resolve();
}
});
await done.promise;
});
export default tap.start();
+152
View File
@@ -0,0 +1,152 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as fs from 'fs';
import * as stream from 'stream';
import { nodewebhelpers } from '../ts/index.js';
// =============================================
// createWebReadableStreamFromFile
// =============================================
tap.test('nodewebhelpers: createWebReadableStreamFromFile should read a file', async () => {
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./test/assets/readabletext.txt');
const reader = webStream.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks.length).toBeGreaterThan(0);
const content = Buffer.concat(chunks).toString();
expect(content.length).toBeGreaterThan(0);
});
// =============================================
// convertNodeReadableToWebReadable
// =============================================
tap.test('nodewebhelpers: convertNodeReadableToWebReadable should convert', async () => {
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
const reader = webReadable.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks.length).toBeGreaterThan(0);
const content = Buffer.concat(chunks).toString();
expect(content.length).toBeGreaterThan(0);
});
// =============================================
// convertWebReadableToNodeReadable
// =============================================
tap.test('nodewebhelpers: convertWebReadableToNodeReadable should convert', async (tools) => {
const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello"
const webReadable = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(data);
controller.close();
},
});
const nodeReadable = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
const chunks: Buffer[] = [];
const done = tools.defer();
nodeReadable.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
nodeReadable.on('end', () => {
const result = Buffer.concat(chunks).toString();
expect(result).toEqual('Hello');
done.resolve();
});
await done.promise;
});
// =============================================
// convertNodeWritableToWebWritable
// =============================================
tap.test('nodewebhelpers: convertNodeWritableToWebWritable should convert', async () => {
const chunks: Buffer[] = [];
const nodeWritable = new stream.Writable({
write(chunk, encoding, callback) {
chunks.push(chunk);
callback();
},
});
const webWritable = nodewebhelpers.convertNodeWritableToWebWritable(nodeWritable);
const writer = webWritable.getWriter();
await writer.write(new Uint8Array([65, 66, 67])); // "ABC"
await writer.close();
const result = Buffer.concat(chunks).toString();
expect(result).toEqual('ABC');
});
// =============================================
// convertWebWritableToNodeWritable
// =============================================
tap.test('nodewebhelpers: convertWebWritableToNodeWritable should convert', async (tools) => {
const chunks: Uint8Array[] = [];
const webWritable = new WritableStream<Uint8Array>({
write(chunk) {
chunks.push(chunk);
},
});
const nodeWritable = nodewebhelpers.convertWebWritableToNodeWritable(webWritable);
const done = tools.defer();
nodeWritable.write(Buffer.from('Hello'), (err) => {
expect(err).toBeFalsy();
nodeWritable.end(() => {
expect(chunks.length).toBeGreaterThan(0);
done.resolve();
});
});
await done.promise;
});
// =============================================
// Round-trip: Node → Web → Node
// =============================================
tap.test('nodewebhelpers: round-trip Node → Web → Node readable', async (tools) => {
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
const chunks: Buffer[] = [];
const done = tools.defer();
nodeReadable2.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
nodeReadable2.on('end', () => {
expect(chunks.length).toBeGreaterThan(0);
done.resolve();
});
await done.promise;
});
export default tap.start();
+379
View File
@@ -0,0 +1,379 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as fs from 'fs';
import * as smartstream from '../ts/index.js';
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';
// =============================================
// Constructor
// =============================================
tap.test('SmartDuplex: should construct with no options', async () => {
const duplex = new SmartDuplex();
expect(duplex).toBeInstanceOf(SmartDuplex);
});
tap.test('SmartDuplex: should construct with options', async () => {
const duplex = new SmartDuplex({
objectMode: true,
writeFunction: async (chunk) => chunk,
});
expect(duplex).toBeInstanceOf(SmartDuplex);
});
// =============================================
// fromBuffer
// =============================================
tap.test('SmartDuplex: should create from a Buffer', async () => {
const bufferData = Buffer.from('This is a test buffer');
const stream = SmartDuplex.fromBuffer(bufferData, {});
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
stream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
stream.on('end', () => {
expect(receivedData.toString()).toEqual(bufferData.toString());
resolve();
});
});
});
// =============================================
// writeFunction
// =============================================
tap.test('SmartDuplex: should transform chunks via writeFunction', async (tools) => {
const results: string[] = [];
const transform = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk) => {
return chunk.toUpperCase();
},
});
const done = tools.defer();
transform.on('data', (chunk: string) => {
results.push(chunk);
});
transform.on('end', () => {
expect(results).toContain('HELLO');
expect(results).toContain('WORLD');
done.resolve();
});
transform.write('hello');
transform.write('world');
transform.end();
await done.promise;
});
tap.test('SmartDuplex: writeFunction returning undefined should not push', async (tools) => {
const results: any[] = [];
const transform = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async () => {
return undefined;
},
});
const done = tools.defer();
transform.on('data', (chunk: any) => {
results.push(chunk);
});
transform.on('end', () => {
expect(results.length).toEqual(0);
done.resolve();
});
transform.write('hello');
transform.end();
await done.promise;
});
// =============================================
// tools.push — multiple outputs
// =============================================
tap.test('SmartDuplex: should emit multiple chunks via tools.push', async (tools) => {
const results: string[] = [];
const splitter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, streamTools) => {
const words = chunk.split(' ');
for (const word of words) {
await streamTools.push(word);
}
},
});
const done = tools.defer();
splitter.on('data', (chunk: string) => results.push(chunk));
splitter.on('end', () => {
expect(results).toContain('hello');
expect(results).toContain('beautiful');
expect(results).toContain('world');
done.resolve();
});
splitter.write('hello beautiful world');
splitter.end();
await done.promise;
});
// =============================================
// finalFunction
// =============================================
tap.test('SmartDuplex: should emit final chunk via finalFunction', async (tools) => {
const results: string[] = [];
let count = 0;
const aggregator = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async () => {
count++;
return undefined;
},
finalFunction: async () => {
return `total: ${count}`;
},
});
const done = tools.defer();
aggregator.on('data', (chunk: string) => results.push(chunk));
aggregator.on('end', () => {
expect(results.length).toEqual(1);
expect(results[0]).toEqual('total: 2');
done.resolve();
});
aggregator.write('a');
aggregator.write('b');
aggregator.end();
await done.promise;
});
tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async (tools) => {
const results: string[] = [];
const stream = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk) => chunk,
finalFunction: async (streamTools) => {
await streamTools.push('final1');
await streamTools.push('final2');
},
});
const done = tools.defer();
stream.on('data', (chunk: string) => results.push(chunk));
stream.on('end', () => {
expect(results).toContain('hello');
expect(results).toContain('final1');
expect(results).toContain('final2');
done.resolve();
});
stream.write('hello');
stream.end();
await done.promise;
});
// =============================================
// truncate
// =============================================
tap.test('SmartDuplex: should truncate stream early', async (tools) => {
const results: string[] = [];
const limiter = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk, streamTools) => {
if (chunk === 'STOP') {
streamTools.truncate();
return undefined;
}
return chunk;
},
});
const done = tools.defer();
limiter.on('data', (chunk: string) => results.push(chunk));
limiter.on('end', () => {
expect(results).toContain('a');
expect(results).toContain('b');
expect(results).not.toContain('STOP');
done.resolve();
});
limiter.write('a');
limiter.write('b');
// Write STOP on next tick to allow previous writes to flush
process.nextTick(() => {
limiter.write('STOP');
});
await done.promise;
});
// =============================================
// Error handling
// =============================================
tap.test('SmartDuplex: should emit error when writeFunction throws', async (tools) => {
const stream = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async () => {
throw new Error('write error');
},
});
const done = tools.defer();
stream.on('error', (err) => {
expect(err.message).toEqual('write error');
done.resolve();
});
stream.write('test');
await done.promise;
});
tap.test('SmartDuplex: should error when no writeFunction and data is written', async (tools) => {
const stream = new SmartDuplex<string, string>({
objectMode: true,
});
const done = tools.defer();
stream.on('error', (err) => {
expect(err.message).toEqual('No stream function provided');
done.resolve();
});
stream.write('test');
await done.promise;
});
// =============================================
// fromWebReadableStream
// =============================================
tap.test('SmartDuplex: should create from a Web ReadableStream', async (tools) => {
const chunks = ['hello', 'world', 'foo'];
const webReadable = new ReadableStream<string>({
start(controller) {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
controller.close();
}
});
const duplex = SmartDuplex.fromWebReadableStream(webReadable);
const results: string[] = [];
const done = tools.defer();
duplex.on('data', (chunk: string) => results.push(chunk));
duplex.on('end', () => {
expect(results).toEqual(chunks);
done.resolve();
});
await done.promise;
});
// =============================================
// getWebStreams
// =============================================
tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => {
const duplex = new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk) => {
return chunk.toUpperCase();
},
});
const { readable, writable } = await duplex.getWebStreams();
const writer = writable.getWriter();
const reader = readable.getReader();
await writer.write('hello');
await writer.write('world');
await writer.close();
const results: string[] = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
results.push(value);
}
expect(results).toContain('HELLO');
expect(results).toContain('WORLD');
});
// =============================================
// Debug mode
// =============================================
tap.test('SmartDuplex: debug mode should not crash', async (tools) => {
const stream = new SmartDuplex<string, string>({
name: 'DebugStream',
debug: true,
objectMode: true,
writeFunction: async (chunk) => chunk,
});
const done = tools.defer();
stream.on('data', () => {});
stream.on('end', () => done.resolve());
stream.write('test');
stream.end();
await done.promise;
});
// =============================================
// Pipe with file read
// =============================================
tap.test('SmartDuplex: should handle a read stream pipeline', async () => {
const streamWrapper = new smartstream.StreamWrapper([
fs.createReadStream('./test/assets/readabletext.txt'),
new smartstream.SmartDuplex({
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
const result = chunkStringArg.toString().substr(0, 100);
streamTools.push('wow =========== \n');
return Buffer.from(result);
},
finalFunction: async () => {
return Buffer.from('this is the end');
},
}),
new smartstream.SmartDuplex({
writeFunction: async (chunkStringArg) => {
// consume data
},
finalFunction: async (streamTools) => {
streamTools.push(null);
},
})
]);
await streamWrapper.run();
});
export default tap.start();
-44
View File
@@ -1,44 +0,0 @@
import { expect, tap } from '@push.rocks/tapbundle';
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
import * as smartrx from '@push.rocks/smartrx';
import * as fs from 'fs';
tap.test('should create a SmartStream from a Buffer', async () => {
const bufferData = Buffer.from('This is a test buffer');
const smartStream = SmartDuplex.fromBuffer(bufferData);
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
smartStream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
smartStream.on('end', () => {
expect(receivedData.toString()).toEqual(bufferData.toString());
resolve();
});
});
});
tap.test('should create a SmartStream from an Observable', async () => {
const observableData = 'Observable test data';
const testObservable = smartrx.rxjs.of(Buffer.from(observableData));
const smartStream = SmartDuplex.fromObservable(testObservable);
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
smartStream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
smartStream.on('end', () => {
expect(receivedData.toString()).toEqual(observableData);
resolve();
});
});
});
tap.start();
-65
View File
@@ -1,65 +0,0 @@
import { expect, tap } from '@push.rocks/tapbundle';
import * as smartfile from '@push.rocks/smartfile';
import * as smartstream from '../ts/index.js';
let testIntake: smartstream.StreamIntake<string>;
tap.test('should handle a read stream', async (tools) => {
const counter = 0;
const streamWrapper = new smartstream.StreamWrapper([
smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
new smartstream.SmartDuplex({
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
// do something with the stream here
const result = chunkStringArg.toString().substr(0, 100);
streamTools.push('wow =========== \n');
return Buffer.from(result);
},
finalFunction: async (tools) => {
return Buffer.from('this is the end');
},
}),
new smartstream.SmartDuplex({
writeFunction: async (chunkStringArg) => {
console.log(chunkStringArg.toString());
},
finalFunction: async (tools) => {
tools.push(null);
},
})
]);
// await streamWrapper.run();
});
tap.test('should create a valid Intake', async (tools) => {
testIntake = new smartstream.StreamIntake<string>();
testIntake.pipe(
new smartstream.SmartDuplex({
objectMode: true,
writeFunction: async (chunkStringArg: string, streamTools) => {
await tools.delayFor(100);
console.log(chunkStringArg);
return chunkStringArg;
}
})
)
.pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'));
const testFinished = tools.defer();
let counter = 0;
testIntake.pushNextObservable.subscribe(() => {
if (counter < 50) {
counter++;
testIntake.pushData('hi');
testIntake.pushData('+wow');
testIntake.pushData('\n');
} else {
testIntake.signalEnd();
testFinished.resolve();
}
});
await testFinished.promise;
testIntake.signalEnd();
});
tap.start();
+128
View File
@@ -0,0 +1,128 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as fs from 'fs';
import { StreamIntake, SmartDuplex } from '../ts/index.js';
import * as stream from 'stream';
// =============================================
// Basic StreamIntake
// =============================================
tap.test('StreamIntake: should push data and signal end', async (tools) => {
const intake = new StreamIntake<string>();
const results: string[] = [];
intake.pipe(
new SmartDuplex<string, string>({
objectMode: true,
writeFunction: async (chunk) => {
results.push(chunk);
return chunk;
},
})
);
const done = tools.defer();
let counter = 0;
intake.pushNextObservable.subscribe(() => {
if (counter < 5) {
counter++;
intake.pushData(`item-${counter}`);
} else {
intake.signalEnd();
done.resolve();
}
});
await done.promise;
// Give streams time to flush
await new Promise((resolve) => setTimeout(resolve, 100));
expect(results.length).toBeGreaterThan(0);
});
tap.test('StreamIntake: should pipe to a writable', async (tools) => {
const intake = new StreamIntake<string>();
intake
.pipe(
new SmartDuplex({
objectMode: true,
writeFunction: async (chunk: string) => {
return chunk;
},
})
)
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
const done = tools.defer();
let counter = 0;
intake.pushNextObservable.subscribe(() => {
if (counter < 10) {
counter++;
intake.pushData('data\n');
} else {
intake.signalEnd();
done.resolve();
}
});
await done.promise;
});
// =============================================
// StreamIntake.fromStream (Node Readable)
// =============================================
tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => {
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
const intake = await StreamIntake.fromStream<Buffer>(nodeReadable);
const chunks: Buffer[] = [];
const done = tools.defer();
intake.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
intake.on('end', () => {
expect(chunks.length).toBeGreaterThan(0);
const content = Buffer.concat(chunks).toString();
expect(content.length).toBeGreaterThan(0);
done.resolve();
});
await done.promise;
});
// =============================================
// StreamIntake.fromStream (Web ReadableStream)
// =============================================
tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => {
const data = ['chunk1', 'chunk2', 'chunk3'];
const webReadable = new ReadableStream<string>({
start(controller) {
for (const item of data) {
controller.enqueue(item);
}
controller.close();
},
});
const intake = await StreamIntake.fromStream<string>(webReadable);
const results: string[] = [];
const done = tools.defer();
intake.on('data', (chunk: string) => {
results.push(chunk);
});
intake.on('end', () => {
expect(results).toEqual(data);
done.resolve();
});
await done.promise;
});
export default tap.start();
+71
View File
@@ -0,0 +1,71 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as fs from 'fs';
import { StreamWrapper, SmartDuplex } from '../ts/index.js';
tap.test('StreamWrapper: should pipe read to write', async () => {
const wrapper = new StreamWrapper([
fs.createReadStream('./test/assets/test.md'),
fs.createWriteStream('./test/assets/testCopy.md'),
]);
await wrapper.run();
});
tap.test('StreamWrapper: should propagate errors', async (tools) => {
const failingStream = new SmartDuplex<Buffer, Buffer>({
writeFunction: async () => {
throw new Error('intentional error');
},
});
const wrapper = new StreamWrapper([
fs.createReadStream('./test/assets/test.md'),
failingStream,
]);
let errorCaught = false;
try {
await wrapper.run();
} catch (err) {
errorCaught = true;
const message = err instanceof Error ? err.message : String(err);
expect(message).toEqual('intentional error');
}
expect(errorCaught).toBeTrue();
});
tap.test('StreamWrapper: streamStarted should resolve', async () => {
const wrapper = new StreamWrapper([
fs.createReadStream('./test/assets/test.md'),
fs.createWriteStream('./test/assets/testCopy.md'),
]);
const runPromise = wrapper.run();
await wrapper.streamStarted();
await runPromise;
});
tap.test('StreamWrapper: onCustomEvent should fire', async (tools) => {
const results: string[] = [];
const emitter = new SmartDuplex<Buffer, Buffer>({
writeFunction: async (chunk, streamTools) => {
(emitter as any).emit('custom-progress', 'progress');
return chunk;
},
});
const wrapper = new StreamWrapper([
fs.createReadStream('./test/assets/test.md'),
emitter,
fs.createWriteStream('./test/assets/testCopy.md'),
]);
wrapper.onCustomEvent('custom-progress', () => {
results.push('fired');
});
await wrapper.run();
expect(results.length).toBeGreaterThan(0);
});
export default tap.start();
-15
View File
@@ -1,15 +0,0 @@
import * as smartfile from '@push.rocks/smartfile';
import { expect, tap } from '@push.rocks/tapbundle';
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
let testSmartstream: smartstream.StreamWrapper;
tap.test('should combine a stream', async () => {
testSmartstream = new smartstream.StreamWrapper([
smartfile.fsStream.createReadStream('./test/assets/test.md'),
smartfile.fsStream.createWriteStream('./test/assets/testCopy.md'),
]);
await testSmartstream.run();
});
tap.start();
+51
View File
@@ -0,0 +1,51 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js';
// =============================================
// createTransformFunction
// =============================================
tap.test('createTransformFunction: should create a transform stream', async (tools) => {
const doubler = createTransformFunction<number, number>(async (n) => n * 2, { objectMode: true });
const results: number[] = [];
doubler.on('data', (chunk: number) => results.push(chunk));
const done = tools.defer();
doubler.on('end', () => {
expect(results).toContain(10);
expect(results).toContain(20);
expect(results).toContain(30);
done.resolve();
});
doubler.write(5);
doubler.write(10);
doubler.write(15);
doubler.end();
await done.promise;
});
// =============================================
// createPassThrough
// =============================================
tap.test('createPassThrough: should pass data through unchanged', async (tools) => {
const passThrough = createPassThrough();
const results: string[] = [];
passThrough.on('data', (chunk: string) => results.push(chunk));
const done = tools.defer();
passThrough.on('end', () => {
expect(results).toEqual(['hello', 'world']);
done.resolve();
});
passThrough.write('hello');
passThrough.write('world');
passThrough.end();
await done.promise;
});
export default tap.start();
+147
View File
@@ -0,0 +1,147 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { WebDuplexStream } from '../ts_web/index.js';
// Helper: collect all chunks from a readable
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
const results: T[] = [];
while (true) {
const result = await reader.read();
if (result.done) break;
results.push(result.value);
}
return results;
}
// =============================================
// Basic transform
// =============================================
tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
const stream = new WebDuplexStream<number, number>({
writeFunction: async (chunk, { push }) => {
push(chunk * 2);
},
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
// Read and write concurrently to avoid backpressure deadlock
const readPromise = collectAll(reader);
await writer.write(5);
await writer.write(10);
await writer.close();
const results = await readPromise;
expect(results).toContain(10);
expect(results).toContain(20);
});
// =============================================
// writeFunction return value
// =============================================
tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
const stream = new WebDuplexStream<string, string>({
writeFunction: async (chunk) => {
return chunk.toUpperCase();
},
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
const readPromise = collectAll(reader);
await writer.write('hello');
await writer.close();
const results = await readPromise;
expect(results[0]).toEqual('HELLO');
});
// =============================================
// fromUInt8Array
// =============================================
tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
const data = new Uint8Array([1, 2, 3, 4, 5]);
const stream = WebDuplexStream.fromUInt8Array(data);
const reader = stream.readable.getReader();
const { value } = await reader.read();
expect(value).toBeTruthy();
if (!value) {
throw new Error('Expected fromUInt8Array to produce data');
}
expect(value.length).toEqual(5);
});
// =============================================
// readFunction
// =============================================
tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
const stream = new WebDuplexStream<string, string>({
readFunction: async (tools) => {
await tools.write('chunk1');
await tools.write('chunk2');
tools.done();
},
writeFunction: async (chunk, { push }) => {
push(chunk.toUpperCase());
},
});
const reader = stream.readable.getReader();
const results = await collectAll(reader);
expect(results).toContain('CHUNK1');
expect(results).toContain('CHUNK2');
});
// =============================================
// finalFunction
// =============================================
tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
const stream = new WebDuplexStream<string, string>({
writeFunction: async (chunk) => {
return chunk;
},
finalFunction: async (tools) => {
tools.push('final');
return undefined;
},
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
const readPromise = collectAll(reader);
await writer.write('hello');
await writer.close();
const results = await readPromise;
expect(results).toContain('hello');
expect(results).toContain('final');
});
// =============================================
// No writeFunction = passthrough
// =============================================
tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
const stream = new WebDuplexStream<string, string>({});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();
const readPromise = collectAll(reader);
await writer.write('pass');
await writer.close();
const results = await readPromise;
expect(results[0]).toEqual('pass');
});
export default tap.start();
+3 -3
View File
@@ -1,8 +1,8 @@
/**
* autocreated commitinfo by @pushrocks/commitinfo
* autocreated commitinfo by @push.rocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.13',
description: 'simplifies access to node streams'
version: '3.4.2',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}
+13 -2
View File
@@ -1,6 +1,17 @@
export * from './smartstream.classes.passthrough.js';
import { stream } from './smartstream.plugins.js';
export {
stream,
}
export * from './smartstream.classes.smartduplex.js';
export * from './smartstream.classes.streamwrapper.js';
export * from './smartstream.classes.streamintake.js';
export * from './smartstream.functions.js'
export * from './smartstream.functions.js';
import * as plugins from './smartstream.plugins.js';
export const webstream = plugins.webstream;
import * as nodewebhelpers from './smartstream.nodewebhelpers.js';
export {
nodewebhelpers,
}
-21
View File
@@ -1,21 +0,0 @@
import * as plugins from './smartstream.plugins.js';
export class PassThrough extends plugins.stream.Duplex {
constructor(options?: plugins.stream.DuplexOptions) {
super(options);
}
_read(size: number): void {
// No-op: Data written will be automatically available for reading.
}
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
if (this.push(chunk, encoding)) {
callback();
} else {
this.once('drain', () => {
callback();
});
}
}
}
+332 -136
View File
@@ -3,175 +3,371 @@ import { Duplex, type DuplexOptions } from 'stream';
export interface IStreamTools {
truncate: () => void;
push: (pipeObject: any) => void;
push: (pipeObject: any) => Promise<boolean>;
}
export interface IStreamWriteFunction<T, rT> {
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
(chunkArg: T, toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
}
export interface IStreamFinalFunction<rT> {
(toolsArg: IStreamTools): Promise<rT>;
(toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
}
export interface SmartStreamOptions<TInput, TOutput> extends DuplexOptions {
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
/**
* wether to print debug logs
*/
debug?: boolean;
/**
* the name of the stream
*/
name?: string;
/**
* a function that is being called to read more stuff from whereever to be processed by the stream
* @returns
*/
readFunction?: () => Promise<void>;
/**
* the write function is called for every chunk that is being written to the stream
* it can push or return chunks (but does not have to) to be written to the readable side of the stream
*/
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
/**
* a final function that is run at the end of the stream
*/
finalFunction?: IStreamFinalFunction<TOutput>;
// Add other custom options if necessary
}
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
// STATIC
static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex {
const smartStream = new SmartDuplex(options);
static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions<any, any>): SmartDuplex {
const smartDuplex = new SmartDuplex(options);
process.nextTick(() => {
smartStream.push(buffer);
smartStream.push(null); // Signal the end of the data
smartDuplex.push(buffer);
smartDuplex.push(null); // Signal the end of the data
});
return smartStream;
return smartDuplex;
}
static fromObservable(
observable: plugins.smartrx.rxjs.Observable<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
smartStream.observableSubscription = observable.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
// Pause the observable if the stream buffer is full
smartStream.observableSubscription?.unsubscribe();
smartStream.once('drain', () => {
// Resume the observable when the stream buffer is drained
smartStream.observableSubscription?.unsubscribe();
smartStream.observableSubscription = observable.subscribe((data) => {
smartStream.push(data);
});
});
public static fromWebReadableStream<T = any>(
readableStream: ReadableStream<T>
): SmartDuplex<T, T> {
const smartDuplex = new SmartDuplex<T, T>({
objectMode: true,
});
// Acquire reader ONCE
const reader = readableStream.getReader();
let reading = false;
// Override _read to pull from the web reader
smartDuplex._read = function (_size: number) {
if (reading) return;
reading = true;
reader.read().then(
({ value, done }) => {
reading = false;
if (done) {
smartDuplex.push(null);
} else {
smartDuplex.push(value);
}
},
(err) => {
reading = false;
smartDuplex.destroy(err);
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // Signal the end of the data
},
);
};
// Cancel reader on destroy
smartDuplex.on('close', () => {
reader.cancel().catch(() => {});
});
return smartStream;
}
static fromReplaySubject(
replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
let isBackpressured = false;
// Subscribe to the ReplaySubject
const subscription = replaySubject.subscribe({
next: (data) => {
const canPush = smartStream.push(data);
if (!canPush) {
// If push returns false, pause the subscription because of backpressure
isBackpressured = true;
subscription.unsubscribe();
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // End the stream when the ReplaySubject completes
},
});
// Listen for 'drain' event to resume the subscription if it was paused
smartStream.on('drain', () => {
if (isBackpressured) {
isBackpressured = false;
// Resubscribe to the ReplaySubject since we previously paused
smartStream.observableSubscription = replaySubject.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
smartStream.observableSubscription?.unsubscribe();
isBackpressured = true;
}
},
// No need to repeat error and complete handling here because it's already set up above
});
}
});
return smartStream;
return smartDuplex;
}
// INSTANCE
private readFunction?: () => Promise<void>;
private writeFunction?: IStreamWriteFunction<TInput, TOutput>;
private finalFunction?: IStreamFinalFunction<TOutput>;
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput | null>;
public options: ISmartDuplexOptions<TInput, TOutput>;
private _consumerWantsData = false;
private _readFunctionRunning = false;
constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
super(optionsArg);
this.readFunction = optionsArg?.readFunction;
this.writeFunction = optionsArg?.writeFunction;
this.finalFunction = optionsArg?.finalFunction;
}
public async _read(size: number): Promise<void> {
if (this.readFunction) {
await this.readFunction();
private debugLog(messageArg: string) {
if (this.options.debug) {
console.log(messageArg);
}
}
// Ensure the _write method types the chunk as TInput and encodes TOutput
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
if (!this.writeFunction) {
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
const safeOptions = optionsArg || {} as ISmartDuplexOptions<TInput, TOutput>;
super(
Object.assign(
{
highWaterMark: 1,
},
safeOptions
)
);
this.options = safeOptions;
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput | null>(
this.options.highWaterMark || 1
);
}
/**
* Synchronously drains items from the backpressuredArray into the readable side.
* Stops when push() returns false (consumer is full) or array is empty.
*/
private _drainBackpressuredArray(): void {
while (this.backpressuredArray.data.length > 0) {
const nextChunk = this.backpressuredArray.shift();
if (nextChunk === null) {
// EOF signal — push null to end readable side
this.push(null);
this._consumerWantsData = false;
return;
}
const canPushMore = this.push(nextChunk);
if (!canPushMore) {
this._consumerWantsData = false;
return;
}
}
}
// _read must NOT be async — Node.js ignores the return value
public _read(size: number): void {
this.debugLog(`${this.options.name}: read was called`);
this._consumerWantsData = true;
// Drain any buffered items first
if (this.backpressuredArray.data.length > 0) {
this._drainBackpressuredArray();
}
// If readFunction exists and is not already running, start it
if (this.options.readFunction && !this._readFunctionRunning) {
this._readFunctionRunning = true;
this.options.readFunction().then(
() => { this._readFunctionRunning = false; },
(err) => { this._readFunctionRunning = false; this.destroy(err); }
);
}
}
public async backpressuredPush(pushArg: TOutput) {
const canPushMore = this.backpressuredArray.push(pushArg);
// Try to drain if the consumer wants data
if (this._consumerWantsData) {
this._drainBackpressuredArray();
}
if (!canPushMore) {
this.debugLog(`${this.options.name}: cannot push more`);
await this.backpressuredArray.waitForSpace();
this.debugLog(`${this.options.name}: can push more again`);
}
return canPushMore;
}
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
// _write must NOT be async — Node.js ignores the return value
public _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
if (!this.options.writeFunction) {
return callback(new Error('No stream function provided'));
}
const tools: IStreamTools = {
truncate: () => {
this.push(null);
callback();
},
push: (pushArg: TOutput) => this.push(pushArg),
};
try {
const modifiedChunk = await this.writeFunction(chunk, tools);
if (modifiedChunk) {
if (!this.push(modifiedChunk)) {
// Handle backpressure if necessary
}
}
callback();
} catch (err) {
callback(err);
}
}
public async _final(callback: (error?: Error | null) => void) {
if (this.finalFunction) {
const tools: IStreamTools = {
truncate: () => callback(),
push: (pipeObject) => this.push(pipeObject),
};
try {
const finalChunk = await this.finalFunction(tools);
if (finalChunk) {
this.push(finalChunk);
}
} catch (err) {
let callbackCalled = false;
const safeCallback = (err?: Error | null) => {
if (!callbackCalled) {
callbackCalled = true;
callback(err);
}
} else {
// nothing here
}
this.push(null);
callback();
};
let isTruncated = false;
const tools: IStreamTools = {
truncate: () => {
isTruncated = true;
safeCallback();
this.push(null);
},
push: async (pushArg: TOutput) => {
return await this.backpressuredPush(pushArg);
},
};
const writeDeferred = plugins.smartpromise.defer();
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
this.options.writeFunction(chunk, tools).then(
(modifiedChunk) => {
if (isTruncated) {
writeDeferred.resolve();
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
return;
}
const finish = () => {
safeCallback();
writeDeferred.resolve();
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
};
if (modifiedChunk !== undefined && modifiedChunk !== null) {
this.backpressuredPush(modifiedChunk).then(finish, (err) => {
safeCallback(err);
writeDeferred.resolve();
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
});
} else {
finish();
}
},
(err) => {
safeCallback(err);
writeDeferred.resolve();
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
}
);
}
// _final must NOT be async — Node.js ignores the return value
public _final(callback: (error?: Error | null) => void) {
let callbackCalled = false;
const safeCallback = (err?: Error | null) => {
if (!callbackCalled) {
callbackCalled = true;
callback(err);
}
};
Promise.all(this.asyncWritePromiseObjectmap.getArray()).then(() => {
if (this.options.finalFunction) {
const tools: IStreamTools = {
truncate: () => safeCallback(),
push: async (pipeObject) => {
return await this.backpressuredPush(pipeObject);
},
};
this.options.finalFunction(tools).then(
(finalChunk) => {
const pushNull = () => {
this.backpressuredArray.push(null);
if (this._consumerWantsData) {
this._drainBackpressuredArray();
}
safeCallback();
};
if (finalChunk !== undefined && finalChunk !== null) {
this.backpressuredPush(finalChunk).then(pushNull, (err) => {
safeCallback(err);
});
} else {
pushNull();
}
},
(err) => {
this.backpressuredArray.push(null);
if (this._consumerWantsData) {
this._drainBackpressuredArray();
}
safeCallback(err);
}
);
} else {
this.backpressuredArray.push(null);
if (this._consumerWantsData) {
this._drainBackpressuredArray();
}
safeCallback();
}
}, (err) => {
safeCallback(err);
});
}
public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> {
const duplex = this;
let readableClosed = false;
const readable = new ReadableStream({
start(controller) {
const onReadable = () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
};
const onEnd = () => {
if (!readableClosed) {
readableClosed = true;
controller.close();
}
cleanup();
};
const cleanup = () => {
duplex.removeListener('readable', onReadable);
duplex.removeListener('end', onEnd);
};
duplex.on('readable', onReadable);
duplex.on('end', onEnd);
},
cancel(reason) {
duplex.destroy(new Error(reason));
},
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
let resolved = false;
const onDrain = () => {
if (!resolved) {
resolved = true;
resolve();
}
};
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
if (!resolved) {
resolved = true;
duplex.removeListener('drain', onDrain);
reject(error);
}
} else if (!isBackpressured && !resolved) {
resolved = true;
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', onDrain);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end((err: Error | null) => {
if (err) reject(err);
else resolve();
});
});
},
abort(reason) {
duplex.destroy(new Error(reason));
},
});
return { readable, writable };
}
}
+43 -1
View File
@@ -1,6 +1,48 @@
import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> extends plugins.stream.Readable {
// STATIC
public static async fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): Promise<StreamIntake<U>> {
const intakeStream = new StreamIntake<U>(options);
if (inputStream instanceof plugins.stream.Readable) {
inputStream.on('readable', () => {
let chunk: U;
while (null !== (chunk = inputStream.read() as U)) {
intakeStream.pushData(chunk);
}
});
inputStream.on('end', () => {
intakeStream.signalEnd();
});
inputStream.on('error', (err: Error) => {
intakeStream.destroy(err);
});
} else {
const reader = (inputStream as ReadableStream).getReader();
const readChunk = () => {
reader.read().then(({ done, value }) => {
if (done) {
intakeStream.signalEnd();
} else {
intakeStream.pushData(value);
readChunk();
}
}).catch((err) => {
intakeStream.destroy(err);
});
};
readChunk();
}
return intakeStream;
}
// INSTANCE
private signalEndBoolean = false;
private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();
@@ -14,7 +56,7 @@ export class StreamIntake<T> extends plugins.stream.Readable {
_read(size: number): void {
// console.log('get next');
const pushChunk = (): void => {
if (this.chunkStore.length > 0) {
while (this.chunkStore.length > 0) {
// If push returns false, then we should stop reading
if (!this.push(this.chunkStore.shift())) {
return;
+18 -14
View File
@@ -1,8 +1,5 @@
import * as plugins from './smartstream.plugins.js';
// interfaces
import { Transform } from 'stream';
export interface IErrorFunction {
(err: Error): any;
}
@@ -62,7 +59,7 @@ export class StreamWrapper {
}
// combine the stream
let finalStream = null;
let finalStream: plugins.stream.Duplex | null = null;
let firstIteration: boolean = true;
for (const stream of streamExecutionArray) {
if (firstIteration === true) {
@@ -74,23 +71,30 @@ export class StreamWrapper {
for (const customEventObject of this.customEventObjectArray) {
stream.on(customEventObject.eventName, customEventObject.eventFunction);
}
if (!firstIteration) {
if (!firstIteration && finalStream) {
finalStream = finalStream.pipe(stream);
}
firstIteration = false;
}
if (!finalStream) {
done.resolve();
return done.promise;
}
this.streamStartedDeferred.resolve();
finalStream.on('end', () => {
done.resolve();
});
finalStream.on('close', () => {
done.resolve();
});
finalStream.on('finish', () => {
done.resolve();
});
let resolved = false;
const safeResolve = () => {
if (!resolved) {
resolved = true;
done.resolve();
}
};
finalStream.on('end', safeResolve);
finalStream.on('close', safeResolve);
finalStream.on('finish', safeResolve);
return done.promise;
}
}
+19 -15
View File
@@ -1,4 +1,5 @@
import { Transform, type TransformCallback, type TransformOptions } from 'stream';
import { type DuplexOptions } from 'stream';
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
export interface AsyncTransformFunction<TInput, TOutput> {
(chunkArg: TInput): Promise<TOutput>;
@@ -6,21 +7,24 @@ export interface AsyncTransformFunction<TInput, TOutput> {
export function createTransformFunction<TInput, TOutput>(
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
options?: TransformOptions
): Transform {
const transformStream = new Transform({
options?: DuplexOptions
): SmartDuplex {
const smartDuplexStream = new SmartDuplex({
...options,
objectMode: true, // Ensure we operate in object mode
async transform(chunk: TInput, encoding: string, callback: TransformCallback) {
try {
const transformed = await asyncFunction(chunk);
this.push(transformed);
callback();
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
writeFunction: async (chunkArg, toolsArg) => {
const result = await asyncFunction(chunkArg);
return result;
}
});
return transformStream;
}
return smartDuplexStream;
}
export const createPassThrough = () => {
return new SmartDuplex({
objectMode: true,
writeFunction: async (chunkArg, toolsArg) => {
return chunkArg;
}
})
}
+219
View File
@@ -0,0 +1,219 @@
import * as plugins from './smartstream.plugins.js';
/**
* Creates a Web ReadableStream from a file using pull-based backpressure.
*
* @param filePath - The path to the file to be read
* @returns A Web ReadableStream that reads the file in chunks
*/
export function createWebReadableStreamFromFile(filePath: string): ReadableStream<Uint8Array> {
const fileStream = plugins.fs.createReadStream(filePath);
return new ReadableStream({
start(controller) {
fileStream.on('error', (err) => {
controller.error(err);
});
fileStream.on('end', () => {
controller.close();
});
// Pause immediately — pull() will drive reads
fileStream.pause();
},
pull(controller) {
return new Promise<void>((resolve, reject) => {
const chunk = fileStream.read();
if (chunk !== null) {
controller.enqueue(chunk as Uint8Array);
resolve();
return;
}
// No data available yet — wait for 'readable' or 'end'
const onReadable = () => {
cleanup();
const data = fileStream.read();
if (data !== null) {
controller.enqueue(data as Uint8Array);
}
resolve();
};
const onEnd = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const cleanup = () => {
fileStream.removeListener('readable', onReadable);
fileStream.removeListener('end', onEnd);
fileStream.removeListener('error', onError);
};
fileStream.once('readable', onReadable);
fileStream.once('end', onEnd);
fileStream.once('error', onError);
});
},
cancel() {
fileStream.destroy();
}
});
}
/**
* Converts a Web ReadableStream to a Node.js Readable stream.
*
* @param webStream - The Web ReadableStream to convert
* @returns A Node.js Readable stream that reads data from the Web ReadableStream
*/
export function convertWebReadableToNodeReadable(webStream: ReadableStream<Uint8Array>): plugins.stream.Readable {
const reader = webStream.getReader();
return new plugins.stream.Readable({
read() {
reader.read().then(
({ value, done }) => {
if (done) {
this.push(null);
} else {
this.push(Buffer.from(value));
}
},
(err) => {
this.destroy(err);
}
);
}
});
}
/**
* Converts a Node.js Readable stream to a Web ReadableStream using pull-based backpressure.
*
* @param nodeStream - The Node.js Readable stream to convert
* @returns A Web ReadableStream that reads data from the Node.js Readable stream
*/
export function convertNodeReadableToWebReadable(nodeStream: plugins.stream.Readable): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
nodeStream.on('error', (err) => {
controller.error(err);
});
nodeStream.on('end', () => {
controller.close();
});
// Pause immediately — pull() will drive reads
nodeStream.pause();
},
pull(controller) {
return new Promise<void>((resolve, reject) => {
const chunk = nodeStream.read();
if (chunk !== null) {
controller.enqueue(new Uint8Array(chunk));
resolve();
return;
}
// No data available yet — wait for 'readable' or 'end'
const onReadable = () => {
cleanup();
const data = nodeStream.read();
if (data !== null) {
controller.enqueue(new Uint8Array(data));
}
resolve();
};
const onEnd = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const cleanup = () => {
nodeStream.removeListener('readable', onReadable);
nodeStream.removeListener('end', onEnd);
nodeStream.removeListener('error', onError);
};
nodeStream.once('readable', onReadable);
nodeStream.once('end', onEnd);
nodeStream.once('error', onError);
});
},
cancel() {
nodeStream.destroy();
}
});
}
/**
* Converts a Web WritableStream to a Node.js Writable stream.
*
* @param webWritable - The Web WritableStream to convert
* @returns A Node.js Writable stream that writes data to the Web WritableStream
*/
export function convertWebWritableToNodeWritable(webWritable: WritableStream<Uint8Array>): plugins.stream.Writable {
const writer = webWritable.getWriter();
return new plugins.stream.Writable({
write(chunk, encoding, callback) {
writer.write(new Uint8Array(chunk)).then(
() => callback(),
(err) => callback(err)
);
},
final(callback) {
writer.close().then(() => callback()).catch(callback);
},
destroy(err, callback) {
if (err) {
writer.abort(err).then(() => callback(err)).catch(() => callback(err));
} else {
// Clean destroy — just release the lock
writer.releaseLock();
callback(null);
}
}
});
}
/**
* Converts a Node.js Writable stream to a Web WritableStream.
*
* @param nodeWritable - The Node.js Writable stream to convert
* @returns A Web WritableStream that writes data to the Node.js Writable stream
*/
export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Writable): WritableStream<Uint8Array> {
return new WritableStream({
write(chunk) {
return new Promise((resolve, reject) => {
nodeWritable.write(Buffer.from(chunk), (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
},
close() {
return new Promise((resolve, reject) => {
nodeWritable.end((err: Error | null) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
},
abort(reason) {
nodeWritable.destroy(reason instanceof Error ? reason : new Error(String(reason)));
}
});
}
+5 -2
View File
@@ -1,11 +1,14 @@
// node native
import * as fs from 'fs';
import * as stream from 'stream';
export { stream };
export { fs, stream };
// pushrocks scope
import * as lik from '@push.rocks/lik';
import * as smartpromise from '@push.rocks/smartpromise';
import * as smartrx from '@push.rocks/smartrx';
import * as webstream from '../dist_ts_web/index.js';
export { smartpromise, smartrx };
export { lik, smartpromise, smartrx, webstream };
+3
View File
@@ -0,0 +1,3 @@
{
"order": 1
}
+8
View File
@@ -0,0 +1,8 @@
/**
* autocreated commitinfo by @push.rocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.4.2',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}
+145
View File
@@ -0,0 +1,145 @@
import * as plugins from './plugins.js';
// ========================================
// Interfaces for Read functionality
// ========================================
export interface IStreamToolsRead<TInput, TOutput> {
done: () => void;
write: (writeArg: TInput) => Promise<void>;
}
/**
* The read function is called when data needs to be read into the stream.
*/
export interface IStreamReadFunction<TInput, TOutput> {
(toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}
// ========================================
// Interfaces for Write functionality
// ========================================
export interface IStreamToolsWrite<TInput, TOutput> {
truncate: () => void;
push: (pushArg: TOutput) => void;
}
/**
* The write function is called whenever a chunk is written to the stream.
*/
export interface IStreamWriteFunction<TInput, TOutput> {
(chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}
export interface IStreamFinalFunction<TInput, TOutput> {
(toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput | void>;
}
export interface WebDuplexStreamOptions<TInput, TOutput> {
readFunction?: IStreamReadFunction<TInput, TOutput>;
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
// INSTANCE
options: WebDuplexStreamOptions<TInput, TOutput>;
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
super({
async start(controller) {
// Optionally initialize any state here
},
async transform(chunk, controller) {
if (optionsArg?.writeFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg: TOutput) => controller.enqueue(pushArg),
};
try {
const writeReturnChunk = await optionsArg.writeFunction(chunk, tools);
if (writeReturnChunk !== undefined && writeReturnChunk !== null) {
controller.enqueue(writeReturnChunk);
}
} catch (err) {
controller.error(err);
}
} else {
// If no writeFunction is provided, pass the chunk through
controller.enqueue(chunk as unknown as TOutput);
}
},
async flush(controller) {
if (optionsArg?.finalFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg) => controller.enqueue(pushArg),
};
try {
const finalChunk = await optionsArg.finalFunction(tools);
if (finalChunk !== undefined && finalChunk !== null) {
controller.enqueue(finalChunk as TOutput);
}
} catch (err) {
controller.error(err);
}
}
// TransformStream auto-closes readable after flush resolves — no terminate() needed
},
});
this.options = optionsArg;
// Start producing data if readFunction is provided
if (this.options.readFunction) {
this._startReading().catch((err) => {
// Prevent unhandled rejection — the error is propagated through the writable side
});
}
}
private async _startReading() {
const writable = this.writable;
const writer = writable.getWriter();
let doneSignaled = false;
const tools: IStreamToolsRead<TInput, TOutput> = {
done: () => {
doneSignaled = true;
},
write: async (writeArg) => await writer.write(writeArg),
};
try {
const readFunction = this.options.readFunction;
if (readFunction) {
await readFunction(tools);
}
if (doneSignaled) {
await writer.close();
}
} catch (err) {
try {
await writer.abort(err);
} catch (_) {
// Writer may already be in error state
}
}
}
// Static method example (adjust as needed)
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
writeFunction: async (chunk, { push }) => {
push(chunk); // Directly push the chunk as is
return null;
},
});
const writer = stream.writable.getWriter();
writer.write(uint8Array).then(() => writer.close()).catch(() => {});
return stream;
}
}
+2
View File
@@ -0,0 +1,2 @@
import './plugins.js';
export * from './classes.webduplexstream.js';
+15
View File
@@ -0,0 +1,15 @@
// @push.rocks scope
import * as smartenv from '@push.rocks/smartenv';
export {
smartenv,
}
// lets setup dependencies
const smartenvInstance = new smartenv.Smartenv();
await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
globalThis.ReadableStream = moduleArg.ReadableStream as any;
globalThis.WritableStream = moduleArg.WritableStream as any;
globalThis.TransformStream = moduleArg.TransformStream as any;
})
+3
View File
@@ -0,0 +1,3 @@
{
"order": 0
}
+3 -1
View File
@@ -5,8 +5,10 @@
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"noImplicitAny": true,
"esModuleInterop": true,
"verbatimModuleSyntax": true
"verbatimModuleSyntax": true,
"types": ["node"]
},
"exclude": [
"dist_*/**/*.d.ts"