Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 77046acac7 | |||
| 2acf1972a2 | |||
| 1262c48fe9 | |||
| 9b9b1be62b | |||
| 3d13cb76f6 | |||
| 9e3fd28c4a | |||
| 673f5c86fb | |||
| a225188e24 | |||
| 4fc82d0dc6 | |||
| 3d58a01b29 | |||
| f7e9636bf6 | |||
| f211cc8ddd | |||
| 60c8824f33 | |||
| 40e8e06ff1 |
46
changelog.md
46
changelog.md
@@ -1,5 +1,51 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-02 - 3.4.0 - feat(smartduplex)
|
||||
improve backpressure handling and web/node stream interoperability
|
||||
|
||||
- Refactored SmartDuplex to use synchronous _read/_write/_final (avoids async pitfalls), added internal backpressured buffer draining and consumer signaling
|
||||
- Implemented pull-based backpressure for Node <-> Web stream conversions (nodewebhelpers and convertNodeReadableToWebReadable/convertWebReadableToNodeReadable)
|
||||
- StreamIntake.fromStream now reads from 'readable' and drains properly; StreamWrapper resolves safely on end/close/finish
|
||||
- Improved getWebStreams / WebDuplexStream behavior (safer enqueue/close/abort handling, final/readFunction improvements)
|
||||
- Added many new unit tests covering backpressure, web/node helpers, StreamIntake, StreamWrapper, WebDuplexStream; bumped @push.rocks/lik and @types/node versions
|
||||
|
||||
## 2026-02-28 - 3.3.0 - feat(smartstream)
|
||||
bump dependencies, update build/publish config, refactor tests, and overhaul documentation
|
||||
|
||||
- Upgrade devDependencies (e.g. @git.zone/tsbuild -> ^4.1.2, @git.zone/tsrun -> ^2.0.1, @git.zone/tstest -> ^3.1.8, @types/node -> ^25.3.2) and runtime deps (e.g. @push.rocks/lik -> ^6.2.2, @push.rocks/smartenv -> ^6.0.0, @push.rocks/smartpromise -> ^4.2.3, @push.rocks/smartrx -> ^3.0.10).
|
||||
- Refactor tests to use Node's native fs streams instead of @push.rocks/smartfile.fsStream and export default tap.start() to support ESM test runner patterns.
|
||||
- Adjust build/publish: remove --web flag from build script, add pnpm override for agentkeepalive, add tspublish.json files for publish order, and add release registries/access in npmextra.json (verdaccio + npm).
|
||||
- Rework project metadata in npmextra.json (namespaced @git.zone keys, tsdoc entry changes) and minor TypeScript/web fix: cast stream/web constructors to any in ts_web/plugins.ts.
|
||||
- Large README rewrite: improved installation (pnpm), clearer Node vs Web entrypoints, expanded examples, and updated legal/license wording.
|
||||
|
||||
## 2024-11-19 - 3.2.5 - fix(nodewebhelpers)
|
||||
Fix import and use correct module structure for Node.js streams in smartstream.nodewebhelpers.ts
|
||||
|
||||
- Corrected the import statement for the fs module.
|
||||
- Updated the use of the fs.createReadStream method.
|
||||
|
||||
## 2024-10-16 - 3.2.4 - fix(SmartDuplex)
|
||||
Fix stream termination when reading from a web readable stream
|
||||
|
||||
- Resolved an issue in SmartDuplex where the stream did not properly terminate after reaching the end of a web readable stream.
|
||||
|
||||
## 2024-10-16 - 3.2.3 - fix(smartduplex)
|
||||
Enhance documentation for read function in SmartDuplex
|
||||
|
||||
- Added inline comments to clarify the behavior and importance of unlocking the reader in the readFunction of SmartDuplex.fromWebReadableStream.
|
||||
|
||||
## 2024-10-16 - 3.2.2 - fix(SmartDuplex)
|
||||
Fix issue with SmartDuplex fromWebReadableStream method
|
||||
|
||||
- Resolved a potential unhandled promise rejection in fromWebReadableStream method
|
||||
- Ensured proper release of stream reader lock in case of read completion
|
||||
|
||||
## 2024-10-16 - 3.2.1 - fix(core)
|
||||
Fix the order of operations in SmartDuplex _read method to ensure proper waiting for items.
|
||||
|
||||
- Adjusted the order of reading function execution and waiting for items in the SmartDuplex _read method.
|
||||
- Fixed potential issues with stream data processing timing.
|
||||
|
||||
## 2024-10-16 - 3.2.0 - feat(SmartDuplex)
|
||||
Added method to create SmartDuplex from a WebReadableStream.
|
||||
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
{
|
||||
"npmci": {
|
||||
"npmGlobalTools": [],
|
||||
"npmAccessLevel": "public"
|
||||
},
|
||||
"gitzone": {
|
||||
"@git.zone/cli": {
|
||||
"projectType": "npm",
|
||||
"module": {
|
||||
"githost": "code.foss.global",
|
||||
@@ -31,9 +27,19 @@
|
||||
"stream utilities",
|
||||
"esm"
|
||||
]
|
||||
},
|
||||
"release": {
|
||||
"registries": [
|
||||
"https://verdaccio.lossless.digital",
|
||||
"https://registry.npmjs.org"
|
||||
],
|
||||
"accessLevel": "public"
|
||||
}
|
||||
},
|
||||
"tsdoc": {
|
||||
"@git.zone/tsdoc": {
|
||||
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
|
||||
},
|
||||
"@ship.zone/szci": {
|
||||
"npmGlobalTools": []
|
||||
}
|
||||
}
|
||||
30
package.json
30
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstream",
|
||||
"version": "3.2.0",
|
||||
"version": "3.4.0",
|
||||
"private": false,
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"type": "module",
|
||||
@@ -9,8 +9,8 @@
|
||||
"./web": "./dist_ts_web/index.js"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "(tstest test/)",
|
||||
"build": "(tsbuild tsfolders --web --allowimplicitany)"
|
||||
"test": "(tstest test/ --verbose --logfile --timeout 60)",
|
||||
"build": "(tsbuild tsfolders --allowimplicitany)"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@@ -23,18 +23,17 @@
|
||||
},
|
||||
"homepage": "https://code.foss.global/push.rocks/smartstream",
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^2.1.80",
|
||||
"@git.zone/tsrun": "^1.2.44",
|
||||
"@git.zone/tstest": "^1.0.90",
|
||||
"@push.rocks/smartfile": "^11.0.15",
|
||||
"@push.rocks/tapbundle": "^5.0.23",
|
||||
"@types/node": "^20.12.12"
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@push.rocks/tapbundle": "^6.0.3",
|
||||
"@types/node": "^25.3.3"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/lik": "^6.0.15",
|
||||
"@push.rocks/smartenv": "^5.0.12",
|
||||
"@push.rocks/smartpromise": "^4.0.3",
|
||||
"@push.rocks/smartrx": "^3.0.7"
|
||||
"@push.rocks/lik": "^6.3.1",
|
||||
"@push.rocks/smartenv": "^6.0.0",
|
||||
"@push.rocks/smartpromise": "^4.2.3",
|
||||
"@push.rocks/smartrx": "^3.0.10"
|
||||
},
|
||||
"browserslist": [
|
||||
"last 1 chrome versions"
|
||||
@@ -51,6 +50,11 @@
|
||||
"npmextra.json",
|
||||
"readme.md"
|
||||
],
|
||||
"pnpm": {
|
||||
"overrides": {
|
||||
"agentkeepalive": "^4.6.0"
|
||||
}
|
||||
},
|
||||
"keywords": [
|
||||
"stream",
|
||||
"node.js",
|
||||
|
||||
6749
pnpm-lock.yaml
generated
6749
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
665
readme.md
665
readme.md
@@ -1,375 +1,472 @@
|
||||
```markdown
|
||||
# @push.rocks/smartstream
|
||||
A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively.
|
||||
|
||||
A TypeScript-first library for creating and manipulating Node.js and Web streams with built-in backpressure handling, async transformations, and seamless Node.js ↔ Web stream interoperability.
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
|
||||
|
||||
## Install
|
||||
To install `@push.rocks/smartstream`, you can use npm or yarn as follows:
|
||||
|
||||
```bash
|
||||
npm install @push.rocks/smartstream --save
|
||||
# OR
|
||||
yarn add @push.rocks/smartstream
|
||||
pnpm install @push.rocks/smartstream
|
||||
```
|
||||
|
||||
This will add `@push.rocks/smartstream` to your project's dependencies.
|
||||
The package ships with two entry points:
|
||||
|
||||
| Entry Point | Import Path | Environment |
|
||||
|---|---|---|
|
||||
| **Node.js** (default) | `@push.rocks/smartstream` | Node.js — full stream utilities, duplex, intake, wrappers, and Node↔Web helpers |
|
||||
| **Web** | `@push.rocks/smartstream/web` | Browser & Node.js — pure Web Streams API (`WebDuplexStream`) |
|
||||
|
||||
## Usage
|
||||
|
||||
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
|
||||
All examples use ESM / TypeScript syntax.
|
||||
|
||||
### Importing the Module
|
||||
|
||||
Start by importing the module into your TypeScript file:
|
||||
### 📦 Importing
|
||||
|
||||
```typescript
|
||||
import * as smartstream from '@push.rocks/smartstream';
|
||||
// Node.js — full API
|
||||
import {
|
||||
SmartDuplex,
|
||||
StreamWrapper,
|
||||
StreamIntake,
|
||||
createTransformFunction,
|
||||
createPassThrough,
|
||||
nodewebhelpers,
|
||||
} from '@push.rocks/smartstream';
|
||||
|
||||
// Web — browser-safe, zero Node.js dependencies
|
||||
import { WebDuplexStream } from '@push.rocks/smartstream/web';
|
||||
```
|
||||
|
||||
For a more specific import, you may do the following:
|
||||
---
|
||||
|
||||
### 🔄 SmartDuplex — The Core Stream Primitive
|
||||
|
||||
`SmartDuplex` extends Node.js `Duplex` with first-class async support, built-in backpressure management, and a clean functional API. Instead of overriding `_transform` or `_write` manually, you pass a `writeFunction` that receives each chunk along with a `tools` object.
|
||||
|
||||
#### Basic Transform
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream';
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
const upperCaser = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk, tools) => {
|
||||
// Return a value to push it downstream
|
||||
return Buffer.from(chunk.toString().toUpperCase());
|
||||
},
|
||||
});
|
||||
|
||||
readableStream.pipe(upperCaser).pipe(writableStream);
|
||||
```
|
||||
|
||||
### Creating Basic Transform Streams
|
||||
#### Using `tools.push()` for Multiple Outputs
|
||||
|
||||
The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility:
|
||||
The `writeFunction` can emit multiple chunks per input via `tools.push()`:
|
||||
|
||||
```typescript
|
||||
const splitter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
const words = chunk.split(' ');
|
||||
for (const word of words) {
|
||||
await tools.push(word);
|
||||
}
|
||||
// Returning nothing — output was already pushed
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
#### Final Function
|
||||
|
||||
Run cleanup or emit final data when the writable side ends:
|
||||
|
||||
```typescript
|
||||
const aggregator = new SmartDuplex<number, number>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
runningTotal += chunk;
|
||||
// Don't emit anything per-chunk
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
return runningTotal; // Emitted as the last chunk
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
#### Truncating a Stream Early
|
||||
|
||||
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
||||
|
||||
```typescript
|
||||
const limiter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
if (chunk === 'STOP') {
|
||||
tools.truncate();
|
||||
return;
|
||||
}
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
#### Creating from a Buffer
|
||||
|
||||
```typescript
|
||||
const stream = SmartDuplex.fromBuffer(Buffer.from('hello world'));
|
||||
stream.on('data', (chunk) => console.log(chunk.toString())); // "hello world"
|
||||
```
|
||||
|
||||
#### Creating from a Web ReadableStream
|
||||
|
||||
Bridge the Web Streams API into a Node.js Duplex:
|
||||
|
||||
```typescript
|
||||
const response = await fetch('https://example.com/data');
|
||||
const nodeDuplex = SmartDuplex.fromWebReadableStream(response.body);
|
||||
|
||||
nodeDuplex.pipe(processTransform).pipe(outputStream);
|
||||
```
|
||||
|
||||
#### Getting Web Streams from SmartDuplex
|
||||
|
||||
Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair:
|
||||
|
||||
```typescript
|
||||
const duplex = new SmartDuplex({
|
||||
writeFunction: async (chunk, tools) => {
|
||||
return transform(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
```
|
||||
|
||||
#### Debug Mode
|
||||
|
||||
Pass `debug: true` and `name` to get detailed internal logs:
|
||||
|
||||
```typescript
|
||||
const stream = new SmartDuplex({
|
||||
name: 'MyStream',
|
||||
debug: true,
|
||||
writeFunction: async (chunk, tools) => chunk,
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🧩 StreamWrapper — Pipeline Composition
|
||||
|
||||
`StreamWrapper` takes an array of streams, pipes them together, attaches error listeners on all of them, and returns a `Promise` that resolves when the pipeline finishes:
|
||||
|
||||
```typescript
|
||||
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||
import fs from 'fs';
|
||||
|
||||
const pipeline = new StreamWrapper([
|
||||
fs.createReadStream('./input.txt'),
|
||||
new SmartDuplex({
|
||||
writeFunction: async (chunk) => Buffer.from(chunk.toString().toUpperCase()),
|
||||
}),
|
||||
fs.createWriteStream('./output.txt'),
|
||||
]);
|
||||
|
||||
await pipeline.run();
|
||||
console.log('Pipeline complete!');
|
||||
```
|
||||
|
||||
Error handling is automatic — if any stream in the array errors, the returned promise rejects:
|
||||
|
||||
```typescript
|
||||
pipeline.run()
|
||||
.then(() => console.log('Done'))
|
||||
.catch((err) => console.error('Pipeline failed:', err));
|
||||
```
|
||||
|
||||
You can also listen for custom events across all streams:
|
||||
|
||||
```typescript
|
||||
pipeline.onCustomEvent('progress', () => {
|
||||
console.log('Progress event fired');
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 📥 StreamIntake — Dynamic Data Injection
|
||||
|
||||
`StreamIntake` is a `Readable` stream that lets you programmatically push data into a pipeline. It operates in object mode by default and provides a reactive observable (`pushNextObservable`) for demand-driven data production.
|
||||
|
||||
```typescript
|
||||
import { StreamIntake, SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
const intake = new StreamIntake<string>();
|
||||
|
||||
// Pipe through a transform
|
||||
intake
|
||||
.pipe(new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
console.log('Processing:', chunk);
|
||||
return chunk;
|
||||
},
|
||||
}))
|
||||
.on('data', (data) => console.log('Output:', data));
|
||||
|
||||
// Push data whenever it's ready
|
||||
intake.pushData('Hello');
|
||||
intake.pushData('World');
|
||||
intake.signalEnd(); // Signal end-of-stream
|
||||
```
|
||||
|
||||
#### Demand-driven Production with Observable
|
||||
|
||||
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
||||
|
||||
```typescript
|
||||
const intake = new StreamIntake<number>();
|
||||
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 100) {
|
||||
intake.pushData(counter++);
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
}
|
||||
});
|
||||
|
||||
intake.pipe(consumer);
|
||||
```
|
||||
|
||||
#### Creating from Existing Streams
|
||||
|
||||
Wrap a Node.js `Readable` or a Web `ReadableStream`:
|
||||
|
||||
```typescript
|
||||
// From Node.js Readable
|
||||
const intake = await StreamIntake.fromStream<Buffer>(fs.createReadStream('./data.bin'));
|
||||
|
||||
// From Web ReadableStream
|
||||
const response = await fetch('https://example.com/stream');
|
||||
const intake = await StreamIntake.fromStream<Uint8Array>(response.body);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### ⚡ Utility Functions
|
||||
|
||||
#### `createTransformFunction`
|
||||
|
||||
Quickly create a `SmartDuplex` from a simple async mapping function:
|
||||
|
||||
```typescript
|
||||
import { createTransformFunction } from '@push.rocks/smartstream';
|
||||
|
||||
const upperCaseTransform = createTransformFunction<string, string>(async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
|
||||
|
||||
intakeStream.pipe(doubler).pipe(outputStream);
|
||||
```
|
||||
|
||||
#### `createPassThrough`
|
||||
|
||||
Create an object-mode passthrough stream (useful as an intermediary or tee point):
|
||||
|
||||
```typescript
|
||||
import { createPassThrough } from '@push.rocks/smartstream';
|
||||
|
||||
const passThrough = createPassThrough();
|
||||
source.pipe(passThrough).pipe(destination);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🌐 WebDuplexStream — Pure Web Streams API
|
||||
|
||||
`WebDuplexStream` extends `TransformStream` and works in both browsers and Node.js. Import it from the `/web` subpath for zero Node.js dependencies.
|
||||
|
||||
```typescript
|
||||
import { WebDuplexStream } from '@push.rocks/smartstream/web';
|
||||
|
||||
const stream = new WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk * 2); // Push transformed data
|
||||
},
|
||||
});
|
||||
|
||||
// Usage with pipe
|
||||
readableStream
|
||||
.pipe(upperCaseTransform)
|
||||
.pipe(writableStream);
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Write
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
|
||||
// Read
|
||||
const { value } = await reader.read(); // 10
|
||||
const { value: v2 } = await reader.read(); // 20
|
||||
```
|
||||
|
||||
### Handling Backpressure with SmartDuplex
|
||||
|
||||
`SmartDuplex` is a powerful part of the `smartstream` module designed to handle backpressure effectively. Here's an example of how to create a `SmartDuplex` stream that processes data and respects the consumer's pace:
|
||||
#### From a Uint8Array
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
const stream = WebDuplexStream.fromUInt8Array(new Uint8Array([1, 2, 3]));
|
||||
const reader = stream.readable.getReader();
|
||||
const { value } = await reader.read(); // Uint8Array [1, 2, 3]
|
||||
```
|
||||
|
||||
const processDataDuplex = new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
const processedChunk = await processChunk(chunk); // Assume this is a defined asynchronous function
|
||||
push(processedChunk);
|
||||
}
|
||||
#### Data Production with `readFunction`
|
||||
|
||||
Supply data into the stream from any async source:
|
||||
|
||||
```typescript
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
readFunction: async (tools) => {
|
||||
await tools.write('chunk 1');
|
||||
await tools.write('chunk 2');
|
||||
tools.done(); // Signal end
|
||||
},
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk.toUpperCase());
|
||||
},
|
||||
});
|
||||
|
||||
sourceStream.pipe(processDataDuplex).pipe(destinationStream);
|
||||
const reader = stream.readable.getReader();
|
||||
// reads "CHUNK 1", "CHUNK 2"
|
||||
```
|
||||
|
||||
### Combining Multiple Streams
|
||||
---
|
||||
|
||||
`Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:
|
||||
### 🔀 Node ↔ Web Stream Converters
|
||||
|
||||
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams:
|
||||
|
||||
```typescript
|
||||
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
const combinedStream = new StreamWrapper([
|
||||
readStream, // Source stream
|
||||
transformStream1, // Transformation
|
||||
transformStream2, // Another transformation
|
||||
writeStream // Destination stream
|
||||
]);
|
||||
|
||||
combinedStream.run()
|
||||
.then(() => console.log('Processing completed.'))
|
||||
.catch(err => console.error('An error occurred:', err));
|
||||
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||
```
|
||||
|
||||
### Working with StreamIntake
|
||||
| Function | From | To |
|
||||
|---|---|---|
|
||||
| `createWebReadableStreamFromFile(path)` | File path | Web `ReadableStream<Uint8Array>` |
|
||||
| `convertWebReadableToNodeReadable(webStream)` | Web `ReadableStream` | Node.js `Readable` |
|
||||
| `convertNodeReadableToWebReadable(nodeStream)` | Node.js `Readable` | Web `ReadableStream` |
|
||||
| `convertWebWritableToNodeWritable(webWritable)` | Web `WritableStream` | Node.js `Writable` |
|
||||
| `convertNodeWritableToWebWritable(nodeWritable)` | Node.js `Writable` | Web `WritableStream` |
|
||||
|
||||
`StreamIntake` allows for more dynamic control of the reading process, facilitating scenarios where data is not continuously available:
|
||||
#### Example: Serve a File as a Web ReadableStream
|
||||
|
||||
```typescript
|
||||
import { StreamIntake } from '@push.rocks/smartstream';
|
||||
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./video.mp4');
|
||||
|
||||
const streamIntake = new StreamIntake<string>();
|
||||
|
||||
// Dynamically push data into the intake
|
||||
streamIntake.pushData('Hello, World!');
|
||||
streamIntake.pushData('Another message');
|
||||
|
||||
// Signal end when no more data is to be pushed
|
||||
streamIntake.signalEnd();
|
||||
// Use with fetch Response, service workers, etc.
|
||||
return new Response(webStream, {
|
||||
headers: { 'Content-Type': 'video/mp4' },
|
||||
});
|
||||
```
|
||||
|
||||
### Real-world Scenario: Processing Large Files
|
||||
|
||||
Consider a scenario where you need to process a large CSV file, transform the data row-by-row, and then write the results to a database or another file. With `smartstream`, you could create a pipe that reads the CSV, processes each row, and handles backpressure, ensuring efficient use of resources.
|
||||
#### Example: Convert Between Stream Types
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
|
||||
import fs from 'fs';
|
||||
import csvParser from 'csv-parser';
|
||||
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||
|
||||
const csvReadTransform = createTransformFunction<any, any>(async (row) => {
|
||||
// Process row
|
||||
return processedRow;
|
||||
});
|
||||
// Node → Web
|
||||
const nodeReadable = fs.createReadStream('./data.bin');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
|
||||
fs.createReadStream('path/to/largeFile.csv')
|
||||
.pipe(csvParser())
|
||||
.pipe(csvReadTransform)
|
||||
.pipe(new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
await writeToDatabase(chunk); // Assume this writes to a database
|
||||
}
|
||||
}))
|
||||
.on('finish', () => console.log('File processed successfully.'));
|
||||
// Web → Node
|
||||
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
||||
```
|
||||
|
||||
This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.
|
||||
---
|
||||
|
||||
### Advanced Use Case: Backpressure Handling
|
||||
### 🏗️ Backpressure Handling
|
||||
|
||||
Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Here’s a comprehensive example that demonstrates handling backpressure in a pipeline with multiple `SmartDuplex` instances:
|
||||
`SmartDuplex` uses a `BackpressuredArray` internally, bounded by `highWaterMark` (default: 1). When the downstream consumer is slow, the stream automatically pauses the upstream producer until space is available — no manual bookkeeping required.
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
// Define the first SmartDuplex, which writes data slowly to simulate backpressure
|
||||
const slowProcessingStream = new SmartDuplex({
|
||||
name: 'SlowProcessor',
|
||||
const slow = new SmartDuplex({
|
||||
name: 'SlowConsumer',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay
|
||||
console.log('Processed chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
highWaterMark: 1,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
// Define the second SmartDuplex as a fast processor
|
||||
const fastProcessingStream = new SmartDuplex({
|
||||
name: 'FastProcessor',
|
||||
const fast = new SmartDuplex({
|
||||
name: 'FastProducer',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
console.log('Fast processing chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
writeFunction: async (chunk, tools) => {
|
||||
return chunk; // Instant processing
|
||||
},
|
||||
});
|
||||
|
||||
// Create a StreamIntake to dynamically handle incoming data
|
||||
const streamIntake = new StreamIntake<string>();
|
||||
// Backpressure is handled automatically between fast → slow
|
||||
fast.pipe(slow).on('data', (d) => console.log(d));
|
||||
|
||||
// Chain the streams together and handle the backpressure scenario
|
||||
streamIntake
|
||||
.pipe(fastProcessingStream)
|
||||
.pipe(slowProcessingStream)
|
||||
.pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling
|
||||
.on('data', data => console.log('Final output:', data))
|
||||
.on('error', error => console.error('Stream encountered an error:', error));
|
||||
|
||||
// Simulate data pushing with intervals to observe backpressure handling
|
||||
let counter = 0;
|
||||
const interval = setInterval(() => {
|
||||
if (counter >= 10) {
|
||||
streamIntake.signalEnd();
|
||||
clearInterval(interval);
|
||||
} else {
|
||||
streamIntake.pushData(`Chunk ${counter}`);
|
||||
counter++;
|
||||
for (let i = 0; i < 100; i++) {
|
||||
fast.write(`chunk-${i}`);
|
||||
}
|
||||
}, 50);
|
||||
fast.end();
|
||||
```
|
||||
|
||||
In this advanced use case, a `SlowProcessor` and `FastProcessor` are created using `SmartDuplex`, simulating a situation where one stream is slower than another. The `StreamIntake` dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions.
|
||||
---
|
||||
|
||||
### Transform Streams in Parallel
|
||||
|
||||
For scenarios where you need to process data in parallel:
|
||||
### 🎯 Real-World Example: Processing Pipeline
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
|
||||
|
||||
const parallelTransform = createTransformFunction<any, any>(async (chunk) => {
|
||||
// Parallel Processing
|
||||
const results = await Promise.all(chunk.map(async item => await processItem(item)));
|
||||
return results;
|
||||
});
|
||||
|
||||
const streamIntake = new StreamIntake<any[]>();
|
||||
|
||||
streamIntake
|
||||
.pipe(parallelTransform)
|
||||
.pipe(new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
console.log('Processed parallel chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
}))
|
||||
.on('finish', () => console.log('Parallel processing completed.'));
|
||||
|
||||
// Simulate data pushing
|
||||
streamIntake.pushData([1, 2, 3, 4]);
|
||||
streamIntake.pushData([5, 6, 7, 8]);
|
||||
streamIntake.signalEnd();
|
||||
```
|
||||
|
||||
### Error Handling in Stream Pipelines
|
||||
|
||||
Error handling is an essential part of working with streams. The `StreamWrapper` assists in combining multiple streams while managing errors seamlessly:
|
||||
|
||||
```typescript
|
||||
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
const faultyStream = new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
if (chunk === 'bad data') {
|
||||
throw new Error('Faulty data encountered');
|
||||
}
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const readStream = new StreamIntake<string>();
|
||||
const writeStream = new SmartDuplex({
|
||||
async writeFunction(chunk) {
|
||||
console.log('Written chunk:', chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]);
|
||||
|
||||
combinedStream.run()
|
||||
.then(() => console.log('Stream processing completed.'))
|
||||
.catch(err => console.error('Stream error:', err.message));
|
||||
|
||||
// Push Data
|
||||
readStream.pushData('good data');
|
||||
readStream.pushData('bad data'); // This will throw an error
|
||||
readStream.pushData('more good data');
|
||||
readStream.signalEnd();
|
||||
```
|
||||
|
||||
### Testing Streams
|
||||
|
||||
Here's an example test case using the `tap` testing framework to verify the integrity of the `SmartDuplex` from a buffer:
|
||||
|
||||
```typescript
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
smartStream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
|
||||
smartStream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
tap.start();
|
||||
```
|
||||
|
||||
### Working with Files and Buffers
|
||||
|
||||
You can easily stream files and buffers with `smartstream`. Here’s a test illustrating reading and writing with file streams using `smartfile` combined with `smartstream` utilities:
|
||||
|
||||
```typescript
|
||||
import { tap } from '@push.rocks/tapbundle';
|
||||
import * as smartfile from '@push.rocks/smartfile';
|
||||
import fs from 'fs';
|
||||
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
tap.test('should handle file read and write streams', async () => {
|
||||
const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt');
|
||||
const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt');
|
||||
|
||||
const transformStream = new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
const transformedChunk = chunk.toString().toUpperCase();
|
||||
push(transformedChunk);
|
||||
// Read → Transform → Filter → Write
|
||||
const pipeline = new StreamWrapper([
|
||||
fs.createReadStream('./access.log'),
|
||||
new SmartDuplex({
|
||||
writeFunction: async (chunk) => {
|
||||
// Parse each line
|
||||
return chunk.toString().split('\n');
|
||||
},
|
||||
}),
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (lines: string[], tools) => {
|
||||
// Filter and push matching lines
|
||||
for (const line of lines) {
|
||||
if (line.includes('ERROR')) {
|
||||
await tools.push(line + '\n');
|
||||
}
|
||||
});
|
||||
|
||||
const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]);
|
||||
|
||||
await streamWrapper.run();
|
||||
|
||||
const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8');
|
||||
console.log('Output Content:', outputContent);
|
||||
});
|
||||
|
||||
tap.start();
|
||||
```
|
||||
|
||||
### Modular and Scoped Transformations
|
||||
|
||||
Creating modular and scoped transformations is straightforward with `SmartDuplex`:
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
type DataChunk = {
|
||||
id: number;
|
||||
data: string;
|
||||
};
|
||||
|
||||
const transformationStream1 = new SmartDuplex<DataChunk, DataChunk>({
|
||||
async writeFunction(chunk, { push }) {
|
||||
chunk.data = chunk.data.toUpperCase();
|
||||
push(chunk);
|
||||
}
|
||||
})
|
||||
},
|
||||
}),
|
||||
fs.createWriteStream('./errors.log'),
|
||||
]);
|
||||
|
||||
const transformationStream2 = new SmartDuplex<DataChunk, DataChunk>({
|
||||
async writeFunction(chunk, { push }) {
|
||||
chunk.data = `${chunk.data} processed with transformation 2`;
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const initialData: DataChunk[] = [
|
||||
{ id: 1, data: 'first' },
|
||||
{ id: 2, data: 'second' }
|
||||
];
|
||||
|
||||
const intakeStream = new StreamIntake<DataChunk>();
|
||||
|
||||
intakeStream
|
||||
.pipe(transformationStream1)
|
||||
.pipe(transformationStream2)
|
||||
.on('data', data => console.log('Transformed Data:', data));
|
||||
|
||||
initialData.forEach(item => intakeStream.pushData(item));
|
||||
intakeStream.signalEnd();
|
||||
await pipeline.run();
|
||||
console.log('Error extraction complete');
|
||||
```
|
||||
|
||||
By leveraging `SmartDuplex`, `StreamWrapper`, and `StreamIntake`, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach.
|
||||
```
|
||||
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository.
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
|
||||
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||
|
||||
### Trademarks
|
||||
|
||||
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
|
||||
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
|
||||
|
||||
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
|
||||
|
||||
### Company Information
|
||||
|
||||
Task Venture Capital GmbH
|
||||
Registered at District court Bremen HRB 35230 HB, Germany
|
||||
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||
|
||||
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
|
||||
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||
|
||||
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
|
||||
|
||||
@@ -1,50 +1,10 @@
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
|
||||
56
test/test.backpressure.ts
Normal file
56
test/test.backpressure.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
await tools.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream3.on('finish', () => {
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
152
test/test.nodewebhelpers.ts
Normal file
152
test/test.nodewebhelpers.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as stream from 'stream';
|
||||
import { nodewebhelpers } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createWebReadableStreamFromFile
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: createWebReadableStreamFromFile should read a file', async () => {
|
||||
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./test/assets/readabletext.txt');
|
||||
const reader = webStream.getReader();
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeReadableToWebReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeReadableToWebReadable should convert', async () => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
|
||||
const reader = webReadable.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebReadableToNodeReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebReadableToNodeReadable should convert', async (tools) => {
|
||||
const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello"
|
||||
const webReadable = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(data);
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const nodeReadable = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable.on('end', () => {
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('Hello');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeWritableToWebWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeWritableToWebWritable should convert', async () => {
|
||||
const chunks: Buffer[] = [];
|
||||
const nodeWritable = new stream.Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
chunks.push(chunk);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
|
||||
const webWritable = nodewebhelpers.convertNodeWritableToWebWritable(nodeWritable);
|
||||
const writer = webWritable.getWriter();
|
||||
|
||||
await writer.write(new Uint8Array([65, 66, 67])); // "ABC"
|
||||
await writer.close();
|
||||
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('ABC');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebWritableToNodeWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebWritableToNodeWritable should convert', async (tools) => {
|
||||
const chunks: Uint8Array[] = [];
|
||||
|
||||
const webWritable = new WritableStream<Uint8Array>({
|
||||
write(chunk) {
|
||||
chunks.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const nodeWritable = nodewebhelpers.convertWebWritableToNodeWritable(webWritable);
|
||||
|
||||
const done = tools.defer();
|
||||
nodeWritable.write(Buffer.from('Hello'), (err) => {
|
||||
expect(err).toBeFalsy();
|
||||
nodeWritable.end(() => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Round-trip: Node → Web → Node
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: round-trip Node → Web → Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable2.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable2.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
379
test/test.smartduplex.ts
Normal file
379
test/test.smartduplex.ts
Normal file
@@ -0,0 +1,379 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as smartstream from '../ts/index.js';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';
|
||||
|
||||
// =============================================
|
||||
// Constructor
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should construct with no options', async () => {
|
||||
const duplex = new SmartDuplex();
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should construct with options', async () => {
|
||||
const duplex = new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromBuffer
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const stream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
stream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should transform chunks via writeFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.write('world');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: writeFunction returning undefined should not push', async (tools) => {
|
||||
const results: any[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: any) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results.length).toEqual(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// tools.push — multiple outputs
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const splitter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
const words = chunk.split(' ');
|
||||
for (const word of words) {
|
||||
await streamTools.push(word);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
splitter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
splitter.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('beautiful');
|
||||
expect(results).toContain('world');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
splitter.write('hello beautiful world');
|
||||
splitter.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit final chunk via finalFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
let count = 0;
|
||||
|
||||
const aggregator = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
count++;
|
||||
return undefined;
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return `total: ${count}`;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
aggregator.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
aggregator.on('end', () => {
|
||||
expect(results.length).toEqual(1);
|
||||
expect(results[0]).toEqual('total: 2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
aggregator.write('a');
|
||||
aggregator.write('b');
|
||||
aggregator.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
finalFunction: async (streamTools) => {
|
||||
await streamTools.push('final1');
|
||||
await streamTools.push('final2');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
stream.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
stream.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final1');
|
||||
expect(results).toContain('final2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('hello');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// truncate
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should truncate stream early', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const limiter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
if (chunk === 'STOP') {
|
||||
streamTools.truncate();
|
||||
return undefined;
|
||||
}
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
limiter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
limiter.on('end', () => {
|
||||
expect(results).toContain('a');
|
||||
expect(results).toContain('b');
|
||||
expect(results).not.toContain('STOP');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
limiter.write('a');
|
||||
limiter.write('b');
|
||||
// Write STOP on next tick to allow previous writes to flush
|
||||
process.nextTick(() => {
|
||||
limiter.write('STOP');
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Error handling
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit error when writeFunction throws', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
throw new Error('write error');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('write error');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should error when no writeFunction and data is written', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('No stream function provided');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromWebReadableStream
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Web ReadableStream', async (tools) => {
|
||||
const chunks = ['hello', 'world', 'foo'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
controller.close();
|
||||
}
|
||||
});
|
||||
|
||||
const duplex = SmartDuplex.fromWebReadableStream(webReadable);
|
||||
const results: string[] = [];
|
||||
|
||||
const done = tools.defer();
|
||||
duplex.on('data', (chunk: string) => results.push(chunk));
|
||||
duplex.on('end', () => {
|
||||
expect(results).toEqual(chunks);
|
||||
done.resolve();
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// getWebStreams
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => {
|
||||
const duplex = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
await writer.write('hello');
|
||||
await writer.write('world');
|
||||
await writer.close();
|
||||
|
||||
const results: string[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Debug mode
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: debug mode should not crash', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
name: 'DebugStream',
|
||||
debug: true,
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('data', () => {});
|
||||
stream.on('end', () => done.resolve());
|
||||
|
||||
stream.write('test');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Pipe with file read
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should handle a read stream pipeline', async () => {
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
// consume data
|
||||
},
|
||||
finalFunction: async (streamTools) => {
|
||||
streamTools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
128
test/test.streamintake.ts
Normal file
128
test/test.streamintake.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamIntake, SmartDuplex } from '../ts/index.js';
|
||||
import * as stream from 'stream';
|
||||
|
||||
// =============================================
|
||||
// Basic StreamIntake
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: should push data and signal end', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
const results: string[] = [];
|
||||
|
||||
intake.pipe(
|
||||
new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
results.push(chunk);
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 5) {
|
||||
counter++;
|
||||
intake.pushData(`item-${counter}`);
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
// Give streams time to flush
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
tap.test('StreamIntake: should pipe to a writable', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
|
||||
intake
|
||||
.pipe(
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk: string) => {
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 10) {
|
||||
counter++;
|
||||
intake.pushData('data\n');
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Node Readable)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const intake = await StreamIntake.fromStream<Buffer>(nodeReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Web ReadableStream)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => {
|
||||
const data = ['chunk1', 'chunk2', 'chunk3'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const item of data) {
|
||||
controller.enqueue(item);
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const intake = await StreamIntake.fromStream<string>(webReadable);
|
||||
|
||||
const results: string[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(results).toEqual(data);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
70
test/test.streamwrapper.ts
Normal file
70
test/test.streamwrapper.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamWrapper, SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('StreamWrapper: should pipe read to write', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await wrapper.run();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: should propagate errors', async (tools) => {
|
||||
const failingStream = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async () => {
|
||||
throw new Error('intentional error');
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
failingStream,
|
||||
]);
|
||||
|
||||
let errorCaught = false;
|
||||
try {
|
||||
await wrapper.run();
|
||||
} catch (err) {
|
||||
errorCaught = true;
|
||||
expect(err.message).toEqual('intentional error');
|
||||
}
|
||||
expect(errorCaught).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: streamStarted should resolve', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
const runPromise = wrapper.run();
|
||||
await wrapper.streamStarted();
|
||||
await runPromise;
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: onCustomEvent should fire', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const emitter = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
(emitter as any).emit('custom-progress', 'progress');
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
emitter,
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
wrapper.onCustomEvent('custom-progress', () => {
|
||||
results.push('fired');
|
||||
});
|
||||
|
||||
await wrapper.run();
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,68 +0,0 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
tap.test('should run backpressure test', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
async function testBackpressure() {
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 1`);
|
||||
return chunk; // Fast processing
|
||||
},
|
||||
});
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 2`);
|
||||
await tools.push(chunk);
|
||||
// return chunk, optionally return ;
|
||||
},
|
||||
}); // This stream processes data more slowly
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 3`);
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
console.log(`Backpressure at chunk ${i}`);
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream1.on('finish', () => {
|
||||
console.log('Stream 1 finished processing.');
|
||||
});
|
||||
stream2.on('finish', () => {
|
||||
console.log('Stream 2 finished processing.');
|
||||
});
|
||||
stream3.on('finish', () => {
|
||||
console.log('Stream 3 finished processing.');
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
testBackpressure();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
await tap.start();
|
||||
@@ -1,24 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
|
||||
import * as smartrx from '@push.rocks/smartrx';
|
||||
import * as fs from 'fs';
|
||||
|
||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
smartStream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
|
||||
smartStream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
tap.start();
|
||||
@@ -1,65 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as smartfile from '@push.rocks/smartfile';
|
||||
|
||||
import * as smartstream from '../ts/index.js';
|
||||
|
||||
let testIntake: smartstream.StreamIntake<string>;
|
||||
|
||||
tap.test('should handle a read stream', async (tools) => {
|
||||
const counter = 0;
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
// do something with the stream here
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
console.log(chunkStringArg.toString());
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
tap.test('should create a valid Intake', async (tools) => {
|
||||
testIntake = new smartstream.StreamIntake<string>();
|
||||
testIntake.pipe(
|
||||
new smartstream.SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunkStringArg: string, streamTools) => {
|
||||
await tools.delayFor(100);
|
||||
console.log(chunkStringArg);
|
||||
return chunkStringArg;
|
||||
}
|
||||
})
|
||||
)
|
||||
.pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'));
|
||||
const testFinished = tools.defer();
|
||||
let counter = 0;
|
||||
testIntake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 50) {
|
||||
counter++;
|
||||
testIntake.pushData('hi');
|
||||
testIntake.pushData('+wow');
|
||||
testIntake.pushData('\n');
|
||||
} else {
|
||||
testIntake.signalEnd();
|
||||
testFinished.resolve();
|
||||
}
|
||||
});
|
||||
await testFinished.promise;
|
||||
testIntake.signalEnd();
|
||||
});
|
||||
|
||||
tap.start();
|
||||
@@ -1,15 +0,0 @@
|
||||
import * as smartfile from '@push.rocks/smartfile';
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
|
||||
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
|
||||
|
||||
let testSmartstream: smartstream.StreamWrapper;
|
||||
tap.test('should combine a stream', async () => {
|
||||
testSmartstream = new smartstream.StreamWrapper([
|
||||
smartfile.fsStream.createReadStream('./test/assets/test.md'),
|
||||
smartfile.fsStream.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await testSmartstream.run();
|
||||
});
|
||||
|
||||
tap.start();
|
||||
@@ -1,67 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as webstream from '../ts_web/index.js';
|
||||
|
||||
tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => {
|
||||
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
let readUint8Array = new Uint8Array();
|
||||
|
||||
// Read from the stream
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value) {
|
||||
// Concatenate value to readUint8Array
|
||||
const tempArray = new Uint8Array(readUint8Array.length + value.length);
|
||||
tempArray.set(readUint8Array, 0);
|
||||
tempArray.set(value, readUint8Array.length);
|
||||
readUint8Array = tempArray;
|
||||
}
|
||||
}
|
||||
|
||||
expect(readUint8Array).toEqual(inputUint8Array);
|
||||
});
|
||||
|
||||
tap.test('WebDuplexStream should handle transform with a write function', async () => {
|
||||
const input = [1, 2, 3, 4, 5];
|
||||
const expectedOutput = [2, 4, 6, 8, 10];
|
||||
|
||||
const webDuplexStream = new webstream.WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
// Push the doubled number into the stream
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = webDuplexStream.writable.getWriter();
|
||||
const reader = webDuplexStream.readable.getReader();
|
||||
|
||||
const output: number[] = [];
|
||||
|
||||
// Read from the stream asynchronously
|
||||
const readPromise = (async () => {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value !== undefined) {
|
||||
output.push(value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
// Write to the stream
|
||||
for (const num of input) {
|
||||
await writer.write(num);
|
||||
}
|
||||
await writer.close();
|
||||
|
||||
// Wait for the reading to complete
|
||||
await readPromise;
|
||||
|
||||
// Assert that the output matches the expected transformed data
|
||||
expect(output).toEqual(expectedOutput);
|
||||
});
|
||||
|
||||
tap.start();
|
||||
51
test/test.utilities.ts
Normal file
51
test/test.utilities.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createTransformFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('createTransformFunction: should create a transform stream', async (tools) => {
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2, { objectMode: true });
|
||||
const results: number[] = [];
|
||||
|
||||
doubler.on('data', (chunk: number) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
doubler.on('end', () => {
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
expect(results).toContain(30);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
doubler.write(5);
|
||||
doubler.write(10);
|
||||
doubler.write(15);
|
||||
doubler.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// createPassThrough
|
||||
// =============================================
|
||||
|
||||
tap.test('createPassThrough: should pass data through unchanged', async (tools) => {
|
||||
const passThrough = createPassThrough();
|
||||
const results: string[] = [];
|
||||
|
||||
passThrough.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
passThrough.on('end', () => {
|
||||
expect(results).toEqual(['hello', 'world']);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
passThrough.write('hello');
|
||||
passThrough.write('world');
|
||||
passThrough.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
144
test/test.webduplexstream.both.ts
Normal file
144
test/test.webduplexstream.both.ts
Normal file
@@ -0,0 +1,144 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { WebDuplexStream } from '../ts_web/index.js';
|
||||
|
||||
// Helper: collect all chunks from a readable
|
||||
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
|
||||
const results: T[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// =============================================
|
||||
// Basic transform
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
|
||||
const stream = new WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Read and write concurrently to avoid backpressure deadlock
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction return value
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('HELLO');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromUInt8Array
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = WebDuplexStream.fromUInt8Array(data);
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const { value } = await reader.read();
|
||||
expect(value).toBeTruthy();
|
||||
expect(value.length).toEqual(5);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// readFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
readFunction: async (tools) => {
|
||||
await tools.write('chunk1');
|
||||
await tools.write('chunk2');
|
||||
tools.done();
|
||||
},
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk.toUpperCase());
|
||||
},
|
||||
});
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
const results = await collectAll(reader);
|
||||
|
||||
expect(results).toContain('CHUNK1');
|
||||
expect(results).toContain('CHUNK2');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk;
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push('final');
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// No writeFunction = passthrough
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('pass');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('pass');
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.2.0',
|
||||
version: '3.4.0',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -56,69 +56,116 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
readableStream: ReadableStream<T>
|
||||
): SmartDuplex<T, T> {
|
||||
const smartDuplex = new SmartDuplex<T, T>({
|
||||
readFunction: async () => {
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
// Acquire reader ONCE
|
||||
const reader = readableStream.getReader();
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (value !== undefined) {
|
||||
let reading = false;
|
||||
|
||||
// Override _read to pull from the web reader
|
||||
smartDuplex._read = function (_size: number) {
|
||||
if (reading) return;
|
||||
reading = true;
|
||||
reader.read().then(
|
||||
({ value, done }) => {
|
||||
reading = false;
|
||||
if (done) {
|
||||
smartDuplex.push(null);
|
||||
} else {
|
||||
smartDuplex.push(value);
|
||||
}
|
||||
if (done) {
|
||||
smartDuplex.end();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
reading = false;
|
||||
smartDuplex.destroy(err);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
// Cancel reader on destroy
|
||||
smartDuplex.on('close', () => {
|
||||
reader.cancel().catch(() => {});
|
||||
});
|
||||
|
||||
return smartDuplex;
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>; // an array that only takes a defined amount of items
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
|
||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||
private _consumerWantsData = false;
|
||||
private _readFunctionRunning = false;
|
||||
|
||||
private debugLog(messageArg: string) {
|
||||
// optional debug log
|
||||
if (this.options.debug) {
|
||||
console.log(messageArg);
|
||||
}
|
||||
}
|
||||
|
||||
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
||||
const safeOptions = optionsArg || {} as ISmartDuplexOptions<TInput, TOutput>;
|
||||
super(
|
||||
Object.assign(
|
||||
{
|
||||
highWaterMark: 1,
|
||||
},
|
||||
optionsArg
|
||||
safeOptions
|
||||
)
|
||||
);
|
||||
this.options = optionsArg;
|
||||
this.options = safeOptions;
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
|
||||
this.options.highWaterMark || 1
|
||||
);
|
||||
}
|
||||
|
||||
public async _read(size: number): Promise<void> {
|
||||
this.debugLog(`${this.options.name}: read was called`);
|
||||
await this.backpressuredArray.waitForItems();
|
||||
this.debugLog(`${this.options.name}: successfully waited for items.`);
|
||||
if (this.options.readFunction) {
|
||||
await this.options.readFunction();
|
||||
}
|
||||
let canPushMore = true;
|
||||
while (this.backpressuredArray.data.length > 0 && canPushMore) {
|
||||
/**
|
||||
* Synchronously drains items from the backpressuredArray into the readable side.
|
||||
* Stops when push() returns false (consumer is full) or array is empty.
|
||||
*/
|
||||
private _drainBackpressuredArray(): void {
|
||||
while (this.backpressuredArray.data.length > 0) {
|
||||
const nextChunk = this.backpressuredArray.shift();
|
||||
canPushMore = this.push(nextChunk);
|
||||
if (nextChunk === null) {
|
||||
// EOF signal — push null to end readable side
|
||||
this.push(null);
|
||||
this._consumerWantsData = false;
|
||||
return;
|
||||
}
|
||||
const canPushMore = this.push(nextChunk);
|
||||
if (!canPushMore) {
|
||||
this._consumerWantsData = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// _read must NOT be async — Node.js ignores the return value
|
||||
public _read(size: number): void {
|
||||
this.debugLog(`${this.options.name}: read was called`);
|
||||
this._consumerWantsData = true;
|
||||
|
||||
// Drain any buffered items first
|
||||
if (this.backpressuredArray.data.length > 0) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
|
||||
// If readFunction exists and is not already running, start it
|
||||
if (this.options.readFunction && !this._readFunctionRunning) {
|
||||
this._readFunctionRunning = true;
|
||||
this.options.readFunction().then(
|
||||
() => { this._readFunctionRunning = false; },
|
||||
(err) => { this._readFunctionRunning = false; this.destroy(err); }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async backpressuredPush(pushArg: TOutput) {
|
||||
const canPushMore = this.backpressuredArray.push(pushArg);
|
||||
// Try to drain if the consumer wants data
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
if (!canPushMore) {
|
||||
this.debugLog(`${this.options.name}: cannot push more`);
|
||||
await this.backpressuredArray.waitForSpace();
|
||||
@@ -128,83 +175,151 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
}
|
||||
|
||||
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
||||
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
||||
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||
|
||||
// _write must NOT be async — Node.js ignores the return value
|
||||
public _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||
if (!this.options.writeFunction) {
|
||||
return callback(new Error('No stream function provided'));
|
||||
}
|
||||
|
||||
let callbackCalled = false;
|
||||
const safeCallback = (err?: Error | null) => {
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(err);
|
||||
}
|
||||
};
|
||||
|
||||
let isTruncated = false;
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => {
|
||||
this.push(null);
|
||||
isTruncated = true;
|
||||
callback();
|
||||
safeCallback();
|
||||
this.push(null);
|
||||
},
|
||||
push: async (pushArg: TOutput) => {
|
||||
return await this.backpressuredPush(pushArg);
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
const writeDeferred = plugins.smartpromise.defer();
|
||||
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
|
||||
const modifiedChunk = await this.options.writeFunction(chunk, tools);
|
||||
|
||||
this.options.writeFunction(chunk, tools).then(
|
||||
(modifiedChunk) => {
|
||||
if (isTruncated) {
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
return;
|
||||
}
|
||||
if (modifiedChunk) {
|
||||
await tools.push(modifiedChunk);
|
||||
}
|
||||
callback();
|
||||
const finish = () => {
|
||||
safeCallback();
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
};
|
||||
if (modifiedChunk !== undefined && modifiedChunk !== null) {
|
||||
this.backpressuredPush(modifiedChunk).then(finish, (err) => {
|
||||
safeCallback(err);
|
||||
writeDeferred.resolve();
|
||||
writeDeferred.promise.then(() => {
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
});
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
} else {
|
||||
finish();
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
safeCallback(err);
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public async _final(callback: (error?: Error | null) => void) {
|
||||
await Promise.all(this.asyncWritePromiseObjectmap.getArray());
|
||||
// _final must NOT be async — Node.js ignores the return value
|
||||
public _final(callback: (error?: Error | null) => void) {
|
||||
let callbackCalled = false;
|
||||
const safeCallback = (err?: Error | null) => {
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(err);
|
||||
}
|
||||
};
|
||||
|
||||
Promise.all(this.asyncWritePromiseObjectmap.getArray()).then(() => {
|
||||
if (this.options.finalFunction) {
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => callback(),
|
||||
truncate: () => safeCallback(),
|
||||
push: async (pipeObject) => {
|
||||
return this.backpressuredArray.push(pipeObject);
|
||||
return await this.backpressuredPush(pipeObject);
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
const finalChunk = await this.options.finalFunction(tools);
|
||||
if (finalChunk) {
|
||||
this.backpressuredArray.push(finalChunk);
|
||||
}
|
||||
} catch (err) {
|
||||
this.options.finalFunction(tools).then(
|
||||
(finalChunk) => {
|
||||
const pushNull = () => {
|
||||
this.backpressuredArray.push(null);
|
||||
callback(err);
|
||||
return;
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback();
|
||||
};
|
||||
|
||||
if (finalChunk !== undefined && finalChunk !== null) {
|
||||
this.backpressuredPush(finalChunk).then(pushNull, (err) => {
|
||||
safeCallback(err);
|
||||
});
|
||||
} else {
|
||||
pushNull();
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
this.backpressuredArray.push(null);
|
||||
callback();
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback(err);
|
||||
}
|
||||
);
|
||||
} else {
|
||||
this.backpressuredArray.push(null);
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback();
|
||||
}
|
||||
}, (err) => {
|
||||
safeCallback(err);
|
||||
});
|
||||
}
|
||||
|
||||
public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> {
|
||||
const duplex = this;
|
||||
let readableClosed = false;
|
||||
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
duplex.on('readable', () => {
|
||||
const onReadable = () => {
|
||||
let chunk;
|
||||
while (null !== (chunk = duplex.read())) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
duplex.on('end', () => {
|
||||
const onEnd = () => {
|
||||
if (!readableClosed) {
|
||||
readableClosed = true;
|
||||
controller.close();
|
||||
});
|
||||
}
|
||||
cleanup();
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
duplex.removeListener('readable', onReadable);
|
||||
duplex.removeListener('end', onEnd);
|
||||
};
|
||||
|
||||
duplex.on('readable', onReadable);
|
||||
duplex.on('end', onEnd);
|
||||
},
|
||||
cancel(reason) {
|
||||
duplex.destroy(new Error(reason));
|
||||
@@ -214,22 +329,38 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
const writable = new WritableStream({
|
||||
write(chunk) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
let resolved = false;
|
||||
const onDrain = () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
|
||||
const isBackpressured = !duplex.write(chunk, (error) => {
|
||||
if (error) {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
duplex.removeListener('drain', onDrain);
|
||||
reject(error);
|
||||
} else {
|
||||
}
|
||||
} else if (!isBackpressured && !resolved) {
|
||||
resolved = true;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
if (isBackpressured) {
|
||||
duplex.once('drain', resolve);
|
||||
duplex.once('drain', onDrain);
|
||||
}
|
||||
});
|
||||
},
|
||||
close() {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
duplex.end(resolve);
|
||||
duplex.end((err: Error | null) => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
},
|
||||
abort(reason) {
|
||||
|
||||
@@ -6,8 +6,11 @@ export class StreamIntake<T> extends plugins.stream.Readable {
|
||||
const intakeStream = new StreamIntake<U>(options);
|
||||
|
||||
if (inputStream instanceof plugins.stream.Readable) {
|
||||
inputStream.on('data', (chunk: U) => {
|
||||
inputStream.on('readable', () => {
|
||||
let chunk: U;
|
||||
while (null !== (chunk = inputStream.read() as U)) {
|
||||
intakeStream.pushData(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
inputStream.on('end', () => {
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import * as plugins from './smartstream.plugins.js';
|
||||
|
||||
// interfaces
|
||||
import { Transform } from 'stream';
|
||||
|
||||
export interface IErrorFunction {
|
||||
(err: Error): any;
|
||||
}
|
||||
@@ -82,15 +79,17 @@ export class StreamWrapper {
|
||||
|
||||
this.streamStartedDeferred.resolve();
|
||||
|
||||
finalStream.on('end', () => {
|
||||
let resolved = false;
|
||||
const safeResolve = () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
done.resolve();
|
||||
});
|
||||
finalStream.on('close', () => {
|
||||
done.resolve();
|
||||
});
|
||||
finalStream.on('finish', () => {
|
||||
done.resolve();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
finalStream.on('end', safeResolve);
|
||||
finalStream.on('close', safeResolve);
|
||||
finalStream.on('finish', safeResolve);
|
||||
return done.promise;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +1,219 @@
|
||||
import { createReadStream } from 'fs';
|
||||
import * as plugins from './smartstream.plugins.js';
|
||||
|
||||
/**
|
||||
* Creates a Web ReadableStream from a file.
|
||||
* Creates a Web ReadableStream from a file using pull-based backpressure.
|
||||
*
|
||||
* @param filePath - The path to the file to be read
|
||||
* @returns A Web ReadableStream that reads the file in chunks
|
||||
*/
|
||||
export function createWebReadableStreamFromFile(filePath: string): ReadableStream<Uint8Array> {
|
||||
const fileStream = createReadStream(filePath);
|
||||
const fileStream = plugins.fs.createReadStream(filePath);
|
||||
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
// When data is available, enqueue it into the Web ReadableStream
|
||||
fileStream.on('data', (chunk) => {
|
||||
controller.enqueue(chunk as Uint8Array);
|
||||
fileStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
|
||||
// When the file stream ends, close the Web ReadableStream
|
||||
fileStream.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
|
||||
// If there's an error, error the Web ReadableStream
|
||||
fileStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
// Pause immediately — pull() will drive reads
|
||||
fileStream.pause();
|
||||
},
|
||||
pull(controller) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const chunk = fileStream.read();
|
||||
if (chunk !== null) {
|
||||
controller.enqueue(chunk as Uint8Array);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
// No data available yet — wait for 'readable' or 'end'
|
||||
const onReadable = () => {
|
||||
cleanup();
|
||||
const data = fileStream.read();
|
||||
if (data !== null) {
|
||||
controller.enqueue(data as Uint8Array);
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const onEnd = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const onError = (err: Error) => {
|
||||
cleanup();
|
||||
reject(err);
|
||||
};
|
||||
const cleanup = () => {
|
||||
fileStream.removeListener('readable', onReadable);
|
||||
fileStream.removeListener('end', onEnd);
|
||||
fileStream.removeListener('error', onError);
|
||||
};
|
||||
fileStream.once('readable', onReadable);
|
||||
fileStream.once('end', onEnd);
|
||||
fileStream.once('error', onError);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
// If the Web ReadableStream is canceled, destroy the file stream
|
||||
fileStream.destroy();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Web ReadableStream to a Node.js Readable stream.
|
||||
*
|
||||
* @param webStream - The Web ReadableStream to convert
|
||||
* @returns A Node.js Readable stream that reads data from the Web ReadableStream
|
||||
*/
|
||||
export function convertWebReadableToNodeReadable(webStream: ReadableStream<Uint8Array>): plugins.stream.Readable {
|
||||
const reader = webStream.getReader();
|
||||
|
||||
return new plugins.stream.Readable({
|
||||
read() {
|
||||
reader.read().then(
|
||||
({ value, done }) => {
|
||||
if (done) {
|
||||
this.push(null);
|
||||
} else {
|
||||
this.push(Buffer.from(value));
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
this.destroy(err);
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Node.js Readable stream to a Web ReadableStream using pull-based backpressure.
|
||||
*
|
||||
* @param nodeStream - The Node.js Readable stream to convert
|
||||
* @returns A Web ReadableStream that reads data from the Node.js Readable stream
|
||||
*/
|
||||
export function convertNodeReadableToWebReadable(nodeStream: plugins.stream.Readable): ReadableStream<Uint8Array> {
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
nodeStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
|
||||
nodeStream.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
|
||||
// Pause immediately — pull() will drive reads
|
||||
nodeStream.pause();
|
||||
},
|
||||
pull(controller) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const chunk = nodeStream.read();
|
||||
if (chunk !== null) {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
// No data available yet — wait for 'readable' or 'end'
|
||||
const onReadable = () => {
|
||||
cleanup();
|
||||
const data = nodeStream.read();
|
||||
if (data !== null) {
|
||||
controller.enqueue(new Uint8Array(data));
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const onEnd = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const onError = (err: Error) => {
|
||||
cleanup();
|
||||
reject(err);
|
||||
};
|
||||
const cleanup = () => {
|
||||
nodeStream.removeListener('readable', onReadable);
|
||||
nodeStream.removeListener('end', onEnd);
|
||||
nodeStream.removeListener('error', onError);
|
||||
};
|
||||
nodeStream.once('readable', onReadable);
|
||||
nodeStream.once('end', onEnd);
|
||||
nodeStream.once('error', onError);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
nodeStream.destroy();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Web WritableStream to a Node.js Writable stream.
|
||||
*
|
||||
* @param webWritable - The Web WritableStream to convert
|
||||
* @returns A Node.js Writable stream that writes data to the Web WritableStream
|
||||
*/
|
||||
export function convertWebWritableToNodeWritable(webWritable: WritableStream<Uint8Array>): plugins.stream.Writable {
|
||||
const writer = webWritable.getWriter();
|
||||
|
||||
return new plugins.stream.Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
writer.write(new Uint8Array(chunk)).then(
|
||||
() => callback(),
|
||||
(err) => callback(err)
|
||||
);
|
||||
},
|
||||
final(callback) {
|
||||
writer.close().then(() => callback()).catch(callback);
|
||||
},
|
||||
destroy(err, callback) {
|
||||
if (err) {
|
||||
writer.abort(err).then(() => callback(err)).catch(() => callback(err));
|
||||
} else {
|
||||
// Clean destroy — just release the lock
|
||||
writer.releaseLock();
|
||||
callback(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Node.js Writable stream to a Web WritableStream.
|
||||
*
|
||||
* @param nodeWritable - The Node.js Writable stream to convert
|
||||
* @returns A Web WritableStream that writes data to the Node.js Writable stream
|
||||
*/
|
||||
export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Writable): WritableStream<Uint8Array> {
|
||||
return new WritableStream({
|
||||
write(chunk) {
|
||||
return new Promise((resolve, reject) => {
|
||||
nodeWritable.write(Buffer.from(chunk), (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
close() {
|
||||
return new Promise((resolve, reject) => {
|
||||
nodeWritable.end((err: Error | null) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
abort(reason) {
|
||||
nodeWritable.destroy(reason instanceof Error ? reason : new Error(String(reason)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
// node native
|
||||
import * as fs from 'fs';
|
||||
import * as stream from 'stream';
|
||||
|
||||
export { stream };
|
||||
export { fs, stream };
|
||||
|
||||
// pushrocks scope
|
||||
import * as lik from '@push.rocks/lik';
|
||||
|
||||
3
ts/tspublish.json
Normal file
3
ts/tspublish.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"order": 1
|
||||
}
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.2.0',
|
||||
version: '3.4.0',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -78,17 +78,14 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
|
||||
try {
|
||||
const finalChunk = await optionsArg.finalFunction(tools);
|
||||
if (finalChunk) {
|
||||
controller.enqueue(finalChunk);
|
||||
if (finalChunk !== undefined && finalChunk !== null) {
|
||||
controller.enqueue(finalChunk as TOutput);
|
||||
}
|
||||
} catch (err) {
|
||||
controller.error(err);
|
||||
} finally {
|
||||
controller.terminate();
|
||||
}
|
||||
} else {
|
||||
controller.terminate();
|
||||
}
|
||||
// TransformStream auto-closes readable after flush resolves — no terminate() needed
|
||||
},
|
||||
});
|
||||
|
||||
@@ -96,7 +93,9 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
|
||||
// Start producing data if readFunction is provided
|
||||
if (this.options.readFunction) {
|
||||
this._startReading();
|
||||
this._startReading().catch((err) => {
|
||||
// Prevent unhandled rejection — the error is propagated through the writable side
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,17 +103,25 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
const writable = this.writable;
|
||||
const writer = writable.getWriter();
|
||||
|
||||
let doneSignaled = false;
|
||||
const tools: IStreamToolsRead<TInput, TOutput> = {
|
||||
done: () => writer.close(),
|
||||
done: () => {
|
||||
doneSignaled = true;
|
||||
},
|
||||
write: async (writeArg) => await writer.write(writeArg),
|
||||
};
|
||||
|
||||
try {
|
||||
await this.options.readFunction(tools);
|
||||
if (doneSignaled) {
|
||||
await writer.close();
|
||||
}
|
||||
} catch (err) {
|
||||
writer.abort(err);
|
||||
} finally {
|
||||
writer.releaseLock();
|
||||
try {
|
||||
await writer.abort(err);
|
||||
} catch (_) {
|
||||
// Writer may already be in error state
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,7 +135,7 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
writer.write(uint8Array).then(() => writer.close());
|
||||
writer.write(uint8Array).then(() => writer.close()).catch(() => {});
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ export {
|
||||
const smartenvInstance = new smartenv.Smartenv();
|
||||
|
||||
await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
|
||||
globalThis.ReadableStream = moduleArg.ReadableStream;
|
||||
globalThis.WritableStream = moduleArg.WritableStream;
|
||||
globalThis.TransformStream = moduleArg.TransformStream;
|
||||
globalThis.ReadableStream = moduleArg.ReadableStream as any;
|
||||
globalThis.WritableStream = moduleArg.WritableStream as any;
|
||||
globalThis.TransformStream = moduleArg.TransformStream as any;
|
||||
})
|
||||
|
||||
3
ts_web/tspublish.json
Normal file
3
ts_web/tspublish.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"order": 0
|
||||
}
|
||||
Reference in New Issue
Block a user