Compare commits
161 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1262c48fe9 | |||
| 9b9b1be62b | |||
| 3d13cb76f6 | |||
| 9e3fd28c4a | |||
| 673f5c86fb | |||
| a225188e24 | |||
| 4fc82d0dc6 | |||
| 3d58a01b29 | |||
| f7e9636bf6 | |||
| f211cc8ddd | |||
| 60c8824f33 | |||
| 40e8e06ff1 | |||
| 30f2facd59 | |||
| ddb7d4af03 | |||
| 22d93b4c07 | |||
| e138bca39d | |||
| 6a2ef1b152 | |||
| 7b1d2199e9 | |||
| 04c22f73df | |||
| c8dc791c83 | |||
| 9c30e5bab1 | |||
| 5f2c5f9380 | |||
| f9b8bf33b0 | |||
| a55b2548d7 | |||
| c8465b82be | |||
| b593e3a32c | |||
| a490f521ab | |||
| 59027782dc | |||
| 8c7dd7970c | |||
| 22d18dc21f | |||
| 1cb6f727af | |||
| 824c44d165 | |||
| 3e062103f8 | |||
| 6451e93c12 | |||
| 70cf93595c | |||
| 17e03e9790 | |||
| e52ce7af61 | |||
| f548f4b6cb | |||
| 23a7a77a73 | |||
| 13d2fc78b8 | |||
| 898cc0407d | |||
| 8a3f43a11a | |||
| da2191bb96 | |||
| f13db1e422 | |||
| 42a90e804a | |||
| 413e2af717 | |||
| 267a76af13 | |||
| 7834b7e6d2 | |||
| ae643708e7 | |||
| d9d96b8bb7 | |||
| a961eea431 | |||
| edb58ade28 | |||
| 753a481765 | |||
| bbbd1b73b9 | |||
| 271d0be106 | |||
| 0ceeacd5a0 | |||
| 287695e445 | |||
| 60f9e541a5 | |||
| 96ea67e135 | |||
| ba0a2023ad | |||
| a09c359847 | |||
| e2b4d772b3 | |||
| 0f46b62b2d | |||
| 9bf37469c6 | |||
| 12bb125bdc | |||
| 703dc11c6c | |||
| 28725d1723 | |||
| c77e0f2ba6 | |||
| 196fb6d396 | |||
| df0ddf04b3 | |||
| 2e1aa4a8ff | |||
| bc09033af0 | |||
| 22df9dfd94 | |||
| d48ef6eb43 | |||
| 9421c652a2 | |||
| a6ab15bf1d | |||
| 00d1455367 | |||
| 116a281c6c | |||
| 9bf6f251c4 | |||
| e3427c2498 | |||
| a400a0a04c | |||
| 91392e8bd5 | |||
| d161d6613a | |||
| 7a14e67f4f | |||
| 465ccfec40 | |||
| 3adb16d1f8 | |||
| a9230ca790 | |||
| 788f2665c2 | |||
| 7b678cc856 | |||
| 12c9d8cc9d | |||
| 3a2dc1c37e | |||
| 1f67bc0e1e | |||
| b15ddd987c | |||
| cc43080513 | |||
| 49d235411f | |||
| d238662bea | |||
| 8efb2b1093 | |||
| 4926f57d83 | |||
| 86552f2b1b | |||
| 353a8ecde6 | |||
| 3e03b81a43 | |||
| 5e4ec5b837 | |||
| 62796f7151 | |||
| 2c1d9f05ce | |||
| 34cbf28972 | |||
| 1b6e38c040 | |||
| b135e6023a | |||
| 91d01f3689 | |||
| e8e067ea77 | |||
| 2cb490cd2a | |||
| 98397bb85e | |||
| f52b0de21f | |||
| 1c0e5f264d | |||
| 8a3c653213 | |||
| 456ce78917 | |||
| 5277083097 | |||
| 8618ac55ef | |||
| ea66d1b2fb | |||
| c37f62abec | |||
| 2c904cc1ec | |||
| d1561ad1b7 | |||
| 0ae3fee987 | |||
| 047c2bd402 | |||
| 9ed3de718f | |||
| 14530f393c | |||
| 15a226d30d | |||
| 16c5c89662 | |||
| 851a96c014 | |||
| 4ea42cb9fb | |||
| 41eed6423d | |||
| 0e067004a4 | |||
| 9fe222b500 | |||
| 05e9067a34 | |||
| 2aff46eb0e | |||
| 6aa4b86598 | |||
| af30268551 | |||
| e562e8f099 | |||
| 01f4a53b5b | |||
| f42b77986f | |||
| b7ef295757 | |||
| 2818420ee9 | |||
| d759e2a562 | |||
| 65a97c9ee0 | |||
| b036bfcb92 | |||
| c0304d1d10 | |||
| 26880f9b71 | |||
| aa7c30a096 | |||
| 083673f02c | |||
| 93bcc03e72 | |||
| f8b79710be | |||
| addf20995a | |||
| 5401e5e008 | |||
| 144295f7c0 | |||
| 1e0557b722 | |||
| 9b67507acd | |||
| c7b3f2a228 | |||
| 084a47096d | |||
| 224b39f0a6 | |||
| b24b564495 | |||
| aabfb2721d | |||
| c28e2cb8e0 |
66
.gitea/workflows/default_nottags.yaml
Normal file
66
.gitea/workflows/default_nottags.yaml
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
name: Default (not tags)
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
tags-ignore:
|
||||||
|
- '**'
|
||||||
|
|
||||||
|
env:
|
||||||
|
IMAGE: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||||
|
NPMCI_COMPUTED_REPOURL: https://${{gitea.repository_owner}}:${{secrets.GITEA_TOKEN}}@gitea.lossless.digital/${{gitea.repository}}.git
|
||||||
|
NPMCI_TOKEN_NPM: ${{secrets.NPMCI_TOKEN_NPM}}
|
||||||
|
NPMCI_TOKEN_NPM2: ${{secrets.NPMCI_TOKEN_NPM2}}
|
||||||
|
NPMCI_GIT_GITHUBTOKEN: ${{secrets.NPMCI_GIT_GITHUBTOKEN}}
|
||||||
|
NPMCI_URL_CLOUDLY: ${{secrets.NPMCI_URL_CLOUDLY}}
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
security:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
continue-on-error: true
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Install pnpm and npmci
|
||||||
|
run: |
|
||||||
|
pnpm install -g pnpm
|
||||||
|
pnpm install -g @shipzone/npmci
|
||||||
|
|
||||||
|
- name: Run npm prepare
|
||||||
|
run: npmci npm prepare
|
||||||
|
|
||||||
|
- name: Audit production dependencies
|
||||||
|
run: |
|
||||||
|
npmci command npm config set registry https://registry.npmjs.org
|
||||||
|
npmci command pnpm audit --audit-level=high --prod
|
||||||
|
continue-on-error: true
|
||||||
|
|
||||||
|
- name: Audit development dependencies
|
||||||
|
run: |
|
||||||
|
npmci command npm config set registry https://registry.npmjs.org
|
||||||
|
npmci command pnpm audit --audit-level=high --dev
|
||||||
|
continue-on-error: true
|
||||||
|
|
||||||
|
test:
|
||||||
|
if: ${{ always() }}
|
||||||
|
needs: security
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Test stable
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm install
|
||||||
|
npmci npm test
|
||||||
|
|
||||||
|
- name: Test build
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm install
|
||||||
|
npmci npm build
|
||||||
124
.gitea/workflows/default_tags.yaml
Normal file
124
.gitea/workflows/default_tags.yaml
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
name: Default (tags)
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- '*'
|
||||||
|
|
||||||
|
env:
|
||||||
|
IMAGE: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||||
|
NPMCI_COMPUTED_REPOURL: https://${{gitea.repository_owner}}:${{secrets.GITEA_TOKEN}}@gitea.lossless.digital/${{gitea.repository}}.git
|
||||||
|
NPMCI_TOKEN_NPM: ${{secrets.NPMCI_TOKEN_NPM}}
|
||||||
|
NPMCI_TOKEN_NPM2: ${{secrets.NPMCI_TOKEN_NPM2}}
|
||||||
|
NPMCI_GIT_GITHUBTOKEN: ${{secrets.NPMCI_GIT_GITHUBTOKEN}}
|
||||||
|
NPMCI_URL_CLOUDLY: ${{secrets.NPMCI_URL_CLOUDLY}}
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
security:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
continue-on-error: true
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Prepare
|
||||||
|
run: |
|
||||||
|
pnpm install -g pnpm
|
||||||
|
pnpm install -g @shipzone/npmci
|
||||||
|
npmci npm prepare
|
||||||
|
|
||||||
|
- name: Audit production dependencies
|
||||||
|
run: |
|
||||||
|
npmci command npm config set registry https://registry.npmjs.org
|
||||||
|
npmci command pnpm audit --audit-level=high --prod
|
||||||
|
continue-on-error: true
|
||||||
|
|
||||||
|
- name: Audit development dependencies
|
||||||
|
run: |
|
||||||
|
npmci command npm config set registry https://registry.npmjs.org
|
||||||
|
npmci command pnpm audit --audit-level=high --dev
|
||||||
|
continue-on-error: true
|
||||||
|
|
||||||
|
test:
|
||||||
|
if: ${{ always() }}
|
||||||
|
needs: security
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Prepare
|
||||||
|
run: |
|
||||||
|
pnpm install -g pnpm
|
||||||
|
pnpm install -g @shipzone/npmci
|
||||||
|
npmci npm prepare
|
||||||
|
|
||||||
|
- name: Test stable
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm install
|
||||||
|
npmci npm test
|
||||||
|
|
||||||
|
- name: Test build
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm install
|
||||||
|
npmci npm build
|
||||||
|
|
||||||
|
release:
|
||||||
|
needs: test
|
||||||
|
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Prepare
|
||||||
|
run: |
|
||||||
|
pnpm install -g pnpm
|
||||||
|
pnpm install -g @shipzone/npmci
|
||||||
|
npmci npm prepare
|
||||||
|
|
||||||
|
- name: Release
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm publish
|
||||||
|
|
||||||
|
metadata:
|
||||||
|
needs: test
|
||||||
|
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/')
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: ${{ env.IMAGE }}
|
||||||
|
continue-on-error: true
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Prepare
|
||||||
|
run: |
|
||||||
|
pnpm install -g pnpm
|
||||||
|
pnpm install -g @shipzone/npmci
|
||||||
|
npmci npm prepare
|
||||||
|
|
||||||
|
- name: Code quality
|
||||||
|
run: |
|
||||||
|
npmci command npm install -g typescript
|
||||||
|
npmci npm install
|
||||||
|
|
||||||
|
- name: Trigger
|
||||||
|
run: npmci trigger
|
||||||
|
|
||||||
|
- name: Build docs and upload artifacts
|
||||||
|
run: |
|
||||||
|
npmci node install stable
|
||||||
|
npmci npm install
|
||||||
|
pnpm install -g @git.zone/tsdoc
|
||||||
|
npmci command tsdoc
|
||||||
|
continue-on-error: true
|
||||||
20
.gitignore
vendored
20
.gitignore
vendored
@@ -1,4 +1,20 @@
|
|||||||
node_modules/
|
.nogit/
|
||||||
|
|
||||||
|
# artifacts
|
||||||
|
coverage/
|
||||||
public/
|
public/
|
||||||
pages/
|
pages/
|
||||||
coverage/
|
|
||||||
|
# installs
|
||||||
|
node_modules/
|
||||||
|
|
||||||
|
# caches
|
||||||
|
.yarn/
|
||||||
|
.cache/
|
||||||
|
.rpt2_cache
|
||||||
|
|
||||||
|
# builds
|
||||||
|
dist/
|
||||||
|
dist_*/
|
||||||
|
|
||||||
|
# custom
|
||||||
@@ -1,42 +0,0 @@
|
|||||||
image: hosttoday/ht-docker-node:npmts
|
|
||||||
stages:
|
|
||||||
- test
|
|
||||||
- release
|
|
||||||
- page
|
|
||||||
testLEGACY:
|
|
||||||
stage: test
|
|
||||||
script:
|
|
||||||
- npmci test legacy
|
|
||||||
tags:
|
|
||||||
- docker
|
|
||||||
testLTS:
|
|
||||||
stage: test
|
|
||||||
script:
|
|
||||||
- npmci test lts
|
|
||||||
tags:
|
|
||||||
- docker
|
|
||||||
testSTABLE:
|
|
||||||
stage: test
|
|
||||||
script:
|
|
||||||
- npmci test stable
|
|
||||||
tags:
|
|
||||||
- docker
|
|
||||||
release:
|
|
||||||
stage: release
|
|
||||||
script:
|
|
||||||
- npmci publish
|
|
||||||
only:
|
|
||||||
- tags
|
|
||||||
tags:
|
|
||||||
- docker
|
|
||||||
pages:
|
|
||||||
image: hosttoday/ht-docker-node:npmpage
|
|
||||||
stage: page
|
|
||||||
script:
|
|
||||||
- npmci command npmpage --host gitlab
|
|
||||||
only:
|
|
||||||
- tags
|
|
||||||
artifacts:
|
|
||||||
expire_in: 1 week
|
|
||||||
paths:
|
|
||||||
- public
|
|
||||||
11
.vscode/launch.json
vendored
Normal file
11
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"command": "npm test",
|
||||||
|
"name": "Run npm test",
|
||||||
|
"request": "launch",
|
||||||
|
"type": "node-terminal"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
26
.vscode/settings.json
vendored
Normal file
26
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
{
|
||||||
|
"json.schemas": [
|
||||||
|
{
|
||||||
|
"fileMatch": ["/npmextra.json"],
|
||||||
|
"schema": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"npmci": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "settings for npmci"
|
||||||
|
},
|
||||||
|
"gitzone": {
|
||||||
|
"type": "object",
|
||||||
|
"description": "settings for gitzone",
|
||||||
|
"properties": {
|
||||||
|
"projectType": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": ["website", "element", "service", "npm", "wcc"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -1,9 +0,0 @@
|
|||||||
# smartstream
|
|
||||||
simplifies access to node streams, TypeScript ready!
|
|
||||||
|
|
||||||
## Status
|
|
||||||
[](https://gitlab.com/pushrocks/smartstream/commits/master)
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
We recommend the use of TypeScript for best in class intellisense support.
|
|
||||||
|
|
||||||
104
changelog.md
Normal file
104
changelog.md
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-02-28 - 3.3.0 - feat(smartstream)
|
||||||
|
bump dependencies, update build/publish config, refactor tests, and overhaul documentation
|
||||||
|
|
||||||
|
- Upgrade devDependencies (e.g. @git.zone/tsbuild -> ^4.1.2, @git.zone/tsrun -> ^2.0.1, @git.zone/tstest -> ^3.1.8, @types/node -> ^25.3.2) and runtime deps (e.g. @push.rocks/lik -> ^6.2.2, @push.rocks/smartenv -> ^6.0.0, @push.rocks/smartpromise -> ^4.2.3, @push.rocks/smartrx -> ^3.0.10).
|
||||||
|
- Refactor tests to use Node's native fs streams instead of @push.rocks/smartfile.fsStream and export default tap.start() to support ESM test runner patterns.
|
||||||
|
- Adjust build/publish: remove --web flag from build script, add pnpm override for agentkeepalive, add tspublish.json files for publish order, and add release registries/access in npmextra.json (verdaccio + npm).
|
||||||
|
- Rework project metadata in npmextra.json (namespaced @git.zone keys, tsdoc entry changes) and minor TypeScript/web fix: cast stream/web constructors to any in ts_web/plugins.ts.
|
||||||
|
- Large README rewrite: improved installation (pnpm), clearer Node vs Web entrypoints, expanded examples, and updated legal/license wording.
|
||||||
|
|
||||||
|
## 2024-11-19 - 3.2.5 - fix(nodewebhelpers)
|
||||||
|
Fix import and use correct module structure for Node.js streams in smartstream.nodewebhelpers.ts
|
||||||
|
|
||||||
|
- Corrected the import statement for the fs module.
|
||||||
|
- Updated the use of the fs.createReadStream method.
|
||||||
|
|
||||||
|
## 2024-10-16 - 3.2.4 - fix(SmartDuplex)
|
||||||
|
Fix stream termination when reading from a web readable stream
|
||||||
|
|
||||||
|
- Resolved an issue in SmartDuplex where the stream did not properly terminate after reaching the end of a web readable stream.
|
||||||
|
|
||||||
|
## 2024-10-16 - 3.2.3 - fix(smartduplex)
|
||||||
|
Enhance documentation for read function in SmartDuplex
|
||||||
|
|
||||||
|
- Added inline comments to clarify the behavior and importance of unlocking the reader in the readFunction of SmartDuplex.fromWebReadableStream.
|
||||||
|
|
||||||
|
## 2024-10-16 - 3.2.2 - fix(SmartDuplex)
|
||||||
|
Fix issue with SmartDuplex fromWebReadableStream method
|
||||||
|
|
||||||
|
- Resolved a potential unhandled promise rejection in fromWebReadableStream method
|
||||||
|
- Ensured proper release of stream reader lock in case of read completion
|
||||||
|
|
||||||
|
## 2024-10-16 - 3.2.1 - fix(core)
|
||||||
|
Fix the order of operations in SmartDuplex _read method to ensure proper waiting for items.
|
||||||
|
|
||||||
|
- Adjusted the order of reading function execution and waiting for items in the SmartDuplex _read method.
|
||||||
|
- Fixed potential issues with stream data processing timing.
|
||||||
|
|
||||||
|
## 2024-10-16 - 3.2.0 - feat(SmartDuplex)
|
||||||
|
Added method to create SmartDuplex from a WebReadableStream.
|
||||||
|
|
||||||
|
- Implemented a static method in SmartDuplex to allow creating an instance from a WebReadableStream.
|
||||||
|
- This addition enhances the capability of SmartDuplex to integrate with web streams, facilitating seamless stream manipulation across environments.
|
||||||
|
|
||||||
|
## 2024-10-14 - 3.1.2 - fix(WebDuplexStream)
|
||||||
|
Fix variable naming inconsistency in WebDuplexStream test
|
||||||
|
|
||||||
|
- Changed variable names from 'transformStream' to 'webDuplexStream' for consistency.
|
||||||
|
- Renamed 'writableStream' and 'readableStream' to 'writer' and 'reader' respectively.
|
||||||
|
|
||||||
|
## 2024-10-13 - 3.1.1 - fix(WebDuplexStream)
|
||||||
|
Improved read/write interface and error handling in WebDuplexStream
|
||||||
|
|
||||||
|
- Enhanced the IStreamToolsRead and IStreamToolsWrite interfaces for better Promise handling
|
||||||
|
- Refined readFunction and writeFunction handling to accommodate asynchronous data processing and error propagation
|
||||||
|
- Added internal _startReading method to facilitate initial data handling if readFunction is present
|
||||||
|
- Maintained backward compatibility while ensuring data continuity when no writeFunction is specified
|
||||||
|
|
||||||
|
## 2024-10-13 - 3.1.0 - feat(core)
|
||||||
|
Add support for creating Web ReadableStream from a file
|
||||||
|
|
||||||
|
- Introduced a new helper function `createWebReadableStreamFromFile` that allows for creating a Web ReadableStream from a file path.
|
||||||
|
- Updated exports in `ts/index.ts` to include `nodewebhelpers` which provides the new web stream feature.
|
||||||
|
|
||||||
|
## 2024-10-13 - 3.0.46 - fix(WebDuplexStream)
|
||||||
|
Fix errors in WebDuplexStream transformation and test logic
|
||||||
|
|
||||||
|
- Corrected async handling in WebDuplexStream write function
|
||||||
|
- Fixed `WebDuplexStream` tests to properly handle asynchronous reading and writing
|
||||||
|
|
||||||
|
## 2024-10-13 - 3.0.45 - fix(ts)
|
||||||
|
Fixed formatting issues in SmartDuplex class
|
||||||
|
|
||||||
|
- Resolved inconsistent spacing in SmartDuplex class methods and constructor.
|
||||||
|
- Ensured consistent formatting in the getWebStreams method.
|
||||||
|
|
||||||
|
## 2024-06-02 - 3.0.39 - smartduplex
|
||||||
|
Add .getWebStreams method
|
||||||
|
|
||||||
|
- Introduced a new `.getWebStreams` method in the smartduplex module, providing compatibility with the web streams API.
|
||||||
|
|
||||||
|
## 2024-03-16 - 3.0.34 - configuration
|
||||||
|
Update project configuration files
|
||||||
|
|
||||||
|
- Updated `tsconfig` for optimization.
|
||||||
|
- Modified `npmextra.json` to set the `githost` attribute.
|
||||||
|
|
||||||
|
## 2023-11-03 - 3.0.0 to 3.0.8 - core
|
||||||
|
Transition to major version 3.x
|
||||||
|
|
||||||
|
- Implemented breaking changes in the core system for better performance and feature set.
|
||||||
|
- Continuous core updates to improve stability and performance across minor version increments.
|
||||||
|
|
||||||
|
## 2023-11-02 - 2.0.4 to 2.0.8 - core
|
||||||
|
Core updates and a major fix
|
||||||
|
|
||||||
|
- Implemented core updates addressing minor bugs and enhancements.
|
||||||
|
- A significant breaking change update transitioning from 2.0.x to 3.0.0.
|
||||||
|
|
||||||
|
## 2022-03-31 - 2.0.0 - core
|
||||||
|
Major esm transition
|
||||||
|
|
||||||
|
- Implemented a breaking change by switching the core to ESM (ECMAScript Module) format for modernized module handling.
|
||||||
10
dist/index.d.ts
vendored
10
dist/index.d.ts
vendored
@@ -1,10 +0,0 @@
|
|||||||
export interface IErrorFunction {
|
|
||||||
(err: any): number;
|
|
||||||
}
|
|
||||||
export declare class Smartstream {
|
|
||||||
streamArray: any[];
|
|
||||||
errorFunction: IErrorFunction;
|
|
||||||
constructor(streamArrayArg: any[]);
|
|
||||||
onError(errorFunctionArg: IErrorFunction): void;
|
|
||||||
run(): any;
|
|
||||||
}
|
|
||||||
21
dist/index.js
vendored
21
dist/index.js
vendored
@@ -1,21 +0,0 @@
|
|||||||
"use strict";
|
|
||||||
const plugins = require("./smartstream.plugins");
|
|
||||||
class Smartstream {
|
|
||||||
constructor(streamArrayArg) {
|
|
||||||
this.streamArray = [];
|
|
||||||
this.errorFunction = null;
|
|
||||||
this.streamArray = streamArrayArg;
|
|
||||||
}
|
|
||||||
onError(errorFunctionArg) {
|
|
||||||
this.errorFunction = errorFunctionArg;
|
|
||||||
}
|
|
||||||
run() {
|
|
||||||
let combinedStream = plugins.streamCombiner2.obj(this.streamArray);
|
|
||||||
if (this.errorFunction !== null) {
|
|
||||||
combinedStream.on('error', this.errorFunction);
|
|
||||||
}
|
|
||||||
return combinedStream;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
exports.Smartstream = Smartstream;
|
|
||||||
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiaW5kZXguanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi90cy9pbmRleC50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiO0FBQUEsaURBQWdEO0FBTWhEO0lBR0ksWUFBWSxjQUFxQjtRQUZqQyxnQkFBVyxHQUFHLEVBQUUsQ0FBQTtRQUNoQixrQkFBYSxHQUFtQixJQUFJLENBQUE7UUFFaEMsSUFBSSxDQUFDLFdBQVcsR0FBRyxjQUFjLENBQUE7SUFDckMsQ0FBQztJQUNELE9BQU8sQ0FBQyxnQkFBZ0M7UUFDcEMsSUFBSSxDQUFDLGFBQWEsR0FBRyxnQkFBZ0IsQ0FBQTtJQUN6QyxDQUFDO0lBQ0QsR0FBRztRQUNDLElBQUksY0FBYyxHQUFHLE9BQU8sQ0FBQyxlQUFlLENBQUMsR0FBRyxDQUFDLElBQUksQ0FBQyxXQUFXLENBQUMsQ0FBQTtRQUNsRSxFQUFFLENBQUMsQ0FBQyxJQUFJLENBQUMsYUFBYSxLQUFLLElBQUksQ0FBQyxDQUFDLENBQUM7WUFDOUIsY0FBYyxDQUFDLEVBQUUsQ0FBQyxPQUFPLEVBQUUsSUFBSSxDQUFDLGFBQWEsQ0FBQyxDQUFBO1FBQ2xELENBQUM7UUFDRCxNQUFNLENBQUMsY0FBYyxDQUFBO0lBQ3pCLENBQUM7Q0FDSjtBQWhCRCxrQ0FnQkMifQ==
|
|
||||||
3
dist/smartstream.plugins.d.ts
vendored
3
dist/smartstream.plugins.d.ts
vendored
@@ -1,3 +0,0 @@
|
|||||||
import 'typings-global';
|
|
||||||
export import q = require('q');
|
|
||||||
export declare let streamCombiner2: any;
|
|
||||||
5
dist/smartstream.plugins.js
vendored
5
dist/smartstream.plugins.js
vendored
@@ -1,5 +0,0 @@
|
|||||||
"use strict";
|
|
||||||
require("typings-global");
|
|
||||||
exports.q = require("q");
|
|
||||||
exports.streamCombiner2 = require('stream-combiner2');
|
|
||||||
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic21hcnRzdHJlYW0ucGx1Z2lucy5qcyIsInNvdXJjZVJvb3QiOiIiLCJzb3VyY2VzIjpbIi4uL3RzL3NtYXJ0c3RyZWFtLnBsdWdpbnMudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6IjtBQUFBLDBCQUF1QjtBQUN2Qix5QkFBOEI7QUFDbkIsUUFBQSxlQUFlLEdBQUcsT0FBTyxDQUFDLGtCQUFrQixDQUFDLENBQUEifQ==
|
|
||||||
@@ -1 +1,45 @@
|
|||||||
{}
|
{
|
||||||
|
"@git.zone/cli": {
|
||||||
|
"projectType": "npm",
|
||||||
|
"module": {
|
||||||
|
"githost": "code.foss.global",
|
||||||
|
"gitscope": "push.rocks",
|
||||||
|
"gitrepo": "smartstream",
|
||||||
|
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||||
|
"npmPackagename": "@push.rocks/smartstream",
|
||||||
|
"license": "MIT",
|
||||||
|
"keywords": [
|
||||||
|
"stream",
|
||||||
|
"node.js",
|
||||||
|
"typescript",
|
||||||
|
"stream manipulation",
|
||||||
|
"data processing",
|
||||||
|
"pipeline",
|
||||||
|
"async transformation",
|
||||||
|
"event handling",
|
||||||
|
"backpressure",
|
||||||
|
"readable stream",
|
||||||
|
"writable stream",
|
||||||
|
"duplex stream",
|
||||||
|
"transform stream",
|
||||||
|
"file streaming",
|
||||||
|
"buffer",
|
||||||
|
"stream utilities",
|
||||||
|
"esm"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"release": {
|
||||||
|
"registries": [
|
||||||
|
"https://verdaccio.lossless.digital",
|
||||||
|
"https://registry.npmjs.org"
|
||||||
|
],
|
||||||
|
"accessLevel": "public"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"@git.zone/tsdoc": {
|
||||||
|
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
|
||||||
|
},
|
||||||
|
"@ship.zone/szci": {
|
||||||
|
"npmGlobalTools": []
|
||||||
|
}
|
||||||
|
}
|
||||||
81
package.json
81
package.json
@@ -1,32 +1,77 @@
|
|||||||
{
|
{
|
||||||
"name": "smartstream",
|
"name": "@push.rocks/smartstream",
|
||||||
"version": "1.0.2",
|
"version": "3.3.0",
|
||||||
"description": "simplifies access to node streams, TypeScript ready!",
|
"private": false,
|
||||||
"main": "dist/index.js",
|
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||||
"typings": "dist/index.d.ts",
|
"type": "module",
|
||||||
|
"exports": {
|
||||||
|
".": "./dist_ts/index.js",
|
||||||
|
"./web": "./dist_ts_web/index.js"
|
||||||
|
},
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"test": "(npmts)"
|
"test": "(tstest test/)",
|
||||||
|
"build": "(tsbuild tsfolders --allowimplicitany)"
|
||||||
},
|
},
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "git+ssh://git@gitlab.com/pushrocks/smartstream.git"
|
"url": "https://code.foss.global/push.rocks/smartstream.git"
|
||||||
},
|
},
|
||||||
"author": "Lossless GmbH",
|
"author": "Lossless GmbH",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
"bugs": {
|
"bugs": {
|
||||||
"url": "https://gitlab.com/pushrocks/smartstream/issues"
|
"url": "https://gitlab.com/push.rocks/smartstream/issues"
|
||||||
},
|
},
|
||||||
"homepage": "https://gitlab.com/pushrocks/smartstream#README",
|
"homepage": "https://code.foss.global/push.rocks/smartstream",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/should": "^8.1.30",
|
"@git.zone/tsbuild": "^4.1.2",
|
||||||
"npmts-g": "^5.2.8",
|
"@git.zone/tsrun": "^2.0.1",
|
||||||
"should": "^11.1.0",
|
"@git.zone/tstest": "^3.1.8",
|
||||||
"typings-test": "^1.0.3"
|
"@push.rocks/tapbundle": "^6.0.3",
|
||||||
|
"@types/node": "^25.3.2"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@types/q": "0.x.x",
|
"@push.rocks/lik": "^6.2.2",
|
||||||
"q": "^1.4.1",
|
"@push.rocks/smartenv": "^6.0.0",
|
||||||
"stream-combiner2": "^1.1.1",
|
"@push.rocks/smartpromise": "^4.2.3",
|
||||||
"typings-global": "^1.0.14"
|
"@push.rocks/smartrx": "^3.0.10"
|
||||||
}
|
},
|
||||||
|
"browserslist": [
|
||||||
|
"last 1 chrome versions"
|
||||||
|
],
|
||||||
|
"files": [
|
||||||
|
"ts/**/*",
|
||||||
|
"ts_web/**/*",
|
||||||
|
"dist/**/*",
|
||||||
|
"dist_*/**/*",
|
||||||
|
"dist_ts/**/*",
|
||||||
|
"dist_ts_web/**/*",
|
||||||
|
"assets/**/*",
|
||||||
|
"cli.js",
|
||||||
|
"npmextra.json",
|
||||||
|
"readme.md"
|
||||||
|
],
|
||||||
|
"pnpm": {
|
||||||
|
"overrides": {
|
||||||
|
"agentkeepalive": "^4.6.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"keywords": [
|
||||||
|
"stream",
|
||||||
|
"node.js",
|
||||||
|
"typescript",
|
||||||
|
"stream manipulation",
|
||||||
|
"data processing",
|
||||||
|
"pipeline",
|
||||||
|
"async transformation",
|
||||||
|
"event handling",
|
||||||
|
"backpressure",
|
||||||
|
"readable stream",
|
||||||
|
"writable stream",
|
||||||
|
"duplex stream",
|
||||||
|
"transform stream",
|
||||||
|
"file streaming",
|
||||||
|
"buffer",
|
||||||
|
"stream utilities",
|
||||||
|
"esm"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
10736
pnpm-lock.yaml
generated
Normal file
10736
pnpm-lock.yaml
generated
Normal file
File diff suppressed because it is too large
Load Diff
1
readme.hints.md
Normal file
1
readme.hints.md
Normal file
@@ -0,0 +1 @@
|
|||||||
|
- make sure to respect backpressure handling.
|
||||||
472
readme.md
Normal file
472
readme.md
Normal file
@@ -0,0 +1,472 @@
|
|||||||
|
# @push.rocks/smartstream
|
||||||
|
|
||||||
|
A TypeScript-first library for creating and manipulating Node.js and Web streams with built-in backpressure handling, async transformations, and seamless Node.js ↔ Web stream interoperability.
|
||||||
|
|
||||||
|
## Issue Reporting and Security
|
||||||
|
|
||||||
|
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
|
||||||
|
|
||||||
|
## Install
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pnpm install @push.rocks/smartstream
|
||||||
|
```
|
||||||
|
|
||||||
|
The package ships with two entry points:
|
||||||
|
|
||||||
|
| Entry Point | Import Path | Environment |
|
||||||
|
|---|---|---|
|
||||||
|
| **Node.js** (default) | `@push.rocks/smartstream` | Node.js — full stream utilities, duplex, intake, wrappers, and Node↔Web helpers |
|
||||||
|
| **Web** | `@push.rocks/smartstream/web` | Browser & Node.js — pure Web Streams API (`WebDuplexStream`) |
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
All examples use ESM / TypeScript syntax.
|
||||||
|
|
||||||
|
### 📦 Importing
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// Node.js — full API
|
||||||
|
import {
|
||||||
|
SmartDuplex,
|
||||||
|
StreamWrapper,
|
||||||
|
StreamIntake,
|
||||||
|
createTransformFunction,
|
||||||
|
createPassThrough,
|
||||||
|
nodewebhelpers,
|
||||||
|
} from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
// Web — browser-safe, zero Node.js dependencies
|
||||||
|
import { WebDuplexStream } from '@push.rocks/smartstream/web';
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🔄 SmartDuplex — The Core Stream Primitive
|
||||||
|
|
||||||
|
`SmartDuplex` extends Node.js `Duplex` with first-class async support, built-in backpressure management, and a clean functional API. Instead of overriding `_transform` or `_write` manually, you pass a `writeFunction` that receives each chunk along with a `tools` object.
|
||||||
|
|
||||||
|
#### Basic Transform
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const upperCaser = new SmartDuplex<Buffer, Buffer>({
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
// Return a value to push it downstream
|
||||||
|
return Buffer.from(chunk.toString().toUpperCase());
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
readableStream.pipe(upperCaser).pipe(writableStream);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Using `tools.push()` for Multiple Outputs
|
||||||
|
|
||||||
|
The `writeFunction` can emit multiple chunks per input via `tools.push()`:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const splitter = new SmartDuplex<string, string>({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
const words = chunk.split(' ');
|
||||||
|
for (const word of words) {
|
||||||
|
await tools.push(word);
|
||||||
|
}
|
||||||
|
// Returning nothing — output was already pushed
|
||||||
|
},
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Final Function
|
||||||
|
|
||||||
|
Run cleanup or emit final data when the writable side ends:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const aggregator = new SmartDuplex<number, number>({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
runningTotal += chunk;
|
||||||
|
// Don't emit anything per-chunk
|
||||||
|
},
|
||||||
|
finalFunction: async (tools) => {
|
||||||
|
return runningTotal; // Emitted as the last chunk
|
||||||
|
},
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Truncating a Stream Early
|
||||||
|
|
||||||
|
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const limiter = new SmartDuplex<string, string>({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
if (chunk === 'STOP') {
|
||||||
|
tools.truncate();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
return chunk;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Creating from a Buffer
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const stream = SmartDuplex.fromBuffer(Buffer.from('hello world'));
|
||||||
|
stream.on('data', (chunk) => console.log(chunk.toString())); // "hello world"
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Creating from a Web ReadableStream
|
||||||
|
|
||||||
|
Bridge the Web Streams API into a Node.js Duplex:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const response = await fetch('https://example.com/data');
|
||||||
|
const nodeDuplex = SmartDuplex.fromWebReadableStream(response.body);
|
||||||
|
|
||||||
|
nodeDuplex.pipe(processTransform).pipe(outputStream);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Getting Web Streams from SmartDuplex
|
||||||
|
|
||||||
|
Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const duplex = new SmartDuplex({
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
return transform(chunk);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const { readable, writable } = await duplex.getWebStreams();
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Debug Mode
|
||||||
|
|
||||||
|
Pass `debug: true` and `name` to get detailed internal logs:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const stream = new SmartDuplex({
|
||||||
|
name: 'MyStream',
|
||||||
|
debug: true,
|
||||||
|
writeFunction: async (chunk, tools) => chunk,
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🧩 StreamWrapper — Pipeline Composition
|
||||||
|
|
||||||
|
`StreamWrapper` takes an array of streams, pipes them together, attaches error listeners on all of them, and returns a `Promise` that resolves when the pipeline finishes:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||||
|
import fs from 'fs';
|
||||||
|
|
||||||
|
const pipeline = new StreamWrapper([
|
||||||
|
fs.createReadStream('./input.txt'),
|
||||||
|
new SmartDuplex({
|
||||||
|
writeFunction: async (chunk) => Buffer.from(chunk.toString().toUpperCase()),
|
||||||
|
}),
|
||||||
|
fs.createWriteStream('./output.txt'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
await pipeline.run();
|
||||||
|
console.log('Pipeline complete!');
|
||||||
|
```
|
||||||
|
|
||||||
|
Error handling is automatic — if any stream in the array errors, the returned promise rejects:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
pipeline.run()
|
||||||
|
.then(() => console.log('Done'))
|
||||||
|
.catch((err) => console.error('Pipeline failed:', err));
|
||||||
|
```
|
||||||
|
|
||||||
|
You can also listen for custom events across all streams:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
pipeline.onCustomEvent('progress', () => {
|
||||||
|
console.log('Progress event fired');
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 📥 StreamIntake — Dynamic Data Injection
|
||||||
|
|
||||||
|
`StreamIntake` is a `Readable` stream that lets you programmatically push data into a pipeline. It operates in object mode by default and provides a reactive observable (`pushNextObservable`) for demand-driven data production.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { StreamIntake, SmartDuplex } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const intake = new StreamIntake<string>();
|
||||||
|
|
||||||
|
// Pipe through a transform
|
||||||
|
intake
|
||||||
|
.pipe(new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk) => {
|
||||||
|
console.log('Processing:', chunk);
|
||||||
|
return chunk;
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
.on('data', (data) => console.log('Output:', data));
|
||||||
|
|
||||||
|
// Push data whenever it's ready
|
||||||
|
intake.pushData('Hello');
|
||||||
|
intake.pushData('World');
|
||||||
|
intake.signalEnd(); // Signal end-of-stream
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Demand-driven Production with Observable
|
||||||
|
|
||||||
|
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const intake = new StreamIntake<number>();
|
||||||
|
|
||||||
|
let counter = 0;
|
||||||
|
intake.pushNextObservable.subscribe(() => {
|
||||||
|
if (counter < 100) {
|
||||||
|
intake.pushData(counter++);
|
||||||
|
} else {
|
||||||
|
intake.signalEnd();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
intake.pipe(consumer);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Creating from Existing Streams
|
||||||
|
|
||||||
|
Wrap a Node.js `Readable` or a Web `ReadableStream`:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
// From Node.js Readable
|
||||||
|
const intake = await StreamIntake.fromStream<Buffer>(fs.createReadStream('./data.bin'));
|
||||||
|
|
||||||
|
// From Web ReadableStream
|
||||||
|
const response = await fetch('https://example.com/stream');
|
||||||
|
const intake = await StreamIntake.fromStream<Uint8Array>(response.body);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### ⚡ Utility Functions
|
||||||
|
|
||||||
|
#### `createTransformFunction`
|
||||||
|
|
||||||
|
Quickly create a `SmartDuplex` from a simple async mapping function:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { createTransformFunction } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
|
||||||
|
|
||||||
|
intakeStream.pipe(doubler).pipe(outputStream);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### `createPassThrough`
|
||||||
|
|
||||||
|
Create an object-mode passthrough stream (useful as an intermediary or tee point):
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { createPassThrough } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const passThrough = createPassThrough();
|
||||||
|
source.pipe(passThrough).pipe(destination);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🌐 WebDuplexStream — Pure Web Streams API
|
||||||
|
|
||||||
|
`WebDuplexStream` extends `TransformStream` and works in both browsers and Node.js. Import it from the `/web` subpath for zero Node.js dependencies.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { WebDuplexStream } from '@push.rocks/smartstream/web';
|
||||||
|
|
||||||
|
const stream = new WebDuplexStream<number, number>({
|
||||||
|
writeFunction: async (chunk, { push }) => {
|
||||||
|
push(chunk * 2); // Push transformed data
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = stream.writable.getWriter();
|
||||||
|
const reader = stream.readable.getReader();
|
||||||
|
|
||||||
|
// Write
|
||||||
|
await writer.write(5);
|
||||||
|
await writer.write(10);
|
||||||
|
await writer.close();
|
||||||
|
|
||||||
|
// Read
|
||||||
|
const { value } = await reader.read(); // 10
|
||||||
|
const { value: v2 } = await reader.read(); // 20
|
||||||
|
```
|
||||||
|
|
||||||
|
#### From a Uint8Array
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const stream = WebDuplexStream.fromUInt8Array(new Uint8Array([1, 2, 3]));
|
||||||
|
const reader = stream.readable.getReader();
|
||||||
|
const { value } = await reader.read(); // Uint8Array [1, 2, 3]
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Data Production with `readFunction`
|
||||||
|
|
||||||
|
Supply data into the stream from any async source:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const stream = new WebDuplexStream<string, string>({
|
||||||
|
readFunction: async (tools) => {
|
||||||
|
await tools.write('chunk 1');
|
||||||
|
await tools.write('chunk 2');
|
||||||
|
tools.done(); // Signal end
|
||||||
|
},
|
||||||
|
writeFunction: async (chunk, { push }) => {
|
||||||
|
push(chunk.toUpperCase());
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const reader = stream.readable.getReader();
|
||||||
|
// reads "CHUNK 1", "CHUNK 2"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🔀 Node ↔ Web Stream Converters
|
||||||
|
|
||||||
|
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||||
|
```
|
||||||
|
|
||||||
|
| Function | From | To |
|
||||||
|
|---|---|---|
|
||||||
|
| `createWebReadableStreamFromFile(path)` | File path | Web `ReadableStream<Uint8Array>` |
|
||||||
|
| `convertWebReadableToNodeReadable(webStream)` | Web `ReadableStream` | Node.js `Readable` |
|
||||||
|
| `convertNodeReadableToWebReadable(nodeStream)` | Node.js `Readable` | Web `ReadableStream` |
|
||||||
|
| `convertWebWritableToNodeWritable(webWritable)` | Web `WritableStream` | Node.js `Writable` |
|
||||||
|
| `convertNodeWritableToWebWritable(nodeWritable)` | Node.js `Writable` | Web `WritableStream` |
|
||||||
|
|
||||||
|
#### Example: Serve a File as a Web ReadableStream
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./video.mp4');
|
||||||
|
|
||||||
|
// Use with fetch Response, service workers, etc.
|
||||||
|
return new Response(webStream, {
|
||||||
|
headers: { 'Content-Type': 'video/mp4' },
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Example: Convert Between Stream Types
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import fs from 'fs';
|
||||||
|
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
// Node → Web
|
||||||
|
const nodeReadable = fs.createReadStream('./data.bin');
|
||||||
|
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||||
|
|
||||||
|
// Web → Node
|
||||||
|
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||||
|
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🏗️ Backpressure Handling
|
||||||
|
|
||||||
|
`SmartDuplex` uses a `BackpressuredArray` internally, bounded by `highWaterMark` (default: 1). When the downstream consumer is slow, the stream automatically pauses the upstream producer until space is available — no manual bookkeeping required.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
const slow = new SmartDuplex({
|
||||||
|
name: 'SlowConsumer',
|
||||||
|
objectMode: true,
|
||||||
|
highWaterMark: 1,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||||
|
return chunk;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const fast = new SmartDuplex({
|
||||||
|
name: 'FastProducer',
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
return chunk; // Instant processing
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Backpressure is handled automatically between fast → slow
|
||||||
|
fast.pipe(slow).on('data', (d) => console.log(d));
|
||||||
|
|
||||||
|
for (let i = 0; i < 100; i++) {
|
||||||
|
fast.write(`chunk-${i}`);
|
||||||
|
}
|
||||||
|
fast.end();
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 🎯 Real-World Example: Processing Pipeline
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import fs from 'fs';
|
||||||
|
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
// Read → Transform → Filter → Write
|
||||||
|
const pipeline = new StreamWrapper([
|
||||||
|
fs.createReadStream('./access.log'),
|
||||||
|
new SmartDuplex({
|
||||||
|
writeFunction: async (chunk) => {
|
||||||
|
// Parse each line
|
||||||
|
return chunk.toString().split('\n');
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (lines: string[], tools) => {
|
||||||
|
// Filter and push matching lines
|
||||||
|
for (const line of lines) {
|
||||||
|
if (line.includes('ERROR')) {
|
||||||
|
await tools.push(line + '\n');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
fs.createWriteStream('./errors.log'),
|
||||||
|
]);
|
||||||
|
|
||||||
|
await pipeline.run();
|
||||||
|
console.log('Error extraction complete');
|
||||||
|
```
|
||||||
|
|
||||||
|
## License and Legal Information
|
||||||
|
|
||||||
|
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||||
|
|
||||||
|
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||||
|
|
||||||
|
### Trademarks
|
||||||
|
|
||||||
|
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
|
||||||
|
|
||||||
|
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
|
||||||
|
|
||||||
|
### Company Information
|
||||||
|
|
||||||
|
Task Venture Capital GmbH
|
||||||
|
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||||
|
|
||||||
|
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||||
|
|
||||||
|
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
|
||||||
0
readme_instructions.md
Normal file
0
readme_instructions.md
Normal file
6210
test/assets/readabletext.txt
Normal file
6210
test/assets/readabletext.txt
Normal file
File diff suppressed because it is too large
Load Diff
50
test/assets/writabletext.txt
Normal file
50
test/assets/writabletext.txt
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
|
hi+wow
|
||||||
1
test/test.d.ts
vendored
1
test/test.d.ts
vendored
@@ -1 +0,0 @@
|
|||||||
import 'typings-test';
|
|
||||||
15
test/test.js
15
test/test.js
@@ -1,15 +0,0 @@
|
|||||||
"use strict";
|
|
||||||
require("typings-test");
|
|
||||||
const fs = require("fs");
|
|
||||||
const smartstream = require("../dist/index");
|
|
||||||
let testSmartstream;
|
|
||||||
describe('smartstream', function () {
|
|
||||||
it('should combine a stream', function () {
|
|
||||||
testSmartstream = new smartstream.Smartstream([
|
|
||||||
fs.createReadStream('./test/assets/test.md'),
|
|
||||||
fs.createWriteStream('./test/assets/testCopy.md')
|
|
||||||
]);
|
|
||||||
testSmartstream.run();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoidGVzdC5qcyIsInNvdXJjZVJvb3QiOiIiLCJzb3VyY2VzIjpbInRlc3QudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6IjtBQUFBLHdCQUFxQjtBQUNyQix5QkFBeUI7QUFHekIsNkNBQTRDO0FBRTVDLElBQUksZUFBd0MsQ0FBQTtBQUU1QyxRQUFRLENBQUMsYUFBYSxFQUFFO0lBQ3BCLEVBQUUsQ0FBQyx5QkFBeUIsRUFBRTtRQUMxQixlQUFlLEdBQUcsSUFBSSxXQUFXLENBQUMsV0FBVyxDQUFDO1lBQzFDLEVBQUUsQ0FBQyxnQkFBZ0IsQ0FBQyx1QkFBdUIsQ0FBQztZQUM1QyxFQUFFLENBQUMsaUJBQWlCLENBQUMsMkJBQTJCLENBQUM7U0FDcEQsQ0FBQyxDQUFBO1FBQ0YsZUFBZSxDQUFDLEdBQUcsRUFBRSxDQUFBO0lBQ3pCLENBQUMsQ0FBQyxDQUFBO0FBQ04sQ0FBQyxDQUFDLENBQUEifQ==
|
|
||||||
17
test/test.ts
17
test/test.ts
@@ -1,17 +0,0 @@
|
|||||||
import 'typings-test'
|
|
||||||
import fs = require('fs')
|
|
||||||
import * as should from 'should'
|
|
||||||
|
|
||||||
import * as smartstream from '../dist/index'
|
|
||||||
|
|
||||||
let testSmartstream: smartstream.Smartstream
|
|
||||||
|
|
||||||
describe('smartstream', function() {
|
|
||||||
it('should combine a stream', function(){
|
|
||||||
testSmartstream = new smartstream.Smartstream([
|
|
||||||
fs.createReadStream('./test/assets/test.md'),
|
|
||||||
fs.createWriteStream('./test/assets/testCopy.md')
|
|
||||||
])
|
|
||||||
testSmartstream.run()
|
|
||||||
})
|
|
||||||
})
|
|
||||||
68
test/test.ts.backpressure.ts
Normal file
68
test/test.ts.backpressure.ts
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import { tap, expect } from '@push.rocks/tapbundle';
|
||||||
|
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
|
||||||
|
|
||||||
|
tap.test('should run backpressure test', async (toolsArg) => {
|
||||||
|
const done = toolsArg.defer();
|
||||||
|
async function testBackpressure() {
|
||||||
|
const stream1 = new SmartDuplex({
|
||||||
|
name: 'stream1',
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
||||||
|
console.log(`processed chunk ${chunk} in stream 1`);
|
||||||
|
return chunk; // Fast processing
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const stream2 = new SmartDuplex({
|
||||||
|
name: 'stream2',
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||||
|
console.log(`processed chunk ${chunk} in stream 2`);
|
||||||
|
await tools.push(chunk);
|
||||||
|
// return chunk, optionally return ;
|
||||||
|
},
|
||||||
|
}); // This stream processes data more slowly
|
||||||
|
const stream3 = new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
name: 'stream3',
|
||||||
|
writeFunction: async (chunk, tools) => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||||
|
console.log(`processed chunk ${chunk} in stream 3`);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
stream1.pipe(stream2).pipe(stream3);
|
||||||
|
|
||||||
|
let backpressured = false;
|
||||||
|
for (let i = 0; i < 200; i++) {
|
||||||
|
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||||
|
if (!canContinue) {
|
||||||
|
backpressured = true;
|
||||||
|
console.log(`Backpressure at chunk ${i}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stream1.end();
|
||||||
|
|
||||||
|
stream1.on('finish', () => {
|
||||||
|
console.log('Stream 1 finished processing.');
|
||||||
|
});
|
||||||
|
stream2.on('finish', () => {
|
||||||
|
console.log('Stream 2 finished processing.');
|
||||||
|
});
|
||||||
|
stream3.on('finish', () => {
|
||||||
|
console.log('Stream 3 finished processing.');
|
||||||
|
if (!backpressured) {
|
||||||
|
throw new Error('No backpressure was observed.');
|
||||||
|
} else {
|
||||||
|
done.resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
testBackpressure();
|
||||||
|
await done.promise;
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
24
test/test.ts.smartstream.ts
Normal file
24
test/test.ts.smartstream.ts
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
|
||||||
|
import * as smartrx from '@push.rocks/smartrx';
|
||||||
|
import * as fs from 'fs';
|
||||||
|
|
||||||
|
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||||
|
const bufferData = Buffer.from('This is a test buffer');
|
||||||
|
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||||
|
|
||||||
|
let receivedData = Buffer.alloc(0);
|
||||||
|
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
smartStream.on('data', (chunk: Buffer) => {
|
||||||
|
receivedData = Buffer.concat([receivedData, chunk]);
|
||||||
|
});
|
||||||
|
|
||||||
|
smartStream.on('end', () => {
|
||||||
|
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
65
test/test.ts.streamfunction.ts
Normal file
65
test/test.ts.streamfunction.ts
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
import * as fs from 'fs';
|
||||||
|
|
||||||
|
import * as smartstream from '../ts/index.js';
|
||||||
|
|
||||||
|
let testIntake: smartstream.StreamIntake<string>;
|
||||||
|
|
||||||
|
tap.test('should handle a read stream', async (tools) => {
|
||||||
|
const counter = 0;
|
||||||
|
const streamWrapper = new smartstream.StreamWrapper([
|
||||||
|
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||||
|
new smartstream.SmartDuplex({
|
||||||
|
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||||
|
// do something with the stream here
|
||||||
|
const result = chunkStringArg.toString().substr(0, 100);
|
||||||
|
streamTools.push('wow =========== \n');
|
||||||
|
return Buffer.from(result);
|
||||||
|
},
|
||||||
|
finalFunction: async (tools) => {
|
||||||
|
return Buffer.from('this is the end');
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
new smartstream.SmartDuplex({
|
||||||
|
writeFunction: async (chunkStringArg) => {
|
||||||
|
console.log(chunkStringArg.toString());
|
||||||
|
},
|
||||||
|
finalFunction: async (tools) => {
|
||||||
|
tools.push(null);
|
||||||
|
},
|
||||||
|
})
|
||||||
|
]);
|
||||||
|
await streamWrapper.run();
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('should create a valid Intake', async (tools) => {
|
||||||
|
testIntake = new smartstream.StreamIntake<string>();
|
||||||
|
testIntake.pipe(
|
||||||
|
new smartstream.SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunkStringArg: string, streamTools) => {
|
||||||
|
await tools.delayFor(100);
|
||||||
|
console.log(chunkStringArg);
|
||||||
|
return chunkStringArg;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||||
|
const testFinished = tools.defer();
|
||||||
|
let counter = 0;
|
||||||
|
testIntake.pushNextObservable.subscribe(() => {
|
||||||
|
if (counter < 50) {
|
||||||
|
counter++;
|
||||||
|
testIntake.pushData('hi');
|
||||||
|
testIntake.pushData('+wow');
|
||||||
|
testIntake.pushData('\n');
|
||||||
|
} else {
|
||||||
|
testIntake.signalEnd();
|
||||||
|
testFinished.resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await testFinished.promise;
|
||||||
|
testIntake.signalEnd();
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
15
test/test.ts.ts
Normal file
15
test/test.ts.ts
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
import * as fs from 'fs';
|
||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
|
||||||
|
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
|
||||||
|
|
||||||
|
let testSmartstream: smartstream.StreamWrapper;
|
||||||
|
tap.test('should combine a stream', async () => {
|
||||||
|
testSmartstream = new smartstream.StreamWrapper([
|
||||||
|
fs.createReadStream('./test/assets/test.md'),
|
||||||
|
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||||
|
]);
|
||||||
|
await testSmartstream.run();
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
67
test/test.ts_web.both.ts
Normal file
67
test/test.ts_web.both.ts
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
import { expect, tap } from '@push.rocks/tapbundle';
|
||||||
|
import * as webstream from '../ts_web/index.js';
|
||||||
|
|
||||||
|
tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => {
|
||||||
|
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
|
||||||
|
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
|
||||||
|
|
||||||
|
const reader = stream.readable.getReader();
|
||||||
|
let readUint8Array = new Uint8Array();
|
||||||
|
|
||||||
|
// Read from the stream
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
if (value) {
|
||||||
|
// Concatenate value to readUint8Array
|
||||||
|
const tempArray = new Uint8Array(readUint8Array.length + value.length);
|
||||||
|
tempArray.set(readUint8Array, 0);
|
||||||
|
tempArray.set(value, readUint8Array.length);
|
||||||
|
readUint8Array = tempArray;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(readUint8Array).toEqual(inputUint8Array);
|
||||||
|
});
|
||||||
|
|
||||||
|
tap.test('WebDuplexStream should handle transform with a write function', async () => {
|
||||||
|
const input = [1, 2, 3, 4, 5];
|
||||||
|
const expectedOutput = [2, 4, 6, 8, 10];
|
||||||
|
|
||||||
|
const webDuplexStream = new webstream.WebDuplexStream<number, number>({
|
||||||
|
writeFunction: async (chunk, { push }) => {
|
||||||
|
// Push the doubled number into the stream
|
||||||
|
push(chunk * 2);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = webDuplexStream.writable.getWriter();
|
||||||
|
const reader = webDuplexStream.readable.getReader();
|
||||||
|
|
||||||
|
const output: number[] = [];
|
||||||
|
|
||||||
|
// Read from the stream asynchronously
|
||||||
|
const readPromise = (async () => {
|
||||||
|
while (true) {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
if (value !== undefined) {
|
||||||
|
output.push(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Write to the stream
|
||||||
|
for (const num of input) {
|
||||||
|
await writer.write(num);
|
||||||
|
}
|
||||||
|
await writer.close();
|
||||||
|
|
||||||
|
// Wait for the reading to complete
|
||||||
|
await readPromise;
|
||||||
|
|
||||||
|
// Assert that the output matches the expected transformed data
|
||||||
|
expect(output).toEqual(expectedOutput);
|
||||||
|
});
|
||||||
|
|
||||||
|
export default tap.start();
|
||||||
8
ts/00_commitinfo_data.ts
Normal file
8
ts/00_commitinfo_data.ts
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
/**
|
||||||
|
* autocreated commitinfo by @push.rocks/commitinfo
|
||||||
|
*/
|
||||||
|
export const commitinfo = {
|
||||||
|
name: '@push.rocks/smartstream',
|
||||||
|
version: '3.3.0',
|
||||||
|
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||||
|
}
|
||||||
36
ts/index.ts
36
ts/index.ts
@@ -1,23 +1,17 @@
|
|||||||
import * as plugins from './smartstream.plugins'
|
import { stream } from './smartstream.plugins.js';
|
||||||
|
export {
|
||||||
export interface IErrorFunction {
|
stream,
|
||||||
(err): number
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Smartstream {
|
export * from './smartstream.classes.smartduplex.js';
|
||||||
streamArray = []
|
export * from './smartstream.classes.streamwrapper.js';
|
||||||
errorFunction: IErrorFunction = null
|
export * from './smartstream.classes.streamintake.js';
|
||||||
constructor(streamArrayArg: any[]){
|
|
||||||
this.streamArray = streamArrayArg
|
export * from './smartstream.functions.js';
|
||||||
}
|
|
||||||
onError(errorFunctionArg: IErrorFunction) {
|
import * as plugins from './smartstream.plugins.js';
|
||||||
this.errorFunction = errorFunctionArg
|
export const webstream = plugins.webstream;
|
||||||
}
|
import * as nodewebhelpers from './smartstream.nodewebhelpers.js';
|
||||||
run() {
|
export {
|
||||||
let combinedStream = plugins.streamCombiner2.obj(this.streamArray)
|
nodewebhelpers,
|
||||||
if (this.errorFunction !== null) {
|
}
|
||||||
combinedStream.on('error', this.errorFunction)
|
|
||||||
}
|
|
||||||
return combinedStream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
240
ts/smartstream.classes.smartduplex.ts
Normal file
240
ts/smartstream.classes.smartduplex.ts
Normal file
@@ -0,0 +1,240 @@
|
|||||||
|
import * as plugins from './smartstream.plugins.js';
|
||||||
|
import { Duplex, type DuplexOptions } from 'stream';
|
||||||
|
|
||||||
|
export interface IStreamTools {
|
||||||
|
truncate: () => void;
|
||||||
|
push: (pipeObject: any) => Promise<boolean>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IStreamWriteFunction<T, rT> {
|
||||||
|
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface IStreamFinalFunction<rT> {
|
||||||
|
(toolsArg: IStreamTools): Promise<rT>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||||
|
/**
|
||||||
|
* wether to print debug logs
|
||||||
|
*/
|
||||||
|
debug?: boolean;
|
||||||
|
/**
|
||||||
|
* the name of the stream
|
||||||
|
*/
|
||||||
|
name?: string;
|
||||||
|
/**
|
||||||
|
* a function that is being called to read more stuff from whereever to be processed by the stream
|
||||||
|
* @returns
|
||||||
|
*/
|
||||||
|
readFunction?: () => Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the write function is called for every chunk that is being written to the stream
|
||||||
|
* it can push or return chunks (but does not have to) to be written to the readable side of the stream
|
||||||
|
*/
|
||||||
|
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a final function that is run at the end of the stream
|
||||||
|
*/
|
||||||
|
finalFunction?: IStreamFinalFunction<TOutput>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||||
|
// STATIC
|
||||||
|
static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions<any, any>): SmartDuplex {
|
||||||
|
const smartDuplex = new SmartDuplex(options);
|
||||||
|
process.nextTick(() => {
|
||||||
|
smartDuplex.push(buffer);
|
||||||
|
smartDuplex.push(null); // Signal the end of the data
|
||||||
|
});
|
||||||
|
return smartDuplex;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static fromWebReadableStream<T = any>(
|
||||||
|
readableStream: ReadableStream<T>
|
||||||
|
): SmartDuplex<T, T> {
|
||||||
|
const smartDuplex = new SmartDuplex<T, T>({
|
||||||
|
/**
|
||||||
|
* this function is called whenever the stream is being read from and at the same time if nothing is enqueued
|
||||||
|
* therefor it is important to always unlock the reader after reading
|
||||||
|
*/
|
||||||
|
readFunction: async () => {
|
||||||
|
const reader = readableStream.getReader();
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (value !== undefined) {
|
||||||
|
smartDuplex.push(value);
|
||||||
|
}
|
||||||
|
reader.releaseLock();
|
||||||
|
if (done) {
|
||||||
|
smartDuplex.push(null);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return smartDuplex;
|
||||||
|
}
|
||||||
|
|
||||||
|
// INSTANCE
|
||||||
|
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>; // an array that only takes a defined amount of items
|
||||||
|
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||||
|
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||||
|
private debugLog(messageArg: string) {
|
||||||
|
// optional debug log
|
||||||
|
if (this.options.debug) {
|
||||||
|
console.log(messageArg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
||||||
|
super(
|
||||||
|
Object.assign(
|
||||||
|
{
|
||||||
|
highWaterMark: 1,
|
||||||
|
},
|
||||||
|
optionsArg
|
||||||
|
)
|
||||||
|
);
|
||||||
|
this.options = optionsArg;
|
||||||
|
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
|
||||||
|
this.options.highWaterMark || 1
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async _read(size: number): Promise<void> {
|
||||||
|
this.debugLog(`${this.options.name}: read was called`);
|
||||||
|
if (this.options.readFunction) {
|
||||||
|
await this.options.readFunction();
|
||||||
|
}
|
||||||
|
await this.backpressuredArray.waitForItems();
|
||||||
|
this.debugLog(`${this.options.name}: successfully waited for items.`);
|
||||||
|
let canPushMore = true;
|
||||||
|
while (this.backpressuredArray.data.length > 0 && canPushMore) {
|
||||||
|
const nextChunk = this.backpressuredArray.shift();
|
||||||
|
canPushMore = this.push(nextChunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async backpressuredPush(pushArg: TOutput) {
|
||||||
|
const canPushMore = this.backpressuredArray.push(pushArg);
|
||||||
|
if (!canPushMore) {
|
||||||
|
this.debugLog(`${this.options.name}: cannot push more`);
|
||||||
|
await this.backpressuredArray.waitForSpace();
|
||||||
|
this.debugLog(`${this.options.name}: can push more again`);
|
||||||
|
}
|
||||||
|
return canPushMore;
|
||||||
|
}
|
||||||
|
|
||||||
|
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
||||||
|
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
||||||
|
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||||
|
if (!this.options.writeFunction) {
|
||||||
|
return callback(new Error('No stream function provided'));
|
||||||
|
}
|
||||||
|
|
||||||
|
let isTruncated = false;
|
||||||
|
const tools: IStreamTools = {
|
||||||
|
truncate: () => {
|
||||||
|
this.push(null);
|
||||||
|
isTruncated = true;
|
||||||
|
callback();
|
||||||
|
},
|
||||||
|
push: async (pushArg: TOutput) => {
|
||||||
|
return await this.backpressuredPush(pushArg);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const writeDeferred = plugins.smartpromise.defer();
|
||||||
|
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
|
||||||
|
const modifiedChunk = await this.options.writeFunction(chunk, tools);
|
||||||
|
if (isTruncated) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (modifiedChunk) {
|
||||||
|
await tools.push(modifiedChunk);
|
||||||
|
}
|
||||||
|
callback();
|
||||||
|
writeDeferred.resolve();
|
||||||
|
writeDeferred.promise.then(() => {
|
||||||
|
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async _final(callback: (error?: Error | null) => void) {
|
||||||
|
await Promise.all(this.asyncWritePromiseObjectmap.getArray());
|
||||||
|
if (this.options.finalFunction) {
|
||||||
|
const tools: IStreamTools = {
|
||||||
|
truncate: () => callback(),
|
||||||
|
push: async (pipeObject) => {
|
||||||
|
return this.backpressuredArray.push(pipeObject);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const finalChunk = await this.options.finalFunction(tools);
|
||||||
|
if (finalChunk) {
|
||||||
|
this.backpressuredArray.push(finalChunk);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.backpressuredArray.push(null);
|
||||||
|
callback(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.backpressuredArray.push(null);
|
||||||
|
callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> {
|
||||||
|
const duplex = this;
|
||||||
|
const readable = new ReadableStream({
|
||||||
|
start(controller) {
|
||||||
|
duplex.on('readable', () => {
|
||||||
|
let chunk;
|
||||||
|
while (null !== (chunk = duplex.read())) {
|
||||||
|
controller.enqueue(chunk);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
duplex.on('end', () => {
|
||||||
|
controller.close();
|
||||||
|
});
|
||||||
|
},
|
||||||
|
cancel(reason) {
|
||||||
|
duplex.destroy(new Error(reason));
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writable = new WritableStream({
|
||||||
|
write(chunk) {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
const isBackpressured = !duplex.write(chunk, (error) => {
|
||||||
|
if (error) {
|
||||||
|
reject(error);
|
||||||
|
} else {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (isBackpressured) {
|
||||||
|
duplex.once('drain', resolve);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
close() {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
duplex.end(resolve);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
abort(reason) {
|
||||||
|
duplex.destroy(new Error(reason));
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return { readable, writable };
|
||||||
|
}
|
||||||
|
}
|
||||||
91
ts/smartstream.classes.streamintake.ts
Normal file
91
ts/smartstream.classes.streamintake.ts
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
import * as plugins from './smartstream.plugins.js';
|
||||||
|
|
||||||
|
/**
 * A Readable stream that is fed manually via pushData()/signalEnd(), or
 * automatically from an existing Node.js or Web readable via fromStream().
 */
export class StreamIntake<T> extends plugins.stream.Readable {
  // STATIC
  /**
   * Builds a StreamIntake mirroring the chunks of the given input stream.
   * Accepts both a Node.js Readable and a Web ReadableStream.
   */
  public static async fromStream<U>(inputStream: plugins.stream.Readable | ReadableStream, options?: plugins.stream.ReadableOptions): Promise<StreamIntake<U>> {
    const intakeStream = new StreamIntake<U>(options);

    if (inputStream instanceof plugins.stream.Readable) {
      // Node.js stream: forward data/end/error events
      inputStream.on('data', (chunk: U) => {
        intakeStream.pushData(chunk);
      });

      inputStream.on('end', () => {
        intakeStream.signalEnd();
      });

      inputStream.on('error', (err: Error) => {
        intakeStream.destroy(err);
      });
    } else {
      // Web stream: pull chunks recursively through a reader
      const reader = (inputStream as ReadableStream).getReader();

      const readChunk = () => {
        reader.read().then(({ done, value }) => {
          if (done) {
            intakeStream.signalEnd();
          } else {
            intakeStream.pushData(value);
            readChunk();
          }
        }).catch((err) => {
          intakeStream.destroy(err);
        });
      };

      readChunk();
    }

    return intakeStream;
  }

  // INSTANCE
  private signalEndBoolean = false; // set once signalEnd() was called
  private chunkStore: T[] = []; // chunks waiting to be pushed downstream
  // emits 'please push next' whenever the stream wants more data
  public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();
  // resolved whenever new data arrived or the end was signalled
  private pushedNextDeferred = plugins.smartpromise.defer();

  constructor(options?: plugins.stream.ReadableOptions) {
    super({ ...options, objectMode: true }); // Ensure that we are in object mode.
    this.pushNextObservable.push('please push next');
  }

  /**
   * Readable hook: drains chunkStore; when it runs dry, requests more data
   * and retries once pushData()/signalEnd() resolves the deferred.
   */
  _read(size: number): void {
    const pushChunk = (): void => {
      while (this.chunkStore.length > 0) {
        // If push returns false, then we should stop reading
        if (!this.push(this.chunkStore.shift())) {
          return;
        }
      }

      if (this.chunkStore.length === 0) {
        if (this.signalEndBoolean) {
          // If we're done, push null to signal the end of the stream
          this.push(null);
        } else {
          // Ask for more data and wait
          this.pushNextObservable.push('please push next');
          this.pushedNextDeferred.promise.then(() => {
            this.pushedNextDeferred = plugins.smartpromise.defer(); // Reset the deferred
            pushChunk(); // Try pushing the next chunk
          });
        }
      }
    };

    pushChunk();
  }

  /**
   * Feeds a chunk into the stream and wakes a pending _read cycle.
   */
  public pushData(chunkData: T) {
    this.chunkStore.push(chunkData);
    this.pushedNextDeferred.resolve();
  }

  /**
   * Marks the intake as complete; remaining chunks drain, then EOF is pushed.
   */
  public signalEnd() {
    this.signalEndBoolean = true;
    this.pushedNextDeferred.resolve();
    this.pushNextObservable.signalComplete();
  }
}
|
||||||
96
ts/smartstream.classes.streamwrapper.ts
Normal file
96
ts/smartstream.classes.streamwrapper.ts
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import * as plugins from './smartstream.plugins.js';
|
||||||
|
|
||||||
|
// interfaces
|
||||||
|
import { Transform } from 'stream';
|
||||||
|
|
||||||
|
/**
 * Callback invoked with a stream error.
 */
export interface IErrorFunction {
  (err: Error): any;
}
|
||||||
|
|
||||||
|
/**
 * Callback invoked when a registered custom stream event fires.
 */
export interface ICustomEventFunction {
  (): any;
}
|
||||||
|
|
||||||
|
/**
 * Pairs an event name with the handler to attach for it.
 */
export interface ICustomEventObject {
  eventName: string;
  eventFunction: ICustomEventFunction;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* class Smartstream handles
|
||||||
|
*/
|
||||||
|
export class StreamWrapper {
|
||||||
|
private streamArray: Array<plugins.stream.Duplex> = [];
|
||||||
|
private customEventObjectArray: ICustomEventObject[] = [];
|
||||||
|
private streamStartedDeferred = plugins.smartpromise.defer();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* constructor
|
||||||
|
*/
|
||||||
|
constructor(streamArrayArg: any[]) {
|
||||||
|
this.streamArray = streamArrayArg;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* make something with the stream itself
|
||||||
|
*/
|
||||||
|
streamStarted(): Promise<any> {
|
||||||
|
return this.streamStartedDeferred.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* attach listener to custom event
|
||||||
|
*/
|
||||||
|
onCustomEvent(eventNameArg: string, eventFunctionArg: ICustomEventFunction) {
|
||||||
|
this.customEventObjectArray.push({
|
||||||
|
eventName: eventNameArg,
|
||||||
|
eventFunction: eventFunctionArg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* run the stream
|
||||||
|
* @returns Promise
|
||||||
|
*/
|
||||||
|
run(): Promise<void> {
|
||||||
|
const done = plugins.smartpromise.defer<void>();
|
||||||
|
|
||||||
|
// clone Array
|
||||||
|
const streamExecutionArray: Array<plugins.stream.Duplex> = [];
|
||||||
|
for (const streamItem of this.streamArray) {
|
||||||
|
streamExecutionArray.push(streamItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
// combine the stream
|
||||||
|
let finalStream = null;
|
||||||
|
let firstIteration: boolean = true;
|
||||||
|
for (const stream of streamExecutionArray) {
|
||||||
|
if (firstIteration === true) {
|
||||||
|
finalStream = stream;
|
||||||
|
}
|
||||||
|
stream.on('error', (err) => {
|
||||||
|
done.reject(err);
|
||||||
|
});
|
||||||
|
for (const customEventObject of this.customEventObjectArray) {
|
||||||
|
stream.on(customEventObject.eventName, customEventObject.eventFunction);
|
||||||
|
}
|
||||||
|
if (!firstIteration) {
|
||||||
|
finalStream = finalStream.pipe(stream);
|
||||||
|
}
|
||||||
|
firstIteration = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.streamStartedDeferred.resolve();
|
||||||
|
|
||||||
|
finalStream.on('end', () => {
|
||||||
|
done.resolve();
|
||||||
|
});
|
||||||
|
finalStream.on('close', () => {
|
||||||
|
done.resolve();
|
||||||
|
});
|
||||||
|
finalStream.on('finish', () => {
|
||||||
|
done.resolve();
|
||||||
|
});
|
||||||
|
return done.promise;
|
||||||
|
}
|
||||||
|
}
|
||||||
30
ts/smartstream.functions.ts
Normal file
30
ts/smartstream.functions.ts
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
import { type TransformOptions } from 'stream';
|
||||||
|
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
|
||||||
|
|
||||||
|
/**
 * An async mapping from one input chunk to one output chunk.
 */
export interface AsyncTransformFunction<TInput, TOutput> {
  (chunkArg: TInput): Promise<TOutput>;
}
|
||||||
|
|
||||||
|
export function createTransformFunction<TInput, TOutput>(
|
||||||
|
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
|
||||||
|
options?: TransformOptions
|
||||||
|
): SmartDuplex {
|
||||||
|
const smartDuplexStream = new SmartDuplex({
|
||||||
|
...options,
|
||||||
|
writeFunction: async (chunkArg, toolsArg) => {
|
||||||
|
const result = await asyncFunction(chunkArg);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return smartDuplexStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const createPassThrough = () => {
|
||||||
|
return new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunkArg, toolsArg) => {
|
||||||
|
return chunkArg;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
151
ts/smartstream.nodewebhelpers.ts
Normal file
151
ts/smartstream.nodewebhelpers.ts
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
import * as plugins from './smartstream.plugins.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a Web ReadableStream from a file.
|
||||||
|
*
|
||||||
|
* @param filePath - The path to the file to be read
|
||||||
|
* @returns A Web ReadableStream that reads the file in chunks
|
||||||
|
*/
|
||||||
|
export function createWebReadableStreamFromFile(filePath: string): ReadableStream<Uint8Array> {
|
||||||
|
const fileStream = plugins.fs.createReadStream(filePath);
|
||||||
|
|
||||||
|
return new ReadableStream({
|
||||||
|
start(controller) {
|
||||||
|
// When data is available, enqueue it into the Web ReadableStream
|
||||||
|
fileStream.on('data', (chunk) => {
|
||||||
|
controller.enqueue(chunk as Uint8Array);
|
||||||
|
});
|
||||||
|
|
||||||
|
// When the file stream ends, close the Web ReadableStream
|
||||||
|
fileStream.on('end', () => {
|
||||||
|
controller.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
// If there's an error, error the Web ReadableStream
|
||||||
|
fileStream.on('error', (err) => {
|
||||||
|
controller.error(err);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
cancel() {
|
||||||
|
// If the Web ReadableStream is canceled, destroy the file stream
|
||||||
|
fileStream.destroy();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a Web ReadableStream to a Node.js Readable stream.
|
||||||
|
*
|
||||||
|
* @param webStream - The Web ReadableStream to convert
|
||||||
|
* @returns A Node.js Readable stream that reads data from the Web ReadableStream
|
||||||
|
*/
|
||||||
|
export function convertWebReadableToNodeReadable(webStream: ReadableStream<Uint8Array>): plugins.stream.Readable {
|
||||||
|
const reader = webStream.getReader();
|
||||||
|
|
||||||
|
return new plugins.stream.Readable({
|
||||||
|
async read() {
|
||||||
|
try {
|
||||||
|
const { value, done } = await reader.read();
|
||||||
|
if (done) {
|
||||||
|
this.push(null); // Signal end of stream
|
||||||
|
} else {
|
||||||
|
this.push(Buffer.from(value)); // Convert Uint8Array to Buffer for Node.js Readable
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.destroy(err); // Handle errors by destroying the stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a Node.js Readable stream to a Web ReadableStream.
|
||||||
|
*
|
||||||
|
* @param nodeStream - The Node.js Readable stream to convert
|
||||||
|
* @returns A Web ReadableStream that reads data from the Node.js Readable stream
|
||||||
|
*/
|
||||||
|
export function convertNodeReadableToWebReadable(nodeStream: plugins.stream.Readable): ReadableStream<Uint8Array> {
|
||||||
|
return new ReadableStream({
|
||||||
|
start(controller) {
|
||||||
|
nodeStream.on('data', (chunk) => {
|
||||||
|
controller.enqueue(new Uint8Array(chunk));
|
||||||
|
});
|
||||||
|
|
||||||
|
nodeStream.on('end', () => {
|
||||||
|
controller.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
nodeStream.on('error', (err) => {
|
||||||
|
controller.error(err);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
cancel() {
|
||||||
|
nodeStream.destroy();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a Web WritableStream to a Node.js Writable stream.
|
||||||
|
*
|
||||||
|
* @param webWritable - The Web WritableStream to convert
|
||||||
|
* @returns A Node.js Writable stream that writes data to the Web WritableStream
|
||||||
|
*/
|
||||||
|
export function convertWebWritableToNodeWritable(webWritable: WritableStream<Uint8Array>): plugins.stream.Writable {
|
||||||
|
const writer = webWritable.getWriter();
|
||||||
|
|
||||||
|
return new plugins.stream.Writable({
|
||||||
|
async write(chunk, encoding, callback) {
|
||||||
|
try {
|
||||||
|
await writer.write(new Uint8Array(chunk));
|
||||||
|
callback();
|
||||||
|
} catch (err) {
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
final(callback) {
|
||||||
|
writer.close().then(() => callback()).catch(callback);
|
||||||
|
},
|
||||||
|
destroy(err, callback) {
|
||||||
|
writer.abort(err).then(() => callback(err)).catch(callback);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a Node.js Writable stream to a Web WritableStream.
|
||||||
|
*
|
||||||
|
* @param nodeWritable - The Node.js Writable stream to convert
|
||||||
|
* @returns A Web WritableStream that writes data to the Node.js Writable stream
|
||||||
|
*/
|
||||||
|
export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Writable): WritableStream<Uint8Array> {
|
||||||
|
return new WritableStream({
|
||||||
|
write(chunk) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
nodeWritable.write(Buffer.from(chunk), (err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
} else {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
close() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
nodeWritable.end((err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
} else {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
abort(reason) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
nodeWritable.destroy(reason);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -1,3 +1,14 @@
|
|||||||
import 'typings-global'
|
// node native
|
||||||
export import q = require('q')
|
import * as fs from 'fs';
|
||||||
export let streamCombiner2 = require('stream-combiner2')
|
import * as stream from 'stream';
|
||||||
|
|
||||||
|
export { fs, stream };
|
||||||
|
|
||||||
|
// pushrocks scope
|
||||||
|
import * as lik from '@push.rocks/lik';
|
||||||
|
import * as smartpromise from '@push.rocks/smartpromise';
|
||||||
|
import * as smartrx from '@push.rocks/smartrx';
|
||||||
|
import * as webstream from '../dist_ts_web/index.js';
|
||||||
|
|
||||||
|
export { lik, smartpromise, smartrx, webstream };
|
||||||
|
|
||||||
|
|||||||
3
ts/tspublish.json
Normal file
3
ts/tspublish.json
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
{
|
||||||
|
"order": 1
|
||||||
|
}
|
||||||
8
ts_web/00_commitinfo_data.ts
Normal file
8
ts_web/00_commitinfo_data.ts
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
/**
 * autocreated commitinfo by @push.rocks/commitinfo
 * NOTE(review): appears to be regenerated automatically on release —
 * do not edit these values by hand.
 */
export const commitinfo = {
  name: '@push.rocks/smartstream',
  version: '3.3.0',
  description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}
|
||||||
135
ts_web/classes.webduplexstream.ts
Normal file
135
ts_web/classes.webduplexstream.ts
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
import * as plugins from './plugins.js';
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Interfaces for Read functionality
|
||||||
|
// ========================================
|
||||||
|
/**
 * Tools handed to a read function: `done` closes the writable side,
 * `write` feeds a chunk into the stream.
 */
export interface IStreamToolsRead<TInput, TOutput> {
  done: () => void;
  write: (writeArg: TInput) => Promise<void>;
}
|
||||||
|
|
||||||
|
/**
 * The read function is called when data needs to be read into the stream.
 * It receives tools to write chunks and to signal completion.
 */
export interface IStreamReadFunction<TInput, TOutput> {
  (toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}
|
||||||
|
|
||||||
|
// ========================================
|
||||||
|
// Interfaces for Write functionality
|
||||||
|
// ========================================
|
||||||
|
/**
 * Tools handed to write/final functions: `truncate` terminates the stream,
 * `push` enqueues a chunk on the readable side.
 */
export interface IStreamToolsWrite<TInput, TOutput> {
  truncate: () => void;
  push: (pushArg: TOutput) => void;
}
|
||||||
|
|
||||||
|
/**
 * The write function is called whenever a chunk is written to the stream.
 * A returned value that is neither undefined nor null is enqueued on the
 * readable side in addition to anything pushed via the tools.
 */
export interface IStreamWriteFunction<TInput, TOutput> {
  (chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}
|
||||||
|
|
||||||
|
/**
 * Called once when the stream flushes; may return a last chunk to emit.
 */
export interface IStreamFinalFunction<TInput, TOutput> {
  (toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput | void>;
}
|
||||||
|
|
||||||
|
/**
 * Configuration for WebDuplexStream; all hooks are optional.
 */
export interface WebDuplexStreamOptions<TInput, TOutput> {
  readFunction?: IStreamReadFunction<TInput, TOutput>;
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}
|
||||||
|
|
||||||
|
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
|
||||||
|
// INSTANCE
|
||||||
|
options: WebDuplexStreamOptions<TInput, TOutput>;
|
||||||
|
|
||||||
|
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
|
||||||
|
super({
|
||||||
|
async start(controller) {
|
||||||
|
// Optionally initialize any state here
|
||||||
|
},
|
||||||
|
async transform(chunk, controller) {
|
||||||
|
if (optionsArg?.writeFunction) {
|
||||||
|
const tools: IStreamToolsWrite<TInput, TOutput> = {
|
||||||
|
truncate: () => controller.terminate(),
|
||||||
|
push: (pushArg: TOutput) => controller.enqueue(pushArg),
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const writeReturnChunk = await optionsArg.writeFunction(chunk, tools);
|
||||||
|
if (writeReturnChunk !== undefined && writeReturnChunk !== null) {
|
||||||
|
controller.enqueue(writeReturnChunk);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
controller.error(err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If no writeFunction is provided, pass the chunk through
|
||||||
|
controller.enqueue(chunk as unknown as TOutput);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
async flush(controller) {
|
||||||
|
if (optionsArg?.finalFunction) {
|
||||||
|
const tools: IStreamToolsWrite<TInput, TOutput> = {
|
||||||
|
truncate: () => controller.terminate(),
|
||||||
|
push: (pushArg) => controller.enqueue(pushArg),
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
const finalChunk = await optionsArg.finalFunction(tools);
|
||||||
|
if (finalChunk) {
|
||||||
|
controller.enqueue(finalChunk);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
controller.error(err);
|
||||||
|
} finally {
|
||||||
|
controller.terminate();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
controller.terminate();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
this.options = optionsArg;
|
||||||
|
|
||||||
|
// Start producing data if readFunction is provided
|
||||||
|
if (this.options.readFunction) {
|
||||||
|
this._startReading();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _startReading() {
|
||||||
|
const writable = this.writable;
|
||||||
|
const writer = writable.getWriter();
|
||||||
|
|
||||||
|
const tools: IStreamToolsRead<TInput, TOutput> = {
|
||||||
|
done: () => writer.close(),
|
||||||
|
write: async (writeArg) => await writer.write(writeArg),
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.options.readFunction(tools);
|
||||||
|
} catch (err) {
|
||||||
|
writer.abort(err);
|
||||||
|
} finally {
|
||||||
|
writer.releaseLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Static method example (adjust as needed)
|
||||||
|
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
|
||||||
|
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
|
||||||
|
writeFunction: async (chunk, { push }) => {
|
||||||
|
push(chunk); // Directly push the chunk as is
|
||||||
|
return null;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = stream.writable.getWriter();
|
||||||
|
writer.write(uint8Array).then(() => writer.close());
|
||||||
|
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
}
|
||||||
2
ts_web/index.ts
Normal file
2
ts_web/index.ts
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
import './plugins.js';
|
||||||
|
export * from './classes.webduplexstream.js';
|
||||||
15
ts_web/plugins.ts
Normal file
15
ts_web/plugins.ts
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
// @push.rocks scope
import * as smartenv from '@push.rocks/smartenv';

export {
  smartenv,
}

// lets setup dependencies:
// install the web stream classes on globalThis from Node's 'stream/web'
// module so the rest of ts_web can use them as globals.
// NOTE(review): presumably getSafeNodeModule skips the callback outside
// Node (e.g. in browsers, where these globals already exist) — confirm
// against the smartenv documentation.
const smartenvInstance = new smartenv.Smartenv();

await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
  globalThis.ReadableStream = moduleArg.ReadableStream as any;
  globalThis.WritableStream = moduleArg.WritableStream as any;
  globalThis.TransformStream = moduleArg.TransformStream as any;
})
|
||||||
3
ts_web/tspublish.json
Normal file
3
ts_web/tspublish.json
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
{
|
||||||
|
"order": 0
|
||||||
|
}
|
||||||
14
tsconfig.json
Normal file
14
tsconfig.json
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
{
|
||||||
|
"compilerOptions": {
|
||||||
|
"experimentalDecorators": true,
|
||||||
|
"useDefineForClassFields": false,
|
||||||
|
"target": "ES2022",
|
||||||
|
"module": "NodeNext",
|
||||||
|
"moduleResolution": "NodeNext",
|
||||||
|
"esModuleInterop": true,
|
||||||
|
"verbatimModuleSyntax": true
|
||||||
|
},
|
||||||
|
"exclude": [
|
||||||
|
"dist_*/**/*.d.ts"
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -1,3 +0,0 @@
|
|||||||
{
|
|
||||||
"extends": "tslint-config-standard"
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user