From 04d2f3223a8cd2e77d18832d526820135665dbba Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Mon, 2 Mar 2026 07:01:47 +0000 Subject: [PATCH] fix(readme): improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference --- changelog.md | 10 +++ readme.md | 160 +++++++++++++++++++++++++++++++---- ts/00_commitinfo_data.ts | 2 +- ts_web/00_commitinfo_data.ts | 2 +- 4 files changed, 157 insertions(+), 17 deletions(-) diff --git a/changelog.md b/changelog.md index 7d6ce1f..4e00967 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # Changelog +## 2026-03-02 - 3.4.1 - fix(readme) +improve README: clarify entry points, add Web & Node stream examples, finalization and backpressure tips, and comprehensive API reference + +- Clarified package entry points and import paths so consumers can choose Node or Web builds. +- Expanded examples: SmartDuplex finalFunction pushing multiple chunks, converting SmartDuplex to Web streams (reader/writer example), WebDuplexStream finalFunction, and a Node↔Web round-trip conversion example. +- Added guidance and examples for concurrency and backpressure (TransformStream read/write concurrency tip, reading concurrently with writes, and backpressure notes for Node↔Web converters). +- Documented StreamWrapper.streamStarted() and onCustomEvent() usage with examples showing awaiting stream startup. +- Added a new API reference section documenting SmartDuplex, WebDuplexStream, StreamWrapper, StreamIntake, nodewebhelpers, and utility functions. +- Various README wording, formatting, and example clarifications (tips, headings, and minor cosmetic fixes). + ## 2026-03-02 - 3.4.0 - feat(smartduplex) improve backpressure handling and web/node stream interoperability diff --git a/readme.md b/readme.md index c9d7e26..4a7e306 100644 --- a/readme.md +++ b/readme.md @@ -12,7 +12,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community pnpm install @push.rocks/smartstream ``` -The package ships with two entry points: +The package ships with **two entry points** so you can pick exactly what you need: | Entry Point | Import Path | Environment | |---|---|---| @@ -83,6 +83,8 @@ const splitter = new SmartDuplex({ Run cleanup or emit final data when the writable side ends: ```typescript +let runningTotal = 0; + const aggregator = new SmartDuplex({ objectMode: true, writeFunction: async (chunk, tools) => { @@ -95,6 +97,8 @@ const aggregator = new SmartDuplex({ }); ``` +The `finalFunction` can also push multiple chunks via `tools.push()`, just like `writeFunction`. + #### Truncating a Stream Early Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read: @@ -132,16 +136,36 @@ nodeDuplex.pipe(processTransform).pipe(outputStream); #### Getting Web Streams from SmartDuplex -Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair: +Convert a `SmartDuplex` into a Web `ReadableStream` + `WritableStream` pair: ```typescript const duplex = new SmartDuplex({ + objectMode: true, writeFunction: async (chunk, tools) => { return transform(chunk); }, }); const { readable, writable } = await duplex.getWebStreams(); + +const writer = writable.getWriter(); +const reader = readable.getReader(); + +// Read and write concurrently to avoid TransformStream backpressure +const readAll = async () => { + const results = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + results.push(value); + } + return results; +}; + +const readPromise = readAll(); +await writer.write('hello'); +await writer.close(); +const results = await readPromise; ``` #### Debug Mode @@ -186,7 +210,7 @@ pipeline.run() .catch((err) => console.error('Pipeline failed:', err)); ``` -You can also listen for custom events across all streams: +You can listen for custom events across all streams in the pipeline: ```typescript pipeline.onCustomEvent('progress', () => { @@ -194,6 +218,14 @@ pipeline.onCustomEvent('progress', () => { }); ``` +You can also await `streamStarted()` to know when the pipeline has been wired up: + +```typescript +const runPromise = pipeline.run(); +await pipeline.streamStarted(); // Resolves once pipes are connected +await runPromise; +``` + --- ### πŸ“₯ StreamIntake β€” Dynamic Data Injection @@ -222,7 +254,7 @@ intake.pushData('World'); intake.signalEnd(); // Signal end-of-stream ``` -#### Demand-driven Production with Observable +#### Demand-Driven Production with Observable `pushNextObservable` emits whenever the stream is ready for more data β€” perfect for throttled or event-driven producers: @@ -265,7 +297,10 @@ Quickly create a `SmartDuplex` from a simple async mapping function: ```typescript import { createTransformFunction } from '@push.rocks/smartstream'; -const doubler = createTransformFunction(async (n) => n * 2); +const doubler = createTransformFunction( + async (n) => n * 2, + { objectMode: true } +); intakeStream.pipe(doubler).pipe(outputStream); ``` @@ -299,16 +334,26 @@ const stream = new WebDuplexStream({ const writer = stream.writable.getWriter(); const reader = stream.readable.getReader(); -// Write +// Always read concurrently with writes β€” TransformStream applies backpressure +const readPromise = (async () => { + const results = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + results.push(value); + } + return results; +})(); + await writer.write(5); await writer.write(10); await writer.close(); -// Read -const { value } = await reader.read(); // 10 -const { value: v2 } = await reader.read(); // 20 +const results = await readPromise; // [10, 20] ``` +> πŸ’‘ **Tip:** Because `TransformStream` enforces backpressure between its writable and readable sides, you must start reading *before* or *concurrently with* writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain. + #### From a Uint8Array ```typescript @@ -337,11 +382,24 @@ const reader = stream.readable.getReader(); // reads "CHUNK 1", "CHUNK 2" ``` +#### Final Function + +Emit trailing data when the writable side is closed: + +```typescript +const stream = new WebDuplexStream({ + writeFunction: async (chunk) => chunk, + finalFunction: async (tools) => { + tools.push('footer'); + }, +}); +``` + --- ### πŸ”€ Node ↔ Web Stream Converters -The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams: +The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling: ```typescript import { nodewebhelpers } from '@push.rocks/smartstream'; @@ -381,6 +439,16 @@ const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadabl nodeReadable2.pipe(fs.createWriteStream('./copy.bin')); ``` +#### Example: Round-Trip Conversion + +```typescript +// Node β†’ Web β†’ Node (lossless round-trip) +const original = fs.createReadStream('./photo.jpg'); +const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original); +const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream); +backToNode.pipe(fs.createWriteStream('./photo-copy.jpg')); +``` + --- ### πŸ—οΈ Backpressure Handling @@ -417,25 +485,23 @@ fast.end(); --- -### 🎯 Real-World Example: Processing Pipeline +### 🎯 Real-World Example: Log Processing Pipeline ```typescript import fs from 'fs'; import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream'; -// Read β†’ Transform β†’ Filter β†’ Write +// Read β†’ Parse β†’ Filter β†’ Write const pipeline = new StreamWrapper([ fs.createReadStream('./access.log'), new SmartDuplex({ writeFunction: async (chunk) => { - // Parse each line return chunk.toString().split('\n'); }, }), new SmartDuplex({ objectMode: true, writeFunction: async (lines: string[], tools) => { - // Filter and push matching lines for (const line of lines) { if (line.includes('ERROR')) { await tools.push(line + '\n'); @@ -450,6 +516,70 @@ await pipeline.run(); console.log('Error extraction complete'); ``` +--- + +### πŸ“‹ API Reference + +#### SmartDuplex + +| Member | Type | Description | +|---|---|---| +| `new SmartDuplex(options?)` | Constructor | Create a new duplex stream | +| `options.writeFunction` | `(chunk, tools) => Promise` | Transform each chunk; return to push, or use `tools.push()` | +| `options.finalFunction` | `(tools) => Promise` | Emit final data when writable ends | +| `options.readFunction` | `() => Promise` | Supply data to the readable side | +| `options.debug` | `boolean` | Enable internal logging | +| `options.name` | `string` | Stream name for debug logs | +| `SmartDuplex.fromBuffer(buf)` | Static | Create a readable stream from a Buffer | +| `SmartDuplex.fromWebReadableStream(rs)` | Static | Bridge a Web ReadableStream to Node.js Duplex | +| `duplex.getWebStreams()` | Method | Get `{ readable, writable }` Web Streams pair | + +#### WebDuplexStream + +| Member | Type | Description | +|---|---|---| +| `new WebDuplexStream(options)` | Constructor | Create a new web transform stream | +| `options.writeFunction` | `(chunk, tools) => Promise` | Transform each chunk; use `tools.push()` or return | +| `options.finalFunction` | `(tools) => Promise` | Emit data on flush | +| `options.readFunction` | `(tools) => Promise` | Supply data via `tools.write()`, signal `tools.done()` | +| `WebDuplexStream.fromUInt8Array(arr)` | Static | Create a stream from a Uint8Array | + +#### StreamWrapper + +| Member | Type | Description | +|---|---|---| +| `new StreamWrapper(streams[])` | Constructor | Create a pipeline from an array of streams | +| `wrapper.run()` | Method | Pipe all streams and return a Promise | +| `wrapper.streamStarted()` | Method | Promise that resolves when pipes are connected | +| `wrapper.onCustomEvent(name, fn)` | Method | Listen for custom events across all streams | + +#### StreamIntake + +| Member | Type | Description | +|---|---|---| +| `new StreamIntake()` | Constructor | Create a new intake stream (object mode) | +| `intake.pushData(data)` | Method | Push data into the stream | +| `intake.signalEnd()` | Method | Signal end of stream | +| `intake.pushNextObservable` | Property | Observable that emits when the stream wants more data | +| `StreamIntake.fromStream(stream)` | Static | Wrap a Node.js Readable or Web ReadableStream | + +#### nodewebhelpers + +| Function | Description | +|---|---| +| `createWebReadableStreamFromFile(path)` | File β†’ Web ReadableStream (pull-based backpressure) | +| `convertWebReadableToNodeReadable(rs)` | Web ReadableStream β†’ Node.js Readable | +| `convertNodeReadableToWebReadable(ns)` | Node.js Readable β†’ Web ReadableStream (pull-based backpressure) | +| `convertWebWritableToNodeWritable(ws)` | Web WritableStream β†’ Node.js Writable | +| `convertNodeWritableToWebWritable(nw)` | Node.js Writable β†’ Web WritableStream | + +#### Utility Functions + +| Function | Description | +|---|---| +| `createTransformFunction(fn, opts?)` | Create a SmartDuplex from an async mapping function | +| `createPassThrough()` | Create an object-mode passthrough stream | + ## License and Legal Information This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. @@ -464,7 +594,7 @@ Use of these trademarks must comply with Task Venture Capital GmbH's Trademark G ### Company Information -Task Venture Capital GmbH +Task Venture Capital GmbH Registered at District Court Bremen HRB 35230 HB, Germany For any legal inquiries or further information, please contact us via email at hello@task.vc. diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 8051f85..0e65972 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.4.0', + version: '3.4.1', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 8051f85..0e65972 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.4.0', + version: '3.4.1', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' }