Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a765363371 | |||
| fccd0f86ad | |||
| f78469a299 | |||
| 04d2f3223a |
@@ -0,0 +1,27 @@
|
||||
{
|
||||
"@git.zone/cli": {
|
||||
"projectType": "npm",
|
||||
"module": {
|
||||
"githost": "code.foss.global",
|
||||
"gitscope": "push.rocks",
|
||||
"gitrepo": "smartstream",
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"npmPackagename": "@push.rocks/smartstream",
|
||||
"license": "MIT",
|
||||
"projectDomain": "push.rocks"
|
||||
},
|
||||
"release": {
|
||||
"registries": [
|
||||
"https://verdaccio.lossless.digital",
|
||||
"https://registry.npmjs.org"
|
||||
],
|
||||
"accessLevel": "public"
|
||||
}
|
||||
},
|
||||
"@git.zone/tsdoc": {
|
||||
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
|
||||
},
|
||||
"@ship.zone/szci": {
|
||||
"npmGlobalTools": []
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,23 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-30 - 3.4.2 - fix(streams)
|
||||
tighten stream typings and guard optional runtime paths for duplex and wrapper utilities
|
||||
|
||||
- allow SmartDuplex write and final handlers to return empty values in addition to transformed output
|
||||
- prevent StreamWrapper from piping or starting when no executable stream chain is produced
|
||||
- guard optional WebDuplexStream readFunction execution before invoking it
|
||||
- update tests and TypeScript configuration to satisfy stricter noImplicitAny checks and newer tstest tooling
|
||||
|
||||
## 2026-03-02 - 3.4.1 - fix(readme)
|
||||
improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference
|
||||
|
||||
- Clarified package entry points and import paths so consumers can choose Node or Web builds.
|
||||
- Expanded examples: SmartDuplex finalFunction pushing multiple chunks, converting SmartDuplex to Web streams (reader/writer example), WebDuplexStream finalFunction, and a Node↔Web round-trip conversion example.
|
||||
- Added guidance and examples for concurrency and backpressure (TransformStream read/write concurrency tip, reading concurrently with writes, and backpressure notes for Node↔Web converters).
|
||||
- Documented StreamWrapper.streamStarted() and onCustomEvent() usage with examples showing awaiting stream startup.
|
||||
- Added a new API reference section documenting SmartDuplex, WebDuplexStream, StreamWrapper, StreamIntake, nodewebhelpers, and utility functions.
|
||||
- Various README wording, formatting, and example clarifications (tips, headings, and minor cosmetic fixes).
|
||||
|
||||
## 2026-03-02 - 3.4.0 - feat(smartduplex)
|
||||
improve backpressure handling and web/node stream interoperability
|
||||
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2016 Push.Rocks
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
+11
-9
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstream",
|
||||
"version": "3.4.0",
|
||||
"version": "3.4.2",
|
||||
"private": false,
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"type": "module",
|
||||
@@ -10,7 +10,7 @@
|
||||
},
|
||||
"scripts": {
|
||||
"test": "(tstest test/ --verbose --logfile --timeout 60)",
|
||||
"build": "(tsbuild tsfolders --allowimplicitany)"
|
||||
"build": "(tsbuild tsfolders)"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@@ -23,14 +23,13 @@
|
||||
},
|
||||
"homepage": "https://code.foss.global/push.rocks/smartstream",
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^4.1.2",
|
||||
"@git.zone/tsrun": "^2.0.1",
|
||||
"@git.zone/tstest": "^3.1.8",
|
||||
"@push.rocks/tapbundle": "^6.0.3",
|
||||
"@types/node": "^25.3.3"
|
||||
"@git.zone/tsbuild": "^4.4.0",
|
||||
"@git.zone/tsrun": "^2.0.2",
|
||||
"@git.zone/tstest": "^3.6.3",
|
||||
"@types/node": "^25.6.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/lik": "^6.3.1",
|
||||
"@push.rocks/lik": "^6.4.1",
|
||||
"@push.rocks/smartenv": "^6.0.0",
|
||||
"@push.rocks/smartpromise": "^4.2.3",
|
||||
"@push.rocks/smartrx": "^3.0.10"
|
||||
@@ -47,6 +46,8 @@
|
||||
"dist_ts_web/**/*",
|
||||
"assets/**/*",
|
||||
"cli.js",
|
||||
".smartconfig.json",
|
||||
"license",
|
||||
"npmextra.json",
|
||||
"readme.md"
|
||||
],
|
||||
@@ -73,5 +74,6 @@
|
||||
"buffer",
|
||||
"stream utilities",
|
||||
"esm"
|
||||
]
|
||||
],
|
||||
"packageManager": "pnpm@10.28.2"
|
||||
}
|
||||
|
||||
Generated
+1584
-4216
File diff suppressed because it is too large
Load Diff
@@ -12,7 +12,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
||||
pnpm install @push.rocks/smartstream
|
||||
```
|
||||
|
||||
The package ships with two entry points:
|
||||
The package ships with **two entry points** so you can pick exactly what you need:
|
||||
|
||||
| Entry Point | Import Path | Environment |
|
||||
|---|---|---|
|
||||
@@ -83,6 +83,8 @@ const splitter = new SmartDuplex<string, string>({
|
||||
Run cleanup or emit final data when the writable side ends:
|
||||
|
||||
```typescript
|
||||
let runningTotal = 0;
|
||||
|
||||
const aggregator = new SmartDuplex<number, number>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
@@ -95,6 +97,8 @@ const aggregator = new SmartDuplex<number, number>({
|
||||
});
|
||||
```
|
||||
|
||||
The `finalFunction` can also push multiple chunks via `tools.push()`, just like `writeFunction`.
|
||||
|
||||
#### Truncating a Stream Early
|
||||
|
||||
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
||||
@@ -132,16 +136,36 @@ nodeDuplex.pipe(processTransform).pipe(outputStream);
|
||||
|
||||
#### Getting Web Streams from SmartDuplex
|
||||
|
||||
Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair:
|
||||
Convert a `SmartDuplex` into a Web `ReadableStream` + `WritableStream` pair:
|
||||
|
||||
```typescript
|
||||
const duplex = new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
return transform(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
// Read and write concurrently to avoid TransformStream backpressure
|
||||
const readAll = async () => {
|
||||
const results = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
};
|
||||
|
||||
const readPromise = readAll();
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
```
|
||||
|
||||
#### Debug Mode
|
||||
@@ -186,7 +210,7 @@ pipeline.run()
|
||||
.catch((err) => console.error('Pipeline failed:', err));
|
||||
```
|
||||
|
||||
You can also listen for custom events across all streams:
|
||||
You can listen for custom events across all streams in the pipeline:
|
||||
|
||||
```typescript
|
||||
pipeline.onCustomEvent('progress', () => {
|
||||
@@ -194,6 +218,14 @@ pipeline.onCustomEvent('progress', () => {
|
||||
});
|
||||
```
|
||||
|
||||
You can also await `streamStarted()` to know when the pipeline has been wired up:
|
||||
|
||||
```typescript
|
||||
const runPromise = pipeline.run();
|
||||
await pipeline.streamStarted(); // Resolves once pipes are connected
|
||||
await runPromise;
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 📥 StreamIntake — Dynamic Data Injection
|
||||
@@ -222,7 +254,7 @@ intake.pushData('World');
|
||||
intake.signalEnd(); // Signal end-of-stream
|
||||
```
|
||||
|
||||
#### Demand-driven Production with Observable
|
||||
#### Demand-Driven Production with Observable
|
||||
|
||||
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
||||
|
||||
@@ -265,7 +297,10 @@ Quickly create a `SmartDuplex` from a simple async mapping function:
|
||||
```typescript
|
||||
import { createTransformFunction } from '@push.rocks/smartstream';
|
||||
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
|
||||
const doubler = createTransformFunction<number, number>(
|
||||
async (n) => n * 2,
|
||||
{ objectMode: true }
|
||||
);
|
||||
|
||||
intakeStream.pipe(doubler).pipe(outputStream);
|
||||
```
|
||||
@@ -299,16 +334,26 @@ const stream = new WebDuplexStream<number, number>({
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Write
|
||||
// Always read concurrently with writes — TransformStream applies backpressure
|
||||
const readPromise = (async () => {
|
||||
const results = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
})();
|
||||
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
|
||||
// Read
|
||||
const { value } = await reader.read(); // 10
|
||||
const { value: v2 } = await reader.read(); // 20
|
||||
const results = await readPromise; // [10, 20]
|
||||
```
|
||||
|
||||
> 💡 **Tip:** Because `TransformStream` enforces backpressure between its writable and readable sides, you must start reading *before* or *concurrently with* writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain.
|
||||
|
||||
#### From a Uint8Array
|
||||
|
||||
```typescript
|
||||
@@ -337,11 +382,24 @@ const reader = stream.readable.getReader();
|
||||
// reads "CHUNK 1", "CHUNK 2"
|
||||
```
|
||||
|
||||
#### Final Function
|
||||
|
||||
Emit trailing data when the writable side is closed:
|
||||
|
||||
```typescript
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => chunk,
|
||||
finalFunction: async (tools) => {
|
||||
tools.push('footer');
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🔀 Node ↔ Web Stream Converters
|
||||
|
||||
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams:
|
||||
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling:
|
||||
|
||||
```typescript
|
||||
import { nodewebhelpers } from '@push.rocks/smartstream';
|
||||
@@ -381,6 +439,16 @@ const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadabl
|
||||
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
||||
```
|
||||
|
||||
#### Example: Round-Trip Conversion
|
||||
|
||||
```typescript
|
||||
// Node → Web → Node (lossless round-trip)
|
||||
const original = fs.createReadStream('./photo.jpg');
|
||||
const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original);
|
||||
const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream);
|
||||
backToNode.pipe(fs.createWriteStream('./photo-copy.jpg'));
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 🏗️ Backpressure Handling
|
||||
@@ -417,25 +485,23 @@ fast.end();
|
||||
|
||||
---
|
||||
|
||||
### 🎯 Real-World Example: Processing Pipeline
|
||||
### 🎯 Real-World Example: Log Processing Pipeline
|
||||
|
||||
```typescript
|
||||
import fs from 'fs';
|
||||
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
// Read → Transform → Filter → Write
|
||||
// Read → Parse → Filter → Write
|
||||
const pipeline = new StreamWrapper([
|
||||
fs.createReadStream('./access.log'),
|
||||
new SmartDuplex({
|
||||
writeFunction: async (chunk) => {
|
||||
// Parse each line
|
||||
return chunk.toString().split('\n');
|
||||
},
|
||||
}),
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (lines: string[], tools) => {
|
||||
// Filter and push matching lines
|
||||
for (const line of lines) {
|
||||
if (line.includes('ERROR')) {
|
||||
await tools.push(line + '\n');
|
||||
@@ -450,6 +516,70 @@ await pipeline.run();
|
||||
console.log('Error extraction complete');
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 📋 API Reference
|
||||
|
||||
#### SmartDuplex
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new SmartDuplex(options?)` | Constructor | Create a new duplex stream |
|
||||
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; return to push, or use `tools.push()` |
|
||||
| `options.finalFunction` | `(tools) => Promise<T>` | Emit final data when writable ends |
|
||||
| `options.readFunction` | `() => Promise<void>` | Supply data to the readable side |
|
||||
| `options.debug` | `boolean` | Enable internal logging |
|
||||
| `options.name` | `string` | Stream name for debug logs |
|
||||
| `SmartDuplex.fromBuffer(buf)` | Static | Create a readable stream from a Buffer |
|
||||
| `SmartDuplex.fromWebReadableStream(rs)` | Static | Bridge a Web ReadableStream to Node.js Duplex |
|
||||
| `duplex.getWebStreams()` | Method | Get `{ readable, writable }` Web Streams pair |
|
||||
|
||||
#### WebDuplexStream
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new WebDuplexStream(options)` | Constructor | Create a new web transform stream |
|
||||
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; use `tools.push()` or return |
|
||||
| `options.finalFunction` | `(tools) => Promise<T>` | Emit data on flush |
|
||||
| `options.readFunction` | `(tools) => Promise<void>` | Supply data via `tools.write()`, signal `tools.done()` |
|
||||
| `WebDuplexStream.fromUInt8Array(arr)` | Static | Create a stream from a Uint8Array |
|
||||
|
||||
#### StreamWrapper
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new StreamWrapper(streams[])` | Constructor | Create a pipeline from an array of streams |
|
||||
| `wrapper.run()` | Method | Pipe all streams and return a Promise |
|
||||
| `wrapper.streamStarted()` | Method | Promise that resolves when pipes are connected |
|
||||
| `wrapper.onCustomEvent(name, fn)` | Method | Listen for custom events across all streams |
|
||||
|
||||
#### StreamIntake
|
||||
|
||||
| Member | Type | Description |
|
||||
|---|---|---|
|
||||
| `new StreamIntake<T>()` | Constructor | Create a new intake stream (object mode) |
|
||||
| `intake.pushData(data)` | Method | Push data into the stream |
|
||||
| `intake.signalEnd()` | Method | Signal end of stream |
|
||||
| `intake.pushNextObservable` | Property | Observable that emits when the stream wants more data |
|
||||
| `StreamIntake.fromStream(stream)` | Static | Wrap a Node.js Readable or Web ReadableStream |
|
||||
|
||||
#### nodewebhelpers
|
||||
|
||||
| Function | Description |
|
||||
|---|---|
|
||||
| `createWebReadableStreamFromFile(path)` | File → Web ReadableStream (pull-based backpressure) |
|
||||
| `convertWebReadableToNodeReadable(rs)` | Web ReadableStream → Node.js Readable |
|
||||
| `convertNodeReadableToWebReadable(ns)` | Node.js Readable → Web ReadableStream (pull-based backpressure) |
|
||||
| `convertWebWritableToNodeWritable(ws)` | Web WritableStream → Node.js Writable |
|
||||
| `convertNodeWritableToWebWritable(nw)` | Node.js Writable → Web WritableStream |
|
||||
|
||||
#### Utility Functions
|
||||
|
||||
| Function | Description |
|
||||
|---|---|
|
||||
| `createTransformFunction(fn, opts?)` | Create a SmartDuplex from an async mapping function |
|
||||
| `createPassThrough()` | Create an object-mode passthrough stream |
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
@@ -464,7 +594,7 @@ Use of these trademarks must comply with Task Venture Capital GmbH's Trademark G
|
||||
|
||||
### Company Information
|
||||
|
||||
Task Venture Capital GmbH
|
||||
Task Venture Capital GmbH
|
||||
Registered at District Court Bremen HRB 35230 HB, Germany
|
||||
|
||||
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { tap, expect } from '@git.zone/tstest/tapbundle';
|
||||
import { SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as stream from 'stream';
|
||||
import { nodewebhelpers } from '../ts/index.js';
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as smartstream from '../ts/index.js';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamIntake, SmartDuplex } from '../ts/index.js';
|
||||
import * as stream from 'stream';
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamWrapper, SmartDuplex } from '../ts/index.js';
|
||||
|
||||
@@ -27,7 +27,8 @@ tap.test('StreamWrapper: should propagate errors', async (tools) => {
|
||||
await wrapper.run();
|
||||
} catch (err) {
|
||||
errorCaught = true;
|
||||
expect(err.message).toEqual('intentional error');
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
expect(message).toEqual('intentional error');
|
||||
}
|
||||
expect(errorCaught).toBeTrue();
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import { WebDuplexStream } from '../ts_web/index.js';
|
||||
|
||||
// Helper: collect all chunks from a readable
|
||||
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
|
||||
const results: T[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
const result = await reader.read();
|
||||
if (result.done) break;
|
||||
results.push(result.value);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
@@ -70,6 +70,9 @@ tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
|
||||
|
||||
const { value } = await reader.read();
|
||||
expect(value).toBeTruthy();
|
||||
if (!value) {
|
||||
throw new Error('Expected fromUInt8Array to produce data');
|
||||
}
|
||||
expect(value.length).toEqual(5);
|
||||
});
|
||||
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.4.0',
|
||||
version: '3.4.2',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ export interface IStreamTools {
|
||||
}
|
||||
|
||||
export interface IStreamWriteFunction<T, rT> {
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
|
||||
(chunkArg: T, toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface IStreamFinalFunction<rT> {
|
||||
(toolsArg: IStreamTools): Promise<rT>;
|
||||
(toolsArg: IStreamTools): Promise<rT | void | null | undefined>;
|
||||
}
|
||||
|
||||
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||
@@ -92,7 +92,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
|
||||
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput | null>;
|
||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||
private _consumerWantsData = false;
|
||||
private _readFunctionRunning = false;
|
||||
@@ -114,7 +114,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
)
|
||||
);
|
||||
this.options = safeOptions;
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(
|
||||
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput | null>(
|
||||
this.options.highWaterMark || 1
|
||||
);
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ export class StreamWrapper {
|
||||
}
|
||||
|
||||
// combine the stream
|
||||
let finalStream = null;
|
||||
let finalStream: plugins.stream.Duplex | null = null;
|
||||
let firstIteration: boolean = true;
|
||||
for (const stream of streamExecutionArray) {
|
||||
if (firstIteration === true) {
|
||||
@@ -71,12 +71,17 @@ export class StreamWrapper {
|
||||
for (const customEventObject of this.customEventObjectArray) {
|
||||
stream.on(customEventObject.eventName, customEventObject.eventFunction);
|
||||
}
|
||||
if (!firstIteration) {
|
||||
if (!firstIteration && finalStream) {
|
||||
finalStream = finalStream.pipe(stream);
|
||||
}
|
||||
firstIteration = false;
|
||||
}
|
||||
|
||||
if (!finalStream) {
|
||||
done.resolve();
|
||||
return done.promise;
|
||||
}
|
||||
|
||||
this.streamStartedDeferred.resolve();
|
||||
|
||||
let resolved = false;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { type TransformOptions } from 'stream';
|
||||
import { type DuplexOptions } from 'stream';
|
||||
import { SmartDuplex } from './smartstream.classes.smartduplex.js';
|
||||
|
||||
export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
@@ -7,7 +7,7 @@ export interface AsyncTransformFunction<TInput, TOutput> {
|
||||
|
||||
export function createTransformFunction<TInput, TOutput>(
|
||||
asyncFunction: AsyncTransformFunction<TInput, TOutput>,
|
||||
options?: TransformOptions
|
||||
options?: DuplexOptions
|
||||
): SmartDuplex {
|
||||
const smartDuplexStream = new SmartDuplex({
|
||||
...options,
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.4.0',
|
||||
version: '3.4.2',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
||||
@@ -112,7 +112,10 @@ export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStrea
|
||||
};
|
||||
|
||||
try {
|
||||
await this.options.readFunction(tools);
|
||||
const readFunction = this.options.readFunction;
|
||||
if (readFunction) {
|
||||
await readFunction(tools);
|
||||
}
|
||||
if (doneSignaled) {
|
||||
await writer.close();
|
||||
}
|
||||
|
||||
+3
-1
@@ -5,8 +5,10 @@
|
||||
"target": "ES2022",
|
||||
"module": "NodeNext",
|
||||
"moduleResolution": "NodeNext",
|
||||
"noImplicitAny": true,
|
||||
"esModuleInterop": true,
|
||||
"verbatimModuleSyntax": true
|
||||
"verbatimModuleSyntax": true,
|
||||
"types": ["node"]
|
||||
},
|
||||
"exclude": [
|
||||
"dist_*/**/*.d.ts"
|
||||
|
||||
Reference in New Issue
Block a user