Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a765363371 | |||
| fccd0f86ad | |||
| f78469a299 | |||
| 04d2f3223a | |||
| 77046acac7 | |||
| 2acf1972a2 |
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"@git.zone/cli": {
|
||||
"projectType": "npm",
|
||||
"module": {
|
||||
"githost": "code.foss.global",
|
||||
"gitscope": "push.rocks",
|
||||
"gitrepo": "smartstream",
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"npmPackagename": "@push.rocks/smartstream",
|
||||
"license": "MIT",
|
||||
"projectDomain": "push.rocks"
|
||||
},
|
||||
"release": {
|
||||
"registries": [
|
||||
"https://verdaccio.lossless.digital",
|
||||
"https://registry.npmjs.org"
|
||||
],
|
||||
"accessLevel": "public"
|
||||
}
|
||||
},
|
||||
"@git.zone/tsdoc": {
|
||||
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
|
||||
},
|
||||
"@ship.zone/szci": {
|
||||
"npmGlobalTools": []
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,32 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-30 - 3.4.2 - fix(streams)
|
||||
tighten stream typings and guard optional runtime paths for duplex and wrapper utilities
|
||||
|
||||
- allow SmartDuplex write and final handlers to return empty values in addition to transformed output
|
||||
- prevent StreamWrapper from piping or starting when no executable stream chain is produced
|
||||
- guard optional WebDuplexStream readFunction execution before invoking it
|
||||
- update tests and TypeScript configuration to satisfy stricter noImplicitAny checks and newer tstest tooling
|
||||
|
||||
## 2026-03-02 - 3.4.1 - fix(readme)
|
||||
improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference
|
||||
|
||||
- Clarified package entry points and import paths so consumers can choose Node or Web builds.
|
||||
- Expanded examples: SmartDuplex finalFunction pushing multiple chunks, converting SmartDuplex to Web streams (reader/writer example), WebDuplexStream finalFunction, and a Node↔Web round-trip conversion example.
|
||||
- Added guidance and examples for concurrency and backpressure (TransformStream read/write concurrency tip, reading concurrently with writes, and backpressure notes for Node↔Web converters).
|
||||
- Documented StreamWrapper.streamStarted() and onCustomEvent() usage with examples showing awaiting stream startup.
|
||||
- Added a new API reference section documenting SmartDuplex, WebDuplexStream, StreamWrapper, StreamIntake, nodewebhelpers, and utility functions.
|
||||
- Various README wording, formatting, and example clarifications (tips, headings, and minor cosmetic fixes).
|
||||
|
||||
## 2026-03-02 - 3.4.0 - feat(smartduplex)
|
||||
improve backpressure handling and web/node stream interoperability
|
||||
|
||||
- Refactored SmartDuplex to use synchronous _read/_write/_final (avoids async pitfalls), added internal backpressured buffer draining and consumer signaling
|
||||
- Implemented pull-based backpressure for Node <-> Web stream conversions (nodewebhelpers and convertNodeReadableToWebReadable/convertWebReadableToNodeReadable)
|
||||
- StreamIntake.fromStream now reads from 'readable' and drains properly; StreamWrapper resolves safely on end/close/finish
|
||||
- Improved getWebStreams / WebDuplexStream behavior (safer enqueue/close/abort handling, final/readFunction improvements)
|
||||
- Added many new unit tests covering backpressure, web/node helpers, StreamIntake, StreamWrapper, WebDuplexStream; bumped @push.rocks/lik and @types/node versions
|
||||
|
||||
## 2026-02-28 - 3.3.0 - feat(smartstream)
|
||||
bump dependencies, update build/publish config, refactor tests, and overhaul documentation
|
||||
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2016 Push.Rocks
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
+12
-10
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstream",
|
||||
"version": "3.3.0",
|
||||
"version": "3.4.2",
|
||||
"private": false,
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"type": "module",
|
||||
@@ -9,8 +9,8 @@
|
||||
"./web": "./dist_ts_web/index.js"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "(tstest test/)",
|
||||
"build": "(tsbuild tsfolders --allowimplicitany)"
|
||||
"test": "(tstest test/ --verbose --logfile --timeout 60)",
|
||||
"build": "(tsbuild tsfolders)"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@@ -23,14 +23,13 @@
|
||||
},
|
||||
"homepage": "https://code.foss.global/push.rocks/smartstream",
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@push.rocks/tapbundle": "^6.0.3",
|
||||
"@types/node": "^25.3.2"
|
||||
"@git.zone/tsbuild": "^4.4.0",
|
||||
"@git.zone/tsrun": "^2.0.2",
|
||||
"@git.zone/tstest": "^3.6.3",
|
||||
"@types/node": "^25.6.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/lik": "^6.2.2",
|
||||
"@push.rocks/lik": "^6.4.1",
|
||||
"@push.rocks/smartenv": "^6.0.0",
|
||||
"@push.rocks/smartpromise": "^4.2.3",
|
||||
"@push.rocks/smartrx": "^3.0.10"
|
||||
@@ -47,6 +46,8 @@
|
||||
"dist_ts_web/**/*",
|
||||
"assets/**/*",
|
||||
"cli.js",
|
||||
".smartconfig.json",
|
||||
"license",
|
||||
"npmextra.json",
|
||||
"readme.md"
|
||||
],
|
||||
@@ -73,5 +74,6 @@
|
||||
"buffer",
|
||||
"stream utilities",
|
||||
"esm"
|
||||
]
|
||||
],
|
||||
"packageManager": "pnpm@10.28.2"
|
||||
}
|
||||
|
||||
Generated
+1582
-4229
File diff suppressed because it is too large
Load Diff
@@ -12,7 +12,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
||||
pnpm install @push.rocks/smartstream
|
||||
```
|
||||
|
||||
The package ships with two entry points:
|
||||
The package ships with **two entry points** so you can pick exactly what you need:
|
||||
|
||||
| Entry Point | Import Path | Environment |
|
||||
|---|---|---|
|
||||
@@ -83,6 +83,8 @@ const splitter = new SmartDuplex<string, string>({
|
||||
Run cleanup or emit final data when the writable side ends:
|
||||
|
||||
```typescript
|
||||
let runningTotal = 0;
|
||||
|
||||
const aggregator = new SmartDuplex<number, number>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
@@ -95,6 +97,8 @@ const aggregator = new SmartDuplex<number, number>({
|
||||
});
|
||||
```
|
||||
|
||||
The `finalFunction` can also push multiple chunks via `tools.push()`, just like `writeFunction`.
|
||||
|
||||
#### Truncating a Stream Early
|
||||
|
||||
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
||||
@@ -132,16 +136,36 @@ nodeDuplex.pipe(processTransform).pipe(outputStream);
|
||||
|
||||
#### Getting Web Streams from SmartDuplex
|
||||
|
||||
Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair:
|
||||
Convert a `SmartDuplex` into a Web `ReadableStream` + `WritableStream` pair:
|
||||
|
||||
```typescript
|
||||
const duplex = new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
return transform(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
// Read and write concurrently to avoid TransformStream backpressure
|
||||
const readAll = async () => {
|
||||
const results = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
};
|
||||
|
||||
const readPromise = readAll();
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
```
|
||||
|
||||
#### Debug Mode
|
||||
@@ -186,7 +210,7 @@ pipeline.run()
|
||||
.catch((err) => console.error('Pipeline failed:', err));
|
||||
```
|
||||
|
||||
You can also listen for custom events across all streams:
|
||||
You can listen for custom events across all streams in the pipeline:
|
||||
|
||||
```typescript
|
||||
pipeline.onCustomEvent('progress', () => {
|
||||
@@ -194,6 +218,14 @@ pipeline.onCustomEvent('progress', () => {
|
||||
});
|
||||
```
|
||||
|
||||
You can also await `streamStarted()` to know when the pipeline has been wired up:
|
||||
|
||||
```typescript
|
||||
const runPromise = pipeline.run();
|
||||
await pipeline.streamStarted(); // Resolves once pipes are connected
|
||||
await runPromise;
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 📥 StreamIntake — Dynamic Data Injection
|
||||
@@ -222,7 +254,7 @@ intake.pushData('World');
|
||||
intake.signalEnd(); // Signal end-of-stream
|
||||
```
|
||||
|
||||
#### Demand-driven Production with Observable
|
||||
#### Demand-Driven Production with Observable
|
||||
|
||||
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
||||
|
||||
@@ -265,7 +297,10 @@ Quickly create a `SmartDuplex` from a simple async mapping function:
|
||||
```typescript
|
||||
import { createTransformFunction } from '@push.rocks/smartstream';
|
||||
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
|
||||
const doubler = createTransformFunction<number, number>(
|
||||
async (n) => n * 2,
|
||||
{ objectMode: true }
|
||||
);
|
||||
|
||||
intakeStream.pipe(doubler).pipe(outputStream);
|
||||
```
|
||||
@@ -299,16 +334,26 @@ const stream = new WebDuplexStream<number, number>({
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Write
|
||||
// Always read concurrently with writes — TransformStream applies backpressure
|
||||
const readPromise = (async () => {
|
||||
const results = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
})();
|
||||
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
|
||||
// Read
|
||||
const { value } = await reader.read(); // 10
|
||||
const { value: v2 } = await reader.read(); // 20
|
||||
const results = await readPromise; // [10, 20]
|
||||
```
|
||||
|
||||
> 💡 **Tip:** Because `TransformStream` enforces backpressure between its writable and readable sides, you must start reading *before* or *concurrently with* writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain.
|
||||
|
||||
#### From a Uint8Array
|
||||
|
||||
```typescript
|
||||
@@ -337,11 +382,24 @@ const reader = stream.readable.getReader();
|
||||
// reads "CHUNK 1", "CHUNK 2"
|
||||
```
|
||||
|
||||
#### Final Function
|
||||
|
||||
Emit trailing data when the writable side is closed:
|
||||
|
||||
```typescript
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => chunk,
|
||||
finalFunction: async (tools) => {
|
||||
tools.push('footer');
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🔀 Node ↔ Web Stream Converters
|
||||
|
||||
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams:
|
||||
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling:
|
||||
|
||||
```typescript
|
||||
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||
@@ -381,6 +439,16 @@ const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadabl
|
||||
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
||||
```
|
||||
|
||||
#### Example: Round-Trip Conversion
|
||||
|
||||
```typescript
|
||||
// Node → Web → Node (lossless round-trip)
|
||||
const original = fs.createReadStream('./photo.jpg');
|
||||
const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original);
|
||||
const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream);
|
||||
backToNode.pipe(fs.createWriteStream('./photo-copy.jpg'));
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🏗️ Backpressure Handling
|
||||
@@ -417,25 +485,23 @@ fast.end();
|
||||
|
||||
---
|
||||
|
||||
### 🎯 Real-World Example: Processing Pipeline
|
||||
### 🎯 Real-World Example: Log Processing Pipeline
|
||||
|
||||
```typescript
|
||||
import fs from 'fs';
|
||||
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
// Read → Transform → Filter → Write
|
||||
// Read → Parse → Filter → Write
|
||||
const pipeline = new StreamWrapper([
|
||||
fs.createReadStream('./access.log'),
|
||||
new SmartDuplex({
|
||||
writeFunction: async (chunk) => {
|
||||
// Parse each line
|
||||
return chunk.toString().split('\n');
|
||||
},
|
||||
}),
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (lines: string[], tools) => {
|
||||
// Filter and push matching lines
|
||||
for (const line of lines) {
|
||||
if (line.includes('ERROR')) {
|
||||
await tools.push(line + '\n');
|
||||
@@ -450,6 +516,70 @@ await pipeline.run();
|
||||
console.log('Error extraction complete');
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 📋 API Reference
|
||||
|
||||
#### SmartDuplex
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new SmartDuplex(options?)` | Constructor | Create a new duplex stream |
|
||||
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; return to push, or use `tools.push()` |
|
||||
| `options.finalFunction` | `(tools) => Promise<T>` | Emit final data when writable ends |
|
||||
| `options.readFunction` | `() => Promise<void>` | Supply data to the readable side |
|
||||
| `options.debug` | `boolean` | Enable internal logging |
|
||||
| `options.name` | `string` | Stream name for debug logs |
|
||||
| `SmartDuplex.fromBuffer(buf)` | Static | Create a readable stream from a Buffer |
|
||||
| `SmartDuplex.fromWebReadableStream(rs)` | Static | Bridge a Web ReadableStream to Node.js Duplex |
|
||||
| `duplex.getWebStreams()` | Method | Get `{ readable, writable }` Web Streams pair |
|
||||
|
||||
#### WebDuplexStream
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new WebDuplexStream(options)` | Constructor | Create a new web transform stream |
|
||||
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; use `tools.push()` or return |
|
||||
| `options.finalFunction` | `(tools) => Promise<T>` | Emit data on flush |
|
||||
| `options.readFunction` | `(tools) => Promise<void>` | Supply data via `tools.write()`, signal `tools.done()` |
|
||||
| `WebDuplexStream.fromUInt8Array(arr)` | Static | Create a stream from a Uint8Array |
|
||||
|
||||
#### StreamWrapper
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new StreamWrapper(streams[])` | Constructor | Create a pipeline from an array of streams |
|
||||
| `wrapper.run()` | Method | Pipe all streams and return a Promise |
|
||||
| `wrapper.streamStarted()` | Method | Promise that resolves when pipes are connected |
|
||||
| `wrapper.onCustomEvent(name, fn)` | Method | Listen for custom events across all streams |
|
||||
|
||||
#### StreamIntake
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new StreamIntake<T>()` | Constructor | Create a new intake stream (object mode) |
|
||||
| `intake.pushData(data)` | Method | Push data into the stream |
|
||||
| `intake.signalEnd()` | Method | Signal end of stream |
|
||||
| `intake.pushNextObservable` | Property | Observable that emits when the stream wants more data |
|
||||
| `StreamIntake.fromStream(stream)` | Static | Wrap a Node.js Readable or Web ReadableStream |
|
||||
|
||||
#### nodewebhelpers
|
||||
|
||||
| Function | Description |
|
||||
|---|---|
|
||||
| `createWebReadableStreamFromFile(path)` | File → Web ReadableStream (pull-based backpressure) |
|
||||
| `convertWebReadableToNodeReadable(rs)` | Web ReadableStream → Node.js Readable |
|
||||
| `convertNodeReadableToWebReadable(ns)` | Node.js Readable → Web ReadableStream (pull-based backpressure) |
|
||||
| `convertWebWritableToNodeWritable(ws)` | Web WritableStream → Node.js Writable |
|
||||
| `convertNodeWritableToWebWritable(nw)` | Node.js Writable → Web WritableStream |
|
||||
|
||||
#### Utility Functions
|
||||
|
||||
| Function | Description |
|
||||
|---|---|
|
||||
| `createTransformFunction(fn, opts?)` | Create a SmartDuplex from an async mapping function |
|
||||
| `createPassThrough()` | Create an object-mode passthrough stream |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
@@ -464,7 +594,7 @@ Use of these trademarks must comply with Task Venture Capital GmbH's Trademark G
|
||||
|
||||
### Company Information
|
||||
|
||||
Task Venture Capital GmbH
|
||||
Task Venture Capital GmbH
|
||||
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||
|
||||
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||
|
||||
@@ -1,50 +1,10 @@
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import { SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
await tools.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream3.on('finish', () => {
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,152 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as stream from 'stream';
|
||||
import { nodewebhelpers } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createWebReadableStreamFromFile
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: createWebReadableStreamFromFile should read a file', async () => {
|
||||
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./test/assets/readabletext.txt');
|
||||
const reader = webStream.getReader();
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeReadableToWebReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeReadableToWebReadable should convert', async () => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
|
||||
const reader = webReadable.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebReadableToNodeReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebReadableToNodeReadable should convert', async (tools) => {
|
||||
const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello"
|
||||
const webReadable = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(data);
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const nodeReadable = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable.on('end', () => {
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('Hello');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeWritableToWebWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeWritableToWebWritable should convert', async () => {
|
||||
const chunks: Buffer[] = [];
|
||||
const nodeWritable = new stream.Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
chunks.push(chunk);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
|
||||
const webWritable = nodewebhelpers.convertNodeWritableToWebWritable(nodeWritable);
|
||||
const writer = webWritable.getWriter();
|
||||
|
||||
await writer.write(new Uint8Array([65, 66, 67])); // "ABC"
|
||||
await writer.close();
|
||||
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('ABC');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebWritableToNodeWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebWritableToNodeWritable should convert', async (tools) => {
|
||||
const chunks: Uint8Array[] = [];
|
||||
|
||||
const webWritable = new WritableStream<Uint8Array>({
|
||||
write(chunk) {
|
||||
chunks.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const nodeWritable = nodewebhelpers.convertWebWritableToNodeWritable(webWritable);
|
||||
|
||||
const done = tools.defer();
|
||||
nodeWritable.write(Buffer.from('Hello'), (err) => {
|
||||
expect(err).toBeFalsy();
|
||||
nodeWritable.end(() => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Round-trip: Node → Web → Node
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: round-trip Node → Web → Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable2.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable2.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,379 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as smartstream from '../ts/index.js';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';
|
||||
|
||||
// =============================================
|
||||
// Constructor
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should construct with no options', async () => {
|
||||
const duplex = new SmartDuplex();
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should construct with options', async () => {
|
||||
const duplex = new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromBuffer
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const stream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
stream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should transform chunks via writeFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.write('world');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: writeFunction returning undefined should not push', async (tools) => {
|
||||
const results: any[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: any) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results.length).toEqual(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// tools.push — multiple outputs
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const splitter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
const words = chunk.split(' ');
|
||||
for (const word of words) {
|
||||
await streamTools.push(word);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
splitter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
splitter.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('beautiful');
|
||||
expect(results).toContain('world');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
splitter.write('hello beautiful world');
|
||||
splitter.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit final chunk via finalFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
let count = 0;
|
||||
|
||||
const aggregator = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
count++;
|
||||
return undefined;
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return `total: ${count}`;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
aggregator.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
aggregator.on('end', () => {
|
||||
expect(results.length).toEqual(1);
|
||||
expect(results[0]).toEqual('total: 2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
aggregator.write('a');
|
||||
aggregator.write('b');
|
||||
aggregator.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
finalFunction: async (streamTools) => {
|
||||
await streamTools.push('final1');
|
||||
await streamTools.push('final2');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
stream.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
stream.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final1');
|
||||
expect(results).toContain('final2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('hello');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// truncate
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should truncate stream early', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const limiter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
if (chunk === 'STOP') {
|
||||
streamTools.truncate();
|
||||
return undefined;
|
||||
}
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
limiter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
limiter.on('end', () => {
|
||||
expect(results).toContain('a');
|
||||
expect(results).toContain('b');
|
||||
expect(results).not.toContain('STOP');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
limiter.write('a');
|
||||
limiter.write('b');
|
||||
// Write STOP on next tick to allow previous writes to flush
|
||||
process.nextTick(() => {
|
||||
limiter.write('STOP');
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Error handling
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit error when writeFunction throws', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
throw new Error('write error');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('write error');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should error when no writeFunction and data is written', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('No stream function provided');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromWebReadableStream
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Web ReadableStream', async (tools) => {
|
||||
const chunks = ['hello', 'world', 'foo'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
controller.close();
|
||||
}
|
||||
});
|
||||
|
||||
const duplex = SmartDuplex.fromWebReadableStream(webReadable);
|
||||
const results: string[] = [];
|
||||
|
||||
const done = tools.defer();
|
||||
duplex.on('data', (chunk: string) => results.push(chunk));
|
||||
duplex.on('end', () => {
|
||||
expect(results).toEqual(chunks);
|
||||
done.resolve();
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// getWebStreams
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => {
|
||||
const duplex = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
await writer.write('hello');
|
||||
await writer.write('world');
|
||||
await writer.close();
|
||||
|
||||
const results: string[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Debug mode
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: debug mode should not crash', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
name: 'DebugStream',
|
||||
debug: true,
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('data', () => {});
|
||||
stream.on('end', () => done.resolve());
|
||||
|
||||
stream.write('test');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Pipe with file read
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should handle a read stream pipeline', async () => {
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
// consume data
|
||||
},
|
||||
finalFunction: async (streamTools) => {
|
||||
streamTools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,128 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamIntake, SmartDuplex } from '../ts/index.js';
|
||||
import * as stream from 'stream';
|
||||
|
||||
// =============================================
|
||||
// Basic StreamIntake
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: should push data and signal end', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
const results: string[] = [];
|
||||
|
||||
intake.pipe(
|
||||
new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
results.push(chunk);
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 5) {
|
||||
counter++;
|
||||
intake.pushData(`item-${counter}`);
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
// Give streams time to flush
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
tap.test('StreamIntake: should pipe to a writable', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
|
||||
intake
|
||||
.pipe(
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk: string) => {
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 10) {
|
||||
counter++;
|
||||
intake.pushData('data\n');
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Node Readable)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const intake = await StreamIntake.fromStream<Buffer>(nodeReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Web ReadableStream)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => {
|
||||
const data = ['chunk1', 'chunk2', 'chunk3'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const item of data) {
|
||||
controller.enqueue(item);
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const intake = await StreamIntake.fromStream<string>(webReadable);
|
||||
|
||||
const results: string[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(results).toEqual(data);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,71 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamWrapper, SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('StreamWrapper: should pipe read to write', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await wrapper.run();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: should propagate errors', async (tools) => {
|
||||
const failingStream = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async () => {
|
||||
throw new Error('intentional error');
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
failingStream,
|
||||
]);
|
||||
|
||||
let errorCaught = false;
|
||||
try {
|
||||
await wrapper.run();
|
||||
} catch (err) {
|
||||
errorCaught = true;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
expect(message).toEqual('intentional error');
|
||||
}
|
||||
expect(errorCaught).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: streamStarted should resolve', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
const runPromise = wrapper.run();
|
||||
await wrapper.streamStarted();
|
||||
await runPromise;
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: onCustomEvent should fire', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const emitter = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
(emitter as any).emit('custom-progress', 'progress');
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
emitter,
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
wrapper.onCustomEvent('custom-progress', () => {
|
||||
results.push('fired');
|
||||
});
|
||||
|
||||
await wrapper.run();
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,68 +0,0 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
tap.test('should run backpressure test', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
async function testBackpressure() {
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 1`);
|
||||
return chunk; // Fast processing
|
||||
},
|
||||
});
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 2`);
|
||||
await tools.push(chunk);
|
||||
// return chunk, optionally return ;
|
||||
},
|
||||
}); // This stream processes data more slowly
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 3`);
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
console.log(`Backpressure at chunk ${i}`);
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream1.on('finish', () => {
|
||||
console.log('Stream 1 finished processing.');
|
||||
});
|
||||
stream2.on('finish', () => {
|
||||
console.log('Stream 2 finished processing.');
|
||||
});
|
||||
stream3.on('finish', () => {
|
||||
console.log('Stream 3 finished processing.');
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
testBackpressure();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,24 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
|
||||
import * as smartrx from '@push.rocks/smartrx';
|
||||
import * as fs from 'fs';
|
||||
|
||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
smartStream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
|
||||
smartStream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,65 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
|
||||
import * as smartstream from '../ts/index.js';
|
||||
|
||||
let testIntake: smartstream.StreamIntake<string>;
|
||||
|
||||
tap.test('should handle a read stream', async (tools) => {
|
||||
const counter = 0;
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
// do something with the stream here
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
console.log(chunkStringArg.toString());
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
tap.test('should create a valid Intake', async (tools) => {
|
||||
testIntake = new smartstream.StreamIntake<string>();
|
||||
testIntake.pipe(
|
||||
new smartstream.SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunkStringArg: string, streamTools) => {
|
||||
await tools.delayFor(100);
|
||||
console.log(chunkStringArg);
|
||||
return chunkStringArg;
|
||||
}
|
||||
})
|
||||
)
|
||||
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||
const testFinished = tools.defer();
|
||||
let counter = 0;
|
||||
testIntake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 50) {
|
||||
counter++;
|
||||
testIntake.pushData('hi');
|
||||
testIntake.pushData('+wow');
|
||||
testIntake.pushData('\n');
|
||||
} else {
|
||||
testIntake.signalEnd();
|
||||
testFinished.resolve();
|
||||
}
|
||||
});
|
||||
await testFinished.promise;
|
||||
testIntake.signalEnd();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,15 +0,0 @@
|
||||
import * as fs from 'fs';
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
|
||||
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
|
||||
|
||||
let testSmartstream: smartstream.StreamWrapper;
|
||||
tap.test('should combine a stream', async () => {
|
||||
testSmartstream = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await testSmartstream.run();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,67 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as webstream from '../ts_web/index.js';
|
||||
|
||||
tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => {
|
||||
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
let readUint8Array = new Uint8Array();
|
||||
|
||||
// Read from the stream
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value) {
|
||||
// Concatenate value to readUint8Array
|
||||
const tempArray = new Uint8Array(readUint8Array.length + value.length);
|
||||
tempArray.set(readUint8Array, 0);
|
||||
tempArray.set(value, readUint8Array.length);
|
||||
readUint8Array = tempArray;
|
||||
}
|
||||
}
|
||||
|
||||
expect(readUint8Array).toEqual(inputUint8Array);
|
||||
});
|
||||
|
||||
tap.test('WebDuplexStream should handle transform with a write function', async () => {
|
||||
const input = [1, 2, 3, 4, 5];
|
||||
const expectedOutput = [2, 4, 6, 8, 10];
|
||||
|
||||
const webDuplexStream = new webstream.WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
// Push the doubled number into the stream
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = webDuplexStream.writable.getWriter();
|
||||
const reader = webDuplexStream.readable.getReader();
|
||||
|
||||
const output: number[] = [];
|
||||
|
||||
// Read from the stream asynchronously
|
||||
const readPromise = (async () => {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value !== undefined) {
|
||||
output.push(value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
// Write to the stream
|
||||
for (const num of input) {
|
||||
await writer.write(num);
|
||||
}
|
||||
await writer.close();
|
||||
|
||||
// Wait for the reading to complete
|
||||
await readPromise;
|
||||
|
||||
// Assert that the output matches the expected transformed data
|
||||
expect(output).toEqual(expectedOutput);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,51 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createTransformFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('createTransformFunction: should create a transform stream', async (tools) => {
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2, { objectMode: true });
|
||||
const results: number[] = [];
|
||||
|
||||
doubler.on('data', (chunk: number) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
doubler.on('end', () => {
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
expect(results).toContain(30);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
doubler.write(5);
|
||||
doubler.write(10);
|
||||
doubler.write(15);
|
||||
doubler.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// createPassThrough
|
||||
// =============================================
|
||||
|
||||
tap.test('createPassThrough: should pass data through unchanged', async (tools) => {
|
||||
const passThrough = createPassThrough();
|
||||
const results: string[] = [];
|
||||
|
||||
passThrough.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
passThrough.on('end', () => {
|
||||
expect(results).toEqual(['hello', 'world']);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
passThrough.write('hello');
|
||||
passThrough.write('world');
|
||||
passThrough.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -0,0 +1,147 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import { WebDuplexStream } from '../ts_web/index.js';
|
||||
|
||||
// Helper: collect all chunks from a readable
|
||||
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
|
||||
const results: T[] = [];
|
||||
while (true) {
|
||||
const result = await reader.read();
|
||||
if (result.done) break;
|
||||
results.push(result.value);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// =============================================
|
||||
// Basic transform
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
|
||||
const stream = new WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Read and write concurrently to avoid backpressure deadlock
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction return value
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('HELLO');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromUInt8Array
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = WebDuplexStream.fromUInt8Array(data);
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const { value } = await reader.read();
|
||||
expect(value).toBeTruthy();
|
||||
if (!value) {
|
||||
throw new Error('Expected fromUInt8Array to produce data');
|
||||
}
|
||||
expect(value.length).toEqual(5);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// readFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
readFunction: async (tools) => {
|
||||
await tools.write('chunk1');
|
||||
await tools.write('chunk2');
|
||||
tools.done();
|
||||
},
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk.toUpperCase());
|
||||
},
|
||||
});
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
const results = await collectAll(reader);
|
||||
|
||||
expect(results).toContain('CHUNK1');
|
||||
expect(results).toContain('CHUNK2');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk;
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push('final');
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// No writeFunction = passthrough
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('pass');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('pass');
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.3.0',
|
||||
version: '3.4.2',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ export interface IStreamTools {
|
||||
}
|
||||
|
||||
export interface IStreamWriteFunction<T, rT> {
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface IStreamFinalFunction<rT> {
|
||||
(toolsArg: IStreamTools): Promise<rT>;
|
||||
(toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||
@@ -56,67 +56,116 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
readableStream: ReadableStream<T>
|
||||
): SmartDuplex<T, T> {
|
||||
const smartDuplex = new SmartDuplex<T, T>({
|
||||
/**
|
||||
* this function is called whenever the stream is being read from and at the same time if nothing is enqueued
|
||||
* therefor it is important to always unlock the reader after reading
|
||||
*/
|
||||
readFunction: async () => {
|
||||
const reader = readableStream.getReader();
|
||||
const { value, done } = await reader.read();
|
||||
if (value !== undefined) {
|
||||
smartDuplex.push(value);
|
||||
}
|
||||
reader.releaseLock();
|
||||
if (done) {
|
||||
smartDuplex.push(null);
|
||||
}
|
||||
},
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
// Acquire reader ONCE
|
||||
const reader = readableStream.getReader();
|
||||
let reading = false;
|
||||
|
||||
// Override _read to pull from the web reader
|
||||
smartDuplex._read = function (_size: number) {
|
||||
if (reading) return;
|
||||
reading = true;
|
||||
reader.read().then(
|
||||
({ value, done }) => {
|
||||
reading = false;
|
||||
if (done) {
|
||||
smartDuplex.push(null);
|
||||
} else {
|
||||
smartDuplex.push(value);
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
reading = false;
|
||||
smartDuplex.destroy(err);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
// Cancel reader on destroy
|
||||
smartDuplex.on('close', () => {
|
||||
reader.cancel().catch(() => {});
|
||||
});
|
||||
|
||||
return smartDuplex;
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>; // an array that only takes a defined amount of items
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput | null>;
|
||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||
private _consumerWantsData = false;
|
||||
private _readFunctionRunning = false;
|
||||
|
||||
private debugLog(messageArg: string) {
|
||||
// optional debug log
|
||||
if (this.options.debug) {
|
||||
console.log(messageArg);
|
||||
}
|
||||
}
|
||||
|
||||
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
||||
const safeOptions = optionsArg || {} as ISmartDuplexOptions<TInput, TOutput>;
|
||||
super(
|
||||
Object.assign(
|
||||
{
|
||||
highWaterMark: 1,
|
||||
},
|
||||
optionsArg
|
||||
safeOptions
|
||||
)
|
||||
);
|
||||
this.options = optionsArg;
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
|
||||
this.options = safeOptions;
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput | null>(
|
||||
this.options.highWaterMark || 1
|
||||
);
|
||||
}
|
||||
|
||||
public async _read(size: number): Promise<void> {
|
||||
this.debugLog(`${this.options.name}: read was called`);
|
||||
if (this.options.readFunction) {
|
||||
await this.options.readFunction();
|
||||
}
|
||||
await this.backpressuredArray.waitForItems();
|
||||
this.debugLog(`${this.options.name}: successfully waited for items.`);
|
||||
let canPushMore = true;
|
||||
while (this.backpressuredArray.data.length > 0 && canPushMore) {
|
||||
/**
|
||||
* Synchronously drains items from the backpressuredArray into the readable side.
|
||||
* Stops when push() returns false (consumer is full) or array is empty.
|
||||
*/
|
||||
private _drainBackpressuredArray(): void {
|
||||
while (this.backpressuredArray.data.length > 0) {
|
||||
const nextChunk = this.backpressuredArray.shift();
|
||||
canPushMore = this.push(nextChunk);
|
||||
if (nextChunk === null) {
|
||||
// EOF signal — push null to end readable side
|
||||
this.push(null);
|
||||
this._consumerWantsData = false;
|
||||
return;
|
||||
}
|
||||
const canPushMore = this.push(nextChunk);
|
||||
if (!canPushMore) {
|
||||
this._consumerWantsData = false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// _read must NOT be async — Node.js ignores the return value
|
||||
public _read(size: number): void {
|
||||
this.debugLog(`${this.options.name}: read was called`);
|
||||
this._consumerWantsData = true;
|
||||
|
||||
// Drain any buffered items first
|
||||
if (this.backpressuredArray.data.length > 0) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
|
||||
// If readFunction exists and is not already running, start it
|
||||
if (this.options.readFunction && !this._readFunctionRunning) {
|
||||
this._readFunctionRunning = true;
|
||||
this.options.readFunction().then(
|
||||
() => { this._readFunctionRunning = false; },
|
||||
(err) => { this._readFunctionRunning = false; this.destroy(err); }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async backpressuredPush(pushArg: TOutput) {
|
||||
const canPushMore = this.backpressuredArray.push(pushArg);
|
||||
// Try to drain if the consumer wants data
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
if (!canPushMore) {
|
||||
this.debugLog(`${this.options.name}: cannot push more`);
|
||||
await this.backpressuredArray.waitForSpace();
|
||||
@@ -126,83 +175,151 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
}
|
||||
|
||||
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
||||
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
||||
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||
|
||||
// _write must NOT be async — Node.js ignores the return value
|
||||
public _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||
if (!this.options.writeFunction) {
|
||||
return callback(new Error('No stream function provided'));
|
||||
}
|
||||
|
||||
let callbackCalled = false;
|
||||
const safeCallback = (err?: Error | null) => {
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(err);
|
||||
}
|
||||
};
|
||||
|
||||
let isTruncated = false;
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => {
|
||||
this.push(null);
|
||||
isTruncated = true;
|
||||
callback();
|
||||
safeCallback();
|
||||
this.push(null);
|
||||
},
|
||||
push: async (pushArg: TOutput) => {
|
||||
return await this.backpressuredPush(pushArg);
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
const writeDeferred = plugins.smartpromise.defer();
|
||||
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
|
||||
const modifiedChunk = await this.options.writeFunction(chunk, tools);
|
||||
if (isTruncated) {
|
||||
return;
|
||||
}
|
||||
if (modifiedChunk) {
|
||||
await tools.push(modifiedChunk);
|
||||
}
|
||||
callback();
|
||||
writeDeferred.resolve();
|
||||
writeDeferred.promise.then(() => {
|
||||
const writeDeferred = plugins.smartpromise.defer();
|
||||
this.asyncWritePromiseObjectmap.add(writeDeferred.promise);
|
||||
|
||||
this.options.writeFunction(chunk, tools).then(
|
||||
(modifiedChunk) => {
|
||||
if (isTruncated) {
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
return;
|
||||
}
|
||||
const finish = () => {
|
||||
safeCallback();
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
};
|
||||
if (modifiedChunk !== undefined && modifiedChunk !== null) {
|
||||
this.backpressuredPush(modifiedChunk).then(finish, (err) => {
|
||||
safeCallback(err);
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
});
|
||||
} else {
|
||||
finish();
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
safeCallback(err);
|
||||
writeDeferred.resolve();
|
||||
this.asyncWritePromiseObjectmap.remove(writeDeferred.promise);
|
||||
});
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public async _final(callback: (error?: Error | null) => void) {
|
||||
await Promise.all(this.asyncWritePromiseObjectmap.getArray());
|
||||
if (this.options.finalFunction) {
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => callback(),
|
||||
push: async (pipeObject) => {
|
||||
return this.backpressuredArray.push(pipeObject);
|
||||
},
|
||||
};
|
||||
|
||||
try {
|
||||
const finalChunk = await this.options.finalFunction(tools);
|
||||
if (finalChunk) {
|
||||
this.backpressuredArray.push(finalChunk);
|
||||
}
|
||||
} catch (err) {
|
||||
this.backpressuredArray.push(null);
|
||||
// _final must NOT be async — Node.js ignores the return value
|
||||
public _final(callback: (error?: Error | null) => void) {
|
||||
let callbackCalled = false;
|
||||
const safeCallback = (err?: Error | null) => {
|
||||
if (!callbackCalled) {
|
||||
callbackCalled = true;
|
||||
callback(err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
this.backpressuredArray.push(null);
|
||||
callback();
|
||||
};
|
||||
|
||||
Promise.all(this.asyncWritePromiseObjectmap.getArray()).then(() => {
|
||||
if (this.options.finalFunction) {
|
||||
const tools: IStreamTools = {
|
||||
truncate: () => safeCallback(),
|
||||
push: async (pipeObject) => {
|
||||
return await this.backpressuredPush(pipeObject);
|
||||
},
|
||||
};
|
||||
|
||||
this.options.finalFunction(tools).then(
|
||||
(finalChunk) => {
|
||||
const pushNull = () => {
|
||||
this.backpressuredArray.push(null);
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback();
|
||||
};
|
||||
|
||||
if (finalChunk !== undefined && finalChunk !== null) {
|
||||
this.backpressuredPush(finalChunk).then(pushNull, (err) => {
|
||||
safeCallback(err);
|
||||
});
|
||||
} else {
|
||||
pushNull();
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
this.backpressuredArray.push(null);
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback(err);
|
||||
}
|
||||
);
|
||||
} else {
|
||||
this.backpressuredArray.push(null);
|
||||
if (this._consumerWantsData) {
|
||||
this._drainBackpressuredArray();
|
||||
}
|
||||
safeCallback();
|
||||
}
|
||||
}, (err) => {
|
||||
safeCallback(err);
|
||||
});
|
||||
}
|
||||
|
||||
public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> {
|
||||
const duplex = this;
|
||||
let readableClosed = false;
|
||||
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
duplex.on('readable', () => {
|
||||
const onReadable = () => {
|
||||
let chunk;
|
||||
while (null !== (chunk = duplex.read())) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
duplex.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
const onEnd = () => {
|
||||
if (!readableClosed) {
|
||||
readableClosed = true;
|
||||
controller.close();
|
||||
}
|
||||
cleanup();
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
duplex.removeListener('readable', onReadable);
|
||||
duplex.removeListener('end', onEnd);
|
||||
};
|
||||
|
||||
duplex.on('readable', onReadable);
|
||||
duplex.on('end', onEnd);
|
||||
},
|
||||
cancel(reason) {
|
||||
duplex.destroy(new Error(reason));
|
||||
@@ -212,22 +329,38 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
const writable = new WritableStream({
|
||||
write(chunk) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
let resolved = false;
|
||||
const onDrain = () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
|
||||
const isBackpressured = !duplex.write(chunk, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
duplex.removeListener('drain', onDrain);
|
||||
reject(error);
|
||||
}
|
||||
} else if (!isBackpressured && !resolved) {
|
||||
resolved = true;
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
if (isBackpressured) {
|
||||
duplex.once('drain', resolve);
|
||||
duplex.once('drain', onDrain);
|
||||
}
|
||||
});
|
||||
},
|
||||
close() {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
duplex.end(resolve);
|
||||
duplex.end((err: Error | null) => {
|
||||
if (err) reject(err);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
},
|
||||
abort(reason) {
|
||||
|
||||
@@ -6,8 +6,11 @@ export class StreamIntake<T> extends plugins.stream.Readable {
|
||||
const intakeStream = new StreamIntake<U>(options);
|
||||
|
||||
if (inputStream instanceof plugins.stream.Readable) {
|
||||
inputStream.on('data', (chunk: U) => {
|
||||
intakeStream.pushData(chunk);
|
||||
inputStream.on('readable', () => {
|
||||
let chunk: U;
|
||||
while (null !== (chunk = inputStream.read() as U)) {
|
||||
intakeStream.pushData(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
inputStream.on('end', () => {
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import * as plugins from './smartstream.plugins.js';
|
||||
|
||||
// interfaces
|
||||
import { Transform } from 'stream';
|
||||
|
||||
export interface IErrorFunction {
|
||||
(err: Error): any;
|
||||
}
|
||||
@@ -62,7 +59,7 @@ export class StreamWrapper {
|
||||
}
|
||||
|
||||
// combine the stream
|
||||
let finalStream = null;
|
||||
let finalStream: plugins.stream.Duplex | null = null;
|
||||
let firstIteration: boolean = true;
|
||||
for (const stream of streamExecutionArray) {
|
||||
if (firstIteration === true) {
|
||||
@@ -74,23 +71,30 @@ export class StreamWrapper {
|
||||
for (const customEventObject of this.customEventObjectArray) {
|
||||
stream.on(customEventObject.eventName, customEventObject.eventFunction);
|
||||
}
|
||||
if (!firstIteration) {
|
||||
if (!firstIteration && finalStream) {
|
||||
finalStream = finalStream.pipe(stream);
|
||||
}
|
||||
firstIteration = false;
|
||||
}
|
||||
|
||||
if (!finalStream) {
|
||||
done.resolve();
|
||||
return done.promise;
|
||||
}
|
||||
|
||||
this.streamStartedDeferred.resolve();
|
||||
|
||||
finalStream.on('end', () => {
|
||||
done.resolve();
|
||||
});
|
||||
finalStream.on('close', () => {
|
||||
done.resolve();
|
||||
});
|
||||
finalStream.on('finish', () => {
|
||||
done.resolve();
|
||||
});
|
||||
let resolved = false;
|
||||
const safeResolve = () => {
|
||||
if (!resolved) {
|
||||
resolved = true;
|
||||
done.resolve();
|
||||
}
|
||||
};
|
||||
|
||||
finalStream.on('end', safeResolve);
|
||||
finalStream.on('close', safeResolve);
|
||||
finalStream.on('finish', safeResolve);
|
||||
return done.promise;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { type TransformOptions } from 'stream';
|
||||
import { type DuplexOptions } from 'stream';
|
||||
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
|
||||
|
||||
export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
@@ -7,7 +7,7 @@ export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
|
||||
export function createTransformFunction<TInput, TOutput>(
|
||||
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
|
||||
options?: TransformOptions
|
||||
options?: DuplexOptions
|
||||
): SmartDuplex {
|
||||
const smartDuplexStream = new SmartDuplex({
|
||||
...options,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import * as plugins from './smartstream.plugins.js';
|
||||
|
||||
/**
|
||||
* Creates a Web ReadableStream from a file.
|
||||
* Creates a Web ReadableStream from a file using pull-based backpressure.
|
||||
*
|
||||
* @param filePath - The path to the file to be read
|
||||
* @returns A Web ReadableStream that reads the file in chunks
|
||||
@@ -11,23 +11,53 @@ export function createWebReadableStreamFromFile(filePath: string): ReadableStrea
|
||||
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
// When data is available, enqueue it into the Web ReadableStream
|
||||
fileStream.on('data', (chunk) => {
|
||||
controller.enqueue(chunk as Uint8Array);
|
||||
fileStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
|
||||
// When the file stream ends, close the Web ReadableStream
|
||||
fileStream.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
|
||||
// If there's an error, error the Web ReadableStream
|
||||
fileStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
// Pause immediately — pull() will drive reads
|
||||
fileStream.pause();
|
||||
},
|
||||
pull(controller) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const chunk = fileStream.read();
|
||||
if (chunk !== null) {
|
||||
controller.enqueue(chunk as Uint8Array);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
// No data available yet — wait for 'readable' or 'end'
|
||||
const onReadable = () => {
|
||||
cleanup();
|
||||
const data = fileStream.read();
|
||||
if (data !== null) {
|
||||
controller.enqueue(data as Uint8Array);
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const onEnd = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const onError = (err: Error) => {
|
||||
cleanup();
|
||||
reject(err);
|
||||
};
|
||||
const cleanup = () => {
|
||||
fileStream.removeListener('readable', onReadable);
|
||||
fileStream.removeListener('end', onEnd);
|
||||
fileStream.removeListener('error', onError);
|
||||
};
|
||||
fileStream.once('readable', onReadable);
|
||||
fileStream.once('end', onEnd);
|
||||
fileStream.once('error', onError);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
// If the Web ReadableStream is canceled, destroy the file stream
|
||||
fileStream.destroy();
|
||||
}
|
||||
});
|
||||
@@ -43,23 +73,25 @@ export function convertWebReadableToNodeReadable(webStream: ReadableStream<Uint8
|
||||
const reader = webStream.getReader();
|
||||
|
||||
return new plugins.stream.Readable({
|
||||
async read() {
|
||||
try {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
this.push(null); // Signal end of stream
|
||||
} else {
|
||||
this.push(Buffer.from(value)); // Convert Uint8Array to Buffer for Node.js Readable
|
||||
read() {
|
||||
reader.read().then(
|
||||
({ value, done }) => {
|
||||
if (done) {
|
||||
this.push(null);
|
||||
} else {
|
||||
this.push(Buffer.from(value));
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
this.destroy(err);
|
||||
}
|
||||
} catch (err) {
|
||||
this.destroy(err); // Handle errors by destroying the stream
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Node.js Readable stream to a Web ReadableStream.
|
||||
* Converts a Node.js Readable stream to a Web ReadableStream using pull-based backpressure.
|
||||
*
|
||||
* @param nodeStream - The Node.js Readable stream to convert
|
||||
* @returns A Web ReadableStream that reads data from the Node.js Readable stream
|
||||
@@ -67,16 +99,50 @@ export function convertWebReadableToNodeReadable(webStream: ReadableStream<Uint8
|
||||
export function convertNodeReadableToWebReadable(nodeStream: plugins.stream.Readable): ReadableStream<Uint8Array> {
|
||||
return new ReadableStream({
|
||||
start(controller) {
|
||||
nodeStream.on('data', (chunk) => {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
nodeStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
});
|
||||
|
||||
nodeStream.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
|
||||
nodeStream.on('error', (err) => {
|
||||
controller.error(err);
|
||||
// Pause immediately — pull() will drive reads
|
||||
nodeStream.pause();
|
||||
},
|
||||
pull(controller) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const chunk = nodeStream.read();
|
||||
if (chunk !== null) {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
// No data available yet — wait for 'readable' or 'end'
|
||||
const onReadable = () => {
|
||||
cleanup();
|
||||
const data = nodeStream.read();
|
||||
if (data !== null) {
|
||||
controller.enqueue(new Uint8Array(data));
|
||||
}
|
||||
resolve();
|
||||
};
|
||||
const onEnd = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
const onError = (err: Error) => {
|
||||
cleanup();
|
||||
reject(err);
|
||||
};
|
||||
const cleanup = () => {
|
||||
nodeStream.removeListener('readable', onReadable);
|
||||
nodeStream.removeListener('end', onEnd);
|
||||
nodeStream.removeListener('error', onError);
|
||||
};
|
||||
nodeStream.once('readable', onReadable);
|
||||
nodeStream.once('end', onEnd);
|
||||
nodeStream.once('error', onError);
|
||||
});
|
||||
},
|
||||
cancel() {
|
||||
@@ -95,19 +161,23 @@ export function convertWebWritableToNodeWritable(webWritable: WritableStream<Uin
|
||||
const writer = webWritable.getWriter();
|
||||
|
||||
return new plugins.stream.Writable({
|
||||
async write(chunk, encoding, callback) {
|
||||
try {
|
||||
await writer.write(new Uint8Array(chunk));
|
||||
callback();
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
}
|
||||
write(chunk, encoding, callback) {
|
||||
writer.write(new Uint8Array(chunk)).then(
|
||||
() => callback(),
|
||||
(err) => callback(err)
|
||||
);
|
||||
},
|
||||
final(callback) {
|
||||
writer.close().then(() => callback()).catch(callback);
|
||||
},
|
||||
destroy(err, callback) {
|
||||
writer.abort(err).then(() => callback(err)).catch(callback);
|
||||
if (err) {
|
||||
writer.abort(err).then(() => callback(err)).catch(() => callback(err));
|
||||
} else {
|
||||
// Clean destroy — just release the lock
|
||||
writer.releaseLock();
|
||||
callback(null);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -133,7 +203,7 @@ export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Wr
|
||||
},
|
||||
close() {
|
||||
return new Promise((resolve, reject) => {
|
||||
nodeWritable.end((err) => {
|
||||
nodeWritable.end((err: Error | null) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
@@ -143,9 +213,7 @@ export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Wr
|
||||
});
|
||||
},
|
||||
abort(reason) {
|
||||
return new Promise((resolve, reject) => {
|
||||
nodeWritable.destroy(reason);
|
||||
});
|
||||
nodeWritable.destroy(reason instanceof Error ? reason : new Error(String(reason)));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.3.0',
|
||||
version: '3.4.2',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -78,17 +78,14 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
|
||||
try {
|
||||
const finalChunk = await optionsArg.finalFunction(tools);
|
||||
if (finalChunk) {
|
||||
controller.enqueue(finalChunk);
|
||||
if (finalChunk !== undefined && finalChunk !== null) {
|
||||
controller.enqueue(finalChunk as TOutput);
|
||||
}
|
||||
} catch (err) {
|
||||
controller.error(err);
|
||||
} finally {
|
||||
controller.terminate();
|
||||
}
|
||||
} else {
|
||||
controller.terminate();
|
||||
}
|
||||
// TransformStream auto-closes readable after flush resolves — no terminate() needed
|
||||
},
|
||||
});
|
||||
|
||||
@@ -96,7 +93,9 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
|
||||
// Start producing data if readFunction is provided
|
||||
if (this.options.readFunction) {
|
||||
this._startReading();
|
||||
this._startReading().catch((err) => {
|
||||
// Prevent unhandled rejection — the error is propagated through the writable side
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,17 +103,28 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
const writable = this.writable;
|
||||
const writer = writable.getWriter();
|
||||
|
||||
let doneSignaled = false;
|
||||
const tools: IStreamToolsRead<TInput, TOutput> = {
|
||||
done: () => writer.close(),
|
||||
done: () => {
|
||||
doneSignaled = true;
|
||||
},
|
||||
write: async (writeArg) => await writer.write(writeArg),
|
||||
};
|
||||
|
||||
try {
|
||||
await this.options.readFunction(tools);
|
||||
const readFunction = this.options.readFunction;
|
||||
if (readFunction) {
|
||||
await readFunction(tools);
|
||||
}
|
||||
if (doneSignaled) {
|
||||
await writer.close();
|
||||
}
|
||||
} catch (err) {
|
||||
writer.abort(err);
|
||||
} finally {
|
||||
writer.releaseLock();
|
||||
try {
|
||||
await writer.abort(err);
|
||||
} catch (_) {
|
||||
// Writer may already be in error state
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,8 +138,8 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
writer.write(uint8Array).then(() => writer.close());
|
||||
writer.write(uint8Array).then(() => writer.close()).catch(() => {});
|
||||
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+3
-1
@@ -5,8 +5,10 @@
|
||||
"target": "ES2022",
|
||||
"module": "NodeNext",
|
||||
"moduleResolution": "NodeNext",
|
||||
"noImplicitAny": true,
|
||||
"esModuleInterop": true,
|
||||
"verbatimModuleSyntax": true
|
||||
"verbatimModuleSyntax": true,
|
||||
"types": ["node"]
|
||||
},
|
||||
"exclude": [
|
||||
"dist_*/**/*.d.ts"
|
||||
|
||||
Reference in New Issue
Block a user