|
|
@@ -12,7 +12,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
|
|
|
pnpm install @push.rocks/smartstream
|
|
|
|
pnpm install @push.rocks/smartstream
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
The package ships with two entry points:
|
|
|
|
The package ships with **two entry points** so you can pick exactly what you need:
|
|
|
|
|
|
|
|
|
|
|
|
| Entry Point | Import Path | Environment |
|
|
|
|
| Entry Point | Import Path | Environment |
|
|
|
|
|---|---|---|
|
|
|
|
|---|---|---|
|
|
|
@@ -83,6 +83,8 @@ const splitter = new SmartDuplex<string, string>({
|
|
|
|
Run cleanup or emit final data when the writable side ends:
|
|
|
|
Run cleanup or emit final data when the writable side ends:
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
|
|
|
|
let runningTotal = 0;
|
|
|
|
|
|
|
|
|
|
|
|
const aggregator = new SmartDuplex<number, number>({
|
|
|
|
const aggregator = new SmartDuplex<number, number>({
|
|
|
|
objectMode: true,
|
|
|
|
objectMode: true,
|
|
|
|
writeFunction: async (chunk, tools) => {
|
|
|
|
writeFunction: async (chunk, tools) => {
|
|
|
@@ -95,6 +97,8 @@ const aggregator = new SmartDuplex<number, number>({
|
|
|
|
});
|
|
|
|
});
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
The `finalFunction` can also push multiple chunks via `tools.push()`, just like `writeFunction`.
|
|
|
|
|
|
|
|
|
|
|
|
#### Truncating a Stream Early
|
|
|
|
#### Truncating a Stream Early
|
|
|
|
|
|
|
|
|
|
|
|
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
|
|
|
Call `tools.truncate()` inside `writeFunction` to signal that no more data should be read:
|
|
|
@@ -132,16 +136,36 @@ nodeDuplex.pipe(processTransform).pipe(outputStream);
|
|
|
|
|
|
|
|
|
|
|
|
#### Getting Web Streams from SmartDuplex
|
|
|
|
#### Getting Web Streams from SmartDuplex
|
|
|
|
|
|
|
|
|
|
|
|
Convert a `SmartDuplex` into Web `ReadableStream` + `WritableStream` pair:
|
|
|
|
Convert a `SmartDuplex` into a Web `ReadableStream` + `WritableStream` pair:
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
const duplex = new SmartDuplex({
|
|
|
|
const duplex = new SmartDuplex({
|
|
|
|
|
|
|
|
objectMode: true,
|
|
|
|
writeFunction: async (chunk, tools) => {
|
|
|
|
writeFunction: async (chunk, tools) => {
|
|
|
|
return transform(chunk);
|
|
|
|
return transform(chunk);
|
|
|
|
},
|
|
|
|
},
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
const { readable, writable } = await duplex.getWebStreams();
|
|
|
|
const { readable, writable } = await duplex.getWebStreams();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const writer = writable.getWriter();
|
|
|
|
|
|
|
|
const reader = readable.getReader();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Read and write concurrently to avoid TransformStream backpressure
|
|
|
|
|
|
|
|
const readAll = async () => {
|
|
|
|
|
|
|
|
const results = [];
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
|
|
const { value, done } = await reader.read();
|
|
|
|
|
|
|
|
if (done) break;
|
|
|
|
|
|
|
|
results.push(value);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return results;
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const readPromise = readAll();
|
|
|
|
|
|
|
|
await writer.write('hello');
|
|
|
|
|
|
|
|
await writer.close();
|
|
|
|
|
|
|
|
const results = await readPromise;
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
#### Debug Mode
|
|
|
|
#### Debug Mode
|
|
|
@@ -186,7 +210,7 @@ pipeline.run()
|
|
|
|
.catch((err) => console.error('Pipeline failed:', err));
|
|
|
|
.catch((err) => console.error('Pipeline failed:', err));
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
You can also listen for custom events across all streams:
|
|
|
|
You can listen for custom events across all streams in the pipeline:
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
pipeline.onCustomEvent('progress', () => {
|
|
|
|
pipeline.onCustomEvent('progress', () => {
|
|
|
@@ -194,6 +218,14 @@ pipeline.onCustomEvent('progress', () => {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
You can also await `streamStarted()` to know when the pipeline has been wired up:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
|
|
|
|
const runPromise = pipeline.run();
|
|
|
|
|
|
|
|
await pipeline.streamStarted(); // Resolves once pipes are connected
|
|
|
|
|
|
|
|
await runPromise;
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
### 📥 StreamIntake — Dynamic Data Injection
|
|
|
|
### 📥 StreamIntake — Dynamic Data Injection
|
|
|
@@ -222,7 +254,7 @@ intake.pushData('World');
|
|
|
|
intake.signalEnd(); // Signal end-of-stream
|
|
|
|
intake.signalEnd(); // Signal end-of-stream
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
#### Demand-driven Production with Observable
|
|
|
|
#### Demand-Driven Production with Observable
|
|
|
|
|
|
|
|
|
|
|
|
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
|
|
|
`pushNextObservable` emits whenever the stream is ready for more data — perfect for throttled or event-driven producers:
|
|
|
|
|
|
|
|
|
|
|
@@ -265,7 +297,10 @@ Quickly create a `SmartDuplex` from a simple async mapping function:
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
import { createTransformFunction } from '@push.rocks/smartstream';
|
|
|
|
import { createTransformFunction } from '@push.rocks/smartstream';
|
|
|
|
|
|
|
|
|
|
|
|
const doubler = createTransformFunction<number, number>(async (n) => n * 2);
|
|
|
|
const doubler = createTransformFunction<number, number>(
|
|
|
|
|
|
|
|
async (n) => n * 2,
|
|
|
|
|
|
|
|
{ objectMode: true }
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
intakeStream.pipe(doubler).pipe(outputStream);
|
|
|
|
intakeStream.pipe(doubler).pipe(outputStream);
|
|
|
|
```
|
|
|
|
```
|
|
|
@@ -299,16 +334,26 @@ const stream = new WebDuplexStream<number, number>({
|
|
|
|
const writer = stream.writable.getWriter();
|
|
|
|
const writer = stream.writable.getWriter();
|
|
|
|
const reader = stream.readable.getReader();
|
|
|
|
const reader = stream.readable.getReader();
|
|
|
|
|
|
|
|
|
|
|
|
// Write
|
|
|
|
// Always read concurrently with writes — TransformStream applies backpressure
|
|
|
|
|
|
|
|
const readPromise = (async () => {
|
|
|
|
|
|
|
|
const results = [];
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
|
|
const { value, done } = await reader.read();
|
|
|
|
|
|
|
|
if (done) break;
|
|
|
|
|
|
|
|
results.push(value);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return results;
|
|
|
|
|
|
|
|
})();
|
|
|
|
|
|
|
|
|
|
|
|
await writer.write(5);
|
|
|
|
await writer.write(5);
|
|
|
|
await writer.write(10);
|
|
|
|
await writer.write(10);
|
|
|
|
await writer.close();
|
|
|
|
await writer.close();
|
|
|
|
|
|
|
|
|
|
|
|
// Read
|
|
|
|
const results = await readPromise; // [10, 20]
|
|
|
|
const { value } = await reader.read(); // 10
|
|
|
|
|
|
|
|
const { value: v2 } = await reader.read(); // 20
|
|
|
|
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
> 💡 **Tip:** Because `TransformStream` enforces backpressure between its writable and readable sides, you must start reading *before* or *concurrently with* writes. If you await all writes first and then read, the writes will block waiting for the readable side to drain.
|
|
|
|
|
|
|
|
|
|
|
|
#### From a Uint8Array
|
|
|
|
#### From a Uint8Array
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
@@ -337,11 +382,24 @@ const reader = stream.readable.getReader();
|
|
|
|
// reads "CHUNK 1", "CHUNK 2"
|
|
|
|
// reads "CHUNK 1", "CHUNK 2"
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Final Function
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Emit trailing data when the writable side is closed:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
|
|
|
|
const stream = new WebDuplexStream<string, string>({
|
|
|
|
|
|
|
|
writeFunction: async (chunk) => chunk,
|
|
|
|
|
|
|
|
finalFunction: async (tools) => {
|
|
|
|
|
|
|
|
tools.push('footer');
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
### 🔀 Node ↔ Web Stream Converters
|
|
|
|
### 🔀 Node ↔ Web Stream Converters
|
|
|
|
|
|
|
|
|
|
|
|
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams:
|
|
|
|
The `nodewebhelpers` namespace provides bidirectional converters between Node.js and Web Streams with proper backpressure handling:
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
import { nodewebhelpers } from '@push.rocks/smartstream';
|
|
|
|
import { nodewebhelpers } from '@push.rocks/smartstream';
|
|
|
@@ -381,6 +439,16 @@ const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadabl
|
|
|
|
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
|
|
|
nodeReadable2.pipe(fs.createWriteStream('./copy.bin'));
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Example: Round-Trip Conversion
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
|
|
|
|
// Node → Web → Node (lossless round-trip)
|
|
|
|
|
|
|
|
const original = fs.createReadStream('./photo.jpg');
|
|
|
|
|
|
|
|
const webStream = nodewebhelpers.convertNodeReadableToWebReadable(original);
|
|
|
|
|
|
|
|
const backToNode = nodewebhelpers.convertWebReadableToNodeReadable(webStream);
|
|
|
|
|
|
|
|
backToNode.pipe(fs.createWriteStream('./photo-copy.jpg'));
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
### 🏗️ Backpressure Handling
|
|
|
|
### 🏗️ Backpressure Handling
|
|
|
@@ -417,25 +485,23 @@ fast.end();
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
### 🎯 Real-World Example: Processing Pipeline
|
|
|
|
### 🎯 Real-World Example: Log Processing Pipeline
|
|
|
|
|
|
|
|
|
|
|
|
```typescript
|
|
|
|
```typescript
|
|
|
|
import fs from 'fs';
|
|
|
|
import fs from 'fs';
|
|
|
|
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
|
|
|
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
|
|
|
|
|
|
|
|
|
|
|
// Read → Transform → Filter → Write
|
|
|
|
// Read → Parse → Filter → Write
|
|
|
|
const pipeline = new StreamWrapper([
|
|
|
|
const pipeline = new StreamWrapper([
|
|
|
|
fs.createReadStream('./access.log'),
|
|
|
|
fs.createReadStream('./access.log'),
|
|
|
|
new SmartDuplex({
|
|
|
|
new SmartDuplex({
|
|
|
|
writeFunction: async (chunk) => {
|
|
|
|
writeFunction: async (chunk) => {
|
|
|
|
// Parse each line
|
|
|
|
|
|
|
|
return chunk.toString().split('\n');
|
|
|
|
return chunk.toString().split('\n');
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}),
|
|
|
|
}),
|
|
|
|
new SmartDuplex({
|
|
|
|
new SmartDuplex({
|
|
|
|
objectMode: true,
|
|
|
|
objectMode: true,
|
|
|
|
writeFunction: async (lines: string[], tools) => {
|
|
|
|
writeFunction: async (lines: string[], tools) => {
|
|
|
|
// Filter and push matching lines
|
|
|
|
|
|
|
|
for (const line of lines) {
|
|
|
|
for (const line of lines) {
|
|
|
|
if (line.includes('ERROR')) {
|
|
|
|
if (line.includes('ERROR')) {
|
|
|
|
await tools.push(line + '\n');
|
|
|
|
await tools.push(line + '\n');
|
|
|
@@ -450,6 +516,70 @@ await pipeline.run();
|
|
|
|
console.log('Error extraction complete');
|
|
|
|
console.log('Error extraction complete');
|
|
|
|
```
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 📋 API Reference
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### SmartDuplex
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Member | Type | Description |
|
|
|
|
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
| `new SmartDuplex(options?)` | Constructor | Create a new duplex stream |
|
|
|
|
|
|
|
|
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; return to push, or use `tools.push()` |
|
|
|
|
|
|
|
|
| `options.finalFunction` | `(tools) => Promise<T>` | Emit final data when writable ends |
|
|
|
|
|
|
|
|
| `options.readFunction` | `() => Promise<void>` | Supply data to the readable side |
|
|
|
|
|
|
|
|
| `options.debug` | `boolean` | Enable internal logging |
|
|
|
|
|
|
|
|
| `options.name` | `string` | Stream name for debug logs |
|
|
|
|
|
|
|
|
| `SmartDuplex.fromBuffer(buf)` | Static | Create a readable stream from a Buffer |
|
|
|
|
|
|
|
|
| `SmartDuplex.fromWebReadableStream(rs)` | Static | Bridge a Web ReadableStream to Node.js Duplex |
|
|
|
|
|
|
|
|
| `duplex.getWebStreams()` | Method | Get `{ readable, writable }` Web Streams pair |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### WebDuplexStream
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Member | Type | Description |
|
|
|
|
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
| `new WebDuplexStream(options)` | Constructor | Create a new web transform stream |
|
|
|
|
|
|
|
|
| `options.writeFunction` | `(chunk, tools) => Promise<T>` | Transform each chunk; use `tools.push()` or return |
|
|
|
|
|
|
|
|
| `options.finalFunction` | `(tools) => Promise<T>` | Emit data on flush |
|
|
|
|
|
|
|
|
| `options.readFunction` | `(tools) => Promise<void>` | Supply data via `tools.write()`, signal `tools.done()` |
|
|
|
|
|
|
|
|
| `WebDuplexStream.fromUInt8Array(arr)` | Static | Create a stream from a Uint8Array |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### StreamWrapper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Member | Type | Description |
|
|
|
|
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
| `new StreamWrapper(streams[])` | Constructor | Create a pipeline from an array of streams |
|
|
|
|
|
|
|
|
| `wrapper.run()` | Method | Pipe all streams and return a Promise |
|
|
|
|
|
|
|
|
| `wrapper.streamStarted()` | Method | Promise that resolves when pipes are connected |
|
|
|
|
|
|
|
|
| `wrapper.onCustomEvent(name, fn)` | Method | Listen for custom events across all streams |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### StreamIntake
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Member | Type | Description |
|
|
|
|
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
| `new StreamIntake<T>()` | Constructor | Create a new intake stream (object mode) |
|
|
|
|
|
|
|
|
| `intake.pushData(data)` | Method | Push data into the stream |
|
|
|
|
|
|
|
|
| `intake.signalEnd()` | Method | Signal end of stream |
|
|
|
|
|
|
|
|
| `intake.pushNextObservable` | Property | Observable that emits when the stream wants more data |
|
|
|
|
|
|
|
|
| `StreamIntake.fromStream(stream)` | Static | Wrap a Node.js Readable or Web ReadableStream |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### nodewebhelpers
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Function | Description |
|
|
|
|
|
|
|
|
|---|---|
|
|
|
|
|
|
|
|
| `createWebReadableStreamFromFile(path)` | File → Web ReadableStream (pull-based backpressure) |
|
|
|
|
|
|
|
|
| `convertWebReadableToNodeReadable(rs)` | Web ReadableStream → Node.js Readable |
|
|
|
|
|
|
|
|
| `convertNodeReadableToWebReadable(ns)` | Node.js Readable → Web ReadableStream (pull-based backpressure) |
|
|
|
|
|
|
|
|
| `convertWebWritableToNodeWritable(ws)` | Web WritableStream → Node.js Writable |
|
|
|
|
|
|
|
|
| `convertNodeWritableToWebWritable(nw)` | Node.js Writable → Web WritableStream |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Utility Functions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Function | Description |
|
|
|
|
|
|
|
|
|---|---|
|
|
|
|
|
|
|
|
| `createTransformFunction(fn, opts?)` | Create a SmartDuplex from an async mapping function |
|
|
|
|
|
|
|
|
| `createPassThrough()` | Create an object-mode passthrough stream |
|
|
|
|
|
|
|
|
|
|
|
|
## License and Legal Information
|
|
|
|
## License and Legal Information
|
|
|
|
|
|
|
|
|
|
|
|
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
|
|
|
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
|
|
@@ -464,7 +594,7 @@ Use of these trademarks must comply with Task Venture Capital GmbH's Trademark G
|
|
|
|
|
|
|
|
|
|
|
|
### Company Information
|
|
|
|
### Company Information
|
|
|
|
|
|
|
|
|
|
|
|
Task Venture Capital GmbH
|
|
|
|
Task Venture Capital GmbH
|
|
|
|
Registered at District Court Bremen HRB 35230 HB, Germany
|
|
|
|
Registered at District Court Bremen HRB 35230 HB, Germany
|
|
|
|
|
|
|
|
|
|
|
|
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
|
|
|
For any legal inquiries or further information, please contact us via email at hello@task.vc.
|
|
|
|