Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
1cb6f727af | |||
824c44d165 | |||
3e062103f8 | |||
6451e93c12 | |||
70cf93595c | |||
17e03e9790 | |||
e52ce7af61 | |||
f548f4b6cb | |||
23a7a77a73 | |||
13d2fc78b8 | |||
898cc0407d | |||
8a3f43a11a | |||
da2191bb96 |
140
.gitlab-ci.yml
140
.gitlab-ci.yml
@ -1,140 +0,0 @@
|
||||
# gitzone ci_default
|
||||
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||
|
||||
cache:
|
||||
paths:
|
||||
- .npmci_cache/
|
||||
key: '$CI_BUILD_STAGE'
|
||||
|
||||
stages:
|
||||
- security
|
||||
- test
|
||||
- release
|
||||
- metadata
|
||||
|
||||
before_script:
|
||||
- npm install -g @shipzone/npmci
|
||||
|
||||
# ====================
|
||||
# security stage
|
||||
# ====================
|
||||
mirror:
|
||||
stage: security
|
||||
script:
|
||||
- npmci git mirror
|
||||
only:
|
||||
- tags
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
auditProductionDependencies:
|
||||
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||
stage: security
|
||||
script:
|
||||
- npmci npm prepare
|
||||
- npmci command npm install --production --ignore-scripts
|
||||
- npmci command npm config set registry https://registry.npmjs.org
|
||||
- npmci command npm audit --audit-level=high --only=prod --production
|
||||
tags:
|
||||
- docker
|
||||
allow_failure: true
|
||||
|
||||
auditDevDependencies:
|
||||
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
|
||||
stage: security
|
||||
script:
|
||||
- npmci npm prepare
|
||||
- npmci command npm install --ignore-scripts
|
||||
- npmci command npm config set registry https://registry.npmjs.org
|
||||
- npmci command npm audit --audit-level=high --only=dev
|
||||
tags:
|
||||
- docker
|
||||
allow_failure: true
|
||||
|
||||
# ====================
|
||||
# test stage
|
||||
# ====================
|
||||
|
||||
testStable:
|
||||
stage: test
|
||||
script:
|
||||
- npmci npm prepare
|
||||
- npmci node install stable
|
||||
- npmci npm install
|
||||
- npmci npm test
|
||||
coverage: /\d+.?\d+?\%\s*coverage/
|
||||
tags:
|
||||
- docker
|
||||
|
||||
testBuild:
|
||||
stage: test
|
||||
script:
|
||||
- npmci npm prepare
|
||||
- npmci node install stable
|
||||
- npmci npm install
|
||||
- npmci command npm run build
|
||||
coverage: /\d+.?\d+?\%\s*coverage/
|
||||
tags:
|
||||
- docker
|
||||
|
||||
release:
|
||||
stage: release
|
||||
script:
|
||||
- npmci node install stable
|
||||
- npmci npm publish
|
||||
only:
|
||||
- tags
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
# ====================
|
||||
# metadata stage
|
||||
# ====================
|
||||
codequality:
|
||||
stage: metadata
|
||||
allow_failure: true
|
||||
only:
|
||||
- tags
|
||||
script:
|
||||
- npmci command npm install -g typescript
|
||||
- npmci npm prepare
|
||||
- npmci npm install
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- priv
|
||||
|
||||
trigger:
|
||||
stage: metadata
|
||||
script:
|
||||
- npmci trigger
|
||||
only:
|
||||
- tags
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
|
||||
pages:
|
||||
stage: metadata
|
||||
script:
|
||||
- npmci node install lts
|
||||
- npmci command npm install -g @git.zone/tsdoc
|
||||
- npmci npm prepare
|
||||
- npmci npm install
|
||||
- npmci command tsdoc
|
||||
tags:
|
||||
- lossless
|
||||
- docker
|
||||
- notpriv
|
||||
only:
|
||||
- tags
|
||||
artifacts:
|
||||
expire_in: 1 week
|
||||
paths:
|
||||
- public
|
||||
allow_failure: true
|
@ -1 +0,0 @@
|
||||
console.log('Hello from deno');
|
@ -9,27 +9,27 @@
|
||||
"githost": "code.foss.global",
|
||||
"gitscope": "push.rocks",
|
||||
"gitrepo": "smartstream",
|
||||
"description": "simplifies access to node streams",
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"npmPackagename": "@push.rocks/smartstream",
|
||||
"license": "MIT",
|
||||
"keywords": [
|
||||
"stream",
|
||||
"node.js",
|
||||
"streams",
|
||||
"typescript",
|
||||
"stream manipulation",
|
||||
"pipeline",
|
||||
"data processing",
|
||||
"pipeline",
|
||||
"async transformation",
|
||||
"event handling",
|
||||
"backpressure management",
|
||||
"readable streams",
|
||||
"writable streams",
|
||||
"duplex streams",
|
||||
"transform streams",
|
||||
"backpressure",
|
||||
"readable stream",
|
||||
"writable stream",
|
||||
"duplex stream",
|
||||
"transform stream",
|
||||
"file streaming",
|
||||
"buffer streams",
|
||||
"buffer",
|
||||
"stream utilities",
|
||||
"stream intake",
|
||||
"stream output"
|
||||
"esm"
|
||||
]
|
||||
}
|
||||
},
|
||||
|
53
package.json
53
package.json
@ -1,39 +1,40 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstream",
|
||||
"version": "3.0.35",
|
||||
"version": "3.0.41",
|
||||
"private": false,
|
||||
"description": "simplifies access to node streams",
|
||||
"main": "dist_ts/index.js",
|
||||
"typings": "dist_ts/index.d.ts",
|
||||
"description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": "./dist_ts/index.js",
|
||||
"./web": "./dist_ts_web/index.js"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "(tstest test/)",
|
||||
"build": "(tsbuild)",
|
||||
"buildDocs": "tsdoc"
|
||||
"build": "(tsbuild tsfolders --web --allowimplicitany)"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://gitlab.com/push.rocks/smartstream.git"
|
||||
"url": "https://code.foss.global/push.rocks/smartstream.git"
|
||||
},
|
||||
"author": "Lossless GmbH",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://gitlab.com/push.rocks/smartstream/issues"
|
||||
},
|
||||
"homepage": "https://gitlab.com/push.rocks/smartstream#readme",
|
||||
"homepage": "https://code.foss.global/push.rocks/smartstream",
|
||||
"devDependencies": {
|
||||
"@git.zone/tsbuild": "^2.1.72",
|
||||
"@git.zone/tsbuild": "^2.1.80",
|
||||
"@git.zone/tsrun": "^1.2.44",
|
||||
"@git.zone/tstest": "^1.0.88",
|
||||
"@push.rocks/smartfile": "^11.0.4",
|
||||
"@push.rocks/tapbundle": "^5.0.17",
|
||||
"@types/node": "^20.11.28"
|
||||
"@git.zone/tstest": "^1.0.90",
|
||||
"@push.rocks/smartfile": "^11.0.15",
|
||||
"@push.rocks/tapbundle": "^5.0.23",
|
||||
"@types/node": "^20.12.12"
|
||||
},
|
||||
"dependencies": {
|
||||
"@push.rocks/lik": "^6.0.14",
|
||||
"@push.rocks/lik": "^6.0.15",
|
||||
"@push.rocks/smartenv": "^5.0.12",
|
||||
"@push.rocks/smartpromise": "^4.0.3",
|
||||
"@push.rocks/smartrx": "^3.0.7",
|
||||
"@push.rocks/webstream": "^1.0.8"
|
||||
"@push.rocks/smartrx": "^3.0.7"
|
||||
},
|
||||
"browserslist": [
|
||||
"last 1 chrome versions"
|
||||
@ -51,22 +52,22 @@
|
||||
"readme.md"
|
||||
],
|
||||
"keywords": [
|
||||
"stream",
|
||||
"node.js",
|
||||
"streams",
|
||||
"typescript",
|
||||
"stream manipulation",
|
||||
"pipeline",
|
||||
"data processing",
|
||||
"pipeline",
|
||||
"async transformation",
|
||||
"event handling",
|
||||
"backpressure management",
|
||||
"readable streams",
|
||||
"writable streams",
|
||||
"duplex streams",
|
||||
"transform streams",
|
||||
"backpressure",
|
||||
"readable stream",
|
||||
"writable stream",
|
||||
"duplex stream",
|
||||
"transform stream",
|
||||
"file streaming",
|
||||
"buffer streams",
|
||||
"buffer",
|
||||
"stream utilities",
|
||||
"stream intake",
|
||||
"stream output"
|
||||
"esm"
|
||||
]
|
||||
}
|
||||
|
7505
pnpm-lock.yaml
generated
7505
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@ -1 +1 @@
|
||||
|
||||
- make sure to respect backpressure handling.
|
242
readme.md
242
readme.md
@ -1,5 +1,6 @@
|
||||
```markdown
|
||||
# @push.rocks/smartstream
|
||||
simplifies access to node streams
|
||||
A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively.
|
||||
|
||||
## Install
|
||||
To install `@push.rocks/smartstream`, you can use npm or yarn as follows:
|
||||
@ -14,7 +15,7 @@ This will add `@push.rocks/smartstream` to your project's dependencies.
|
||||
|
||||
## Usage
|
||||
|
||||
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes heavy use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
|
||||
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
|
||||
|
||||
### Importing the Module
|
||||
|
||||
@ -24,6 +25,12 @@ Start by importing the module into your TypeScript file:
|
||||
import * as smartstream from '@push.rocks/smartstream';
|
||||
```
|
||||
|
||||
For a more specific import, you may do the following:
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream';
|
||||
```
|
||||
|
||||
### Creating Basic Transform Streams
|
||||
|
||||
The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility:
|
||||
@ -58,7 +65,7 @@ const processDataDuplex = new SmartDuplex({
|
||||
sourceStream.pipe(processDataDuplex).pipe(destinationStream);
|
||||
```
|
||||
|
||||
### Stream Combiners
|
||||
### Combining Multiple Streams
|
||||
|
||||
`Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:
|
||||
|
||||
@ -101,7 +108,7 @@ Consider a scenario where you need to process a large CSV file, transform the da
|
||||
```typescript
|
||||
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
|
||||
import fs from 'fs';
|
||||
import csvParser from 'csv-parser'; // Assume this is a CSV parsing library
|
||||
import csvParser from 'csv-parser';
|
||||
|
||||
const csvReadTransform = createTransformFunction<any, any>(async (row) => {
|
||||
// Process row
|
||||
@ -121,11 +128,232 @@ fs.createReadStream('path/to/largeFile.csv')
|
||||
|
||||
This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.
|
||||
|
||||
### Conclusion
|
||||
### Advanced Use Case: Backpressure Handling
|
||||
|
||||
`@push.rocks/smartstream` offers a robust set of tools for working with Node.js streams, providing a more intuitive and reliable way to create, manipulate, and combine streams. By leveraging TypeScript and ESM syntax, `smartstream` enables developers to build more maintainable and type-safe stream-based solutions.
|
||||
Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Here’s a comprehensive example that demonstrates handling backpressure in a pipeline with multiple `SmartDuplex` instances:
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
// Define the first SmartDuplex, which writes data slowly to simulate backpressure
|
||||
const slowProcessingStream = new SmartDuplex({
|
||||
name: 'SlowProcessor',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay
|
||||
console.log('Processed chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
// Define the second SmartDuplex as a fast processor
|
||||
const fastProcessingStream = new SmartDuplex({
|
||||
name: 'FastProcessor',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
console.log('Fast processing chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
// Create a StreamIntake to dynamically handle incoming data
|
||||
const streamIntake = new StreamIntake<string>();
|
||||
|
||||
// Chain the streams together and handle the backpressure scenario
|
||||
streamIntake
|
||||
.pipe(fastProcessingStream)
|
||||
.pipe(slowProcessingStream)
|
||||
.pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling
|
||||
.on('data', data => console.log('Final output:', data))
|
||||
.on('error', error => console.error('Stream encountered an error:', error));
|
||||
|
||||
// Simulate data pushing with intervals to observe backpressure handling
|
||||
let counter = 0;
|
||||
const interval = setInterval(() => {
|
||||
if (counter >= 10) {
|
||||
streamIntake.signalEnd();
|
||||
clearInterval(interval);
|
||||
} else {
|
||||
streamIntake.pushData(`Chunk ${counter}`);
|
||||
counter++;
|
||||
}
|
||||
}, 50);
|
||||
```
|
||||
|
||||
In this advanced use case, a `SlowProcessor` and `FastProcessor` are created using `SmartDuplex`, simulating a situation where one stream is slower than another. The `StreamIntake` dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions.
|
||||
|
||||
### Transform Streams in Parallel
|
||||
|
||||
For scenarios where you need to process data in parallel:
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
|
||||
|
||||
const parallelTransform = createTransformFunction<any, any>(async (chunk) => {
|
||||
// Parallel Processing
|
||||
const results = await Promise.all(chunk.map(async item => await processItem(item)));
|
||||
return results;
|
||||
});
|
||||
|
||||
const streamIntake = new StreamIntake<any[]>();
|
||||
|
||||
streamIntake
|
||||
.pipe(parallelTransform)
|
||||
.pipe(new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
console.log('Processed parallel chunk:', chunk);
|
||||
push(chunk);
|
||||
}
|
||||
}))
|
||||
.on('finish', () => console.log('Parallel processing completed.'));
|
||||
|
||||
// Simulate data pushing
|
||||
streamIntake.pushData([1, 2, 3, 4]);
|
||||
streamIntake.pushData([5, 6, 7, 8]);
|
||||
streamIntake.signalEnd();
|
||||
```
|
||||
|
||||
### Error Handling in Stream Pipelines
|
||||
|
||||
Error handling is an essential part of working with streams. The `StreamWrapper` assists in combining multiple streams while managing errors seamlessly:
|
||||
|
||||
```typescript
|
||||
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
const faultyStream = new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
if (chunk === 'bad data') {
|
||||
throw new Error('Faulty data encountered');
|
||||
}
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const readStream = new StreamIntake<string>();
|
||||
const writeStream = new SmartDuplex({
|
||||
async writeFunction(chunk) {
|
||||
console.log('Written chunk:', chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]);
|
||||
|
||||
combinedStream.run()
|
||||
.then(() => console.log('Stream processing completed.'))
|
||||
.catch(err => console.error('Stream error:', err.message));
|
||||
|
||||
// Push Data
|
||||
readStream.pushData('good data');
|
||||
readStream.pushData('bad data'); // This will throw an error
|
||||
readStream.pushData('more good data');
|
||||
readStream.signalEnd();
|
||||
```
|
||||
|
||||
### Testing Streams
|
||||
|
||||
Here's an example test case using the `tap` testing framework to verify the integrity of the `SmartDuplex` from a buffer:
|
||||
|
||||
```typescript
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
smartStream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
|
||||
smartStream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
tap.start();
|
||||
```
|
||||
|
||||
### Working with Files and Buffers
|
||||
|
||||
You can easily stream files and buffers with `smartstream`. Here’s a test illustrating reading and writing with file streams using `smartfile` combined with `smartstream` utilities:
|
||||
|
||||
```typescript
|
||||
import { tap } from '@push.rocks/tapbundle';
|
||||
import * as smartfile from '@push.rocks/smartfile';
|
||||
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
|
||||
|
||||
tap.test('should handle file read and write streams', async () => {
|
||||
const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt');
|
||||
const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt');
|
||||
|
||||
const transformStream = new SmartDuplex({
|
||||
async writeFunction(chunk, { push }) {
|
||||
const transformedChunk = chunk.toString().toUpperCase();
|
||||
push(transformedChunk);
|
||||
}
|
||||
});
|
||||
|
||||
const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]);
|
||||
|
||||
await streamWrapper.run();
|
||||
|
||||
const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8');
|
||||
console.log('Output Content:', outputContent);
|
||||
});
|
||||
|
||||
tap.start();
|
||||
```
|
||||
|
||||
### Modular and Scoped Transformations
|
||||
|
||||
Creating modular and scoped transformations is straightforward with `SmartDuplex`:
|
||||
|
||||
```typescript
|
||||
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||
|
||||
type DataChunk = {
|
||||
id: number;
|
||||
data: string;
|
||||
};
|
||||
|
||||
const transformationStream1 = new SmartDuplex<DataChunk, DataChunk>({
|
||||
async writeFunction(chunk, { push }) {
|
||||
chunk.data = chunk.data.toUpperCase();
|
||||
push(chunk);
|
||||
}
|
||||
})
|
||||
|
||||
const transformationStream2 = new SmartDuplex<DataChunk, DataChunk>({
|
||||
async writeFunction(chunk, { push }) {
|
||||
chunk.data = `${chunk.data} processed with transformation 2`;
|
||||
push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
const initialData: DataChunk[] = [
|
||||
{ id: 1, data: 'first' },
|
||||
{ id: 2, data: 'second' }
|
||||
];
|
||||
|
||||
const intakeStream = new StreamIntake<DataChunk>();
|
||||
|
||||
intakeStream
|
||||
.pipe(transformationStream1)
|
||||
.pipe(transformationStream2)
|
||||
.on('data', data => console.log('Transformed Data:', data));
|
||||
|
||||
initialData.forEach(item => intakeStream.pushData(item));
|
||||
intakeStream.signalEnd();
|
||||
```
|
||||
|
||||
By leveraging `SmartDuplex`, `StreamWrapper`, and `StreamIntake`, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach.
|
||||
```
|
||||
|
||||
For more detailed examples and documentation, visit the [GitLab Repository](https://gitlab.com/push.rocks/smartstream) or the [GitHub Mirror](https://github.com/pushrocks/smartstream).
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
|
70
test/test.ts_web.both.ts
Normal file
70
test/test.ts_web.both.ts
Normal file
@ -0,0 +1,70 @@
|
||||
import { expect, expectAsync, tap } from '@push.rocks/tapbundle';
|
||||
import * as webstream from '../ts_web/index.js';
|
||||
|
||||
tap.test('WebDuplexStream', async (toolsArg) => {
|
||||
const testDone = toolsArg.defer(); // Create a deferred object to control test completion.
|
||||
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
let readUint8Array = new Uint8Array();
|
||||
|
||||
reader.read().then(function processText({ done, value }) {
|
||||
if (done) {
|
||||
expect(readUint8Array).toEqual(inputUint8Array);
|
||||
testDone.resolve(); // Correctly signal that the test is done.
|
||||
return;
|
||||
}
|
||||
readUint8Array = new Uint8Array([...readUint8Array, ...value]);
|
||||
return reader.read().then(processText);
|
||||
});
|
||||
|
||||
return testDone.promise; // Return the promise to properly wait for the test to complete.
|
||||
});
|
||||
|
||||
|
||||
tap.test('should handle transform with a write function', async (toolsArg) => {
|
||||
const testDone = toolsArg.defer();
|
||||
const input = [1, 2, 3, 4, 5];
|
||||
const expectedOutput = [2, 4, 6, 8, 10];
|
||||
|
||||
const transformStream = new webstream.WebDuplexStream<number, number>({
|
||||
writeFunction: (chunk, { push }) => {
|
||||
push(chunk * 2); // Push the doubled number into the stream
|
||||
return Promise.resolve(); // Resolve the promise immediately
|
||||
},
|
||||
});
|
||||
|
||||
const writableStream = transformStream.writable.getWriter();
|
||||
const readableStream = transformStream.readable.getReader();
|
||||
|
||||
const output: number[] = [];
|
||||
|
||||
// Process the text and resolve the test once done.
|
||||
const processText = async ({ done, value }) => {
|
||||
if (done) {
|
||||
expect(output).toEqual(expectedOutput);
|
||||
testDone.resolve(); // Resolve the deferred test once all values have been read.
|
||||
return;
|
||||
}
|
||||
if (value !== undefined) {
|
||||
output.push(value);
|
||||
}
|
||||
// Continue reading and processing.
|
||||
await readableStream.read().then(processText);
|
||||
};
|
||||
|
||||
// Start the read process before writing to the stream.
|
||||
readableStream.read().then(processText);
|
||||
|
||||
// Sequentially write to the stream and close when done.
|
||||
for (const num of input) {
|
||||
await writableStream.write(num);
|
||||
}
|
||||
await writableStream.close();
|
||||
|
||||
return testDone.promise; // This will wait until the testDone is resolved before completing the test.
|
||||
});
|
||||
|
||||
|
||||
tap.start();
|
@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.0.35',
|
||||
description: 'simplifies access to node streams'
|
||||
version: '3.0.41',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
||||
|
@ -157,4 +157,53 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||
this.backpressuredArray.push(null);
|
||||
callback();
|
||||
}
|
||||
|
||||
public async getWebStreams(): Promise<{ readable: ReadableStream, writable: WritableStream }> {
|
||||
const duplex = this;
|
||||
const readable = new ReadableStream({
|
||||
start(controller) {
|
||||
duplex.on('readable', () => {
|
||||
let chunk;
|
||||
while (null !== (chunk = duplex.read())) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
duplex.on('end', () => {
|
||||
controller.close();
|
||||
});
|
||||
},
|
||||
cancel(reason) {
|
||||
duplex.destroy(new Error(reason));
|
||||
}
|
||||
});
|
||||
|
||||
const writable = new WritableStream({
|
||||
write(chunk) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const isBackpressured = !duplex.write(chunk, (error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
if (isBackpressured) {
|
||||
duplex.once('drain', resolve);
|
||||
}
|
||||
});
|
||||
},
|
||||
close() {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
duplex.end(resolve);
|
||||
});
|
||||
},
|
||||
abort(reason) {
|
||||
duplex.destroy(new Error(reason));
|
||||
}
|
||||
});
|
||||
|
||||
return { readable, writable };
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ export class StreamIntake<T> extends plugins.stream.Readable {
|
||||
_read(size: number): void {
|
||||
// console.log('get next');
|
||||
const pushChunk = (): void => {
|
||||
if (this.chunkStore.length > 0) {
|
||||
while (this.chunkStore.length > 0) {
|
||||
// If push returns false, then we should stop reading
|
||||
if (!this.push(this.chunkStore.shift())) {
|
||||
return;
|
||||
|
@ -7,7 +7,7 @@ export { stream };
|
||||
import * as lik from '@push.rocks/lik';
|
||||
import * as smartpromise from '@push.rocks/smartpromise';
|
||||
import * as smartrx from '@push.rocks/smartrx';
|
||||
import * as webstream from '@push.rocks/webstream';
|
||||
import * as webstream from '../dist_ts_web/index.js';
|
||||
|
||||
export { lik, smartpromise, smartrx, webstream };
|
||||
|
||||
|
8
ts_web/00_commitinfo_data.ts
Normal file
8
ts_web/00_commitinfo_data.ts
Normal file
@ -0,0 +1,8 @@
|
||||
/**
|
||||
* autocreated commitinfo by @pushrocks/commitinfo
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstream',
|
||||
version: '3.0.41',
|
||||
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
|
||||
}
|
156
ts_web/classes.webduplexstream.ts
Normal file
156
ts_web/classes.webduplexstream.ts
Normal file
@ -0,0 +1,156 @@
|
||||
import * as plugins from './plugins.js';
|
||||
|
||||
|
||||
// ========================================
|
||||
// READ
|
||||
// ========================================
|
||||
export interface IStreamToolsRead<TInput, TOutput> {
|
||||
done: () => void;
|
||||
write: (writeArg: TInput) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* the read function is called anytime
|
||||
* -> the WebDuplexStream is being read from
|
||||
* and at the same time if nothing is enqueued
|
||||
*/
|
||||
export interface IStreamReadFunction<TInput, TOutput> {
|
||||
(toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
|
||||
}
|
||||
|
||||
// ========================================
|
||||
// WRITE
|
||||
// ========================================
|
||||
export interface IStreamToolsWrite<TInput, TOutput> {
|
||||
truncate: () => void;
|
||||
push: (pushArg: TOutput) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* the write function can return something.
|
||||
* It is called anytime a chunk is written to the stream.
|
||||
*/
|
||||
export interface IStreamWriteFunction<TInput, TOutput> {
|
||||
(chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
|
||||
}
|
||||
|
||||
export interface IStreamFinalFunction<TInput, TOutput> {
|
||||
(toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput>;
|
||||
}
|
||||
|
||||
export interface WebDuplexStreamOptions<TInput, TOutput> {
|
||||
readFunction?: IStreamReadFunction<TInput, TOutput>;
|
||||
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
||||
finalFunction?: IStreamFinalFunction<TInput, TOutput>;
|
||||
}
|
||||
|
||||
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
|
||||
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
|
||||
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk); // Directly push the chunk as is
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
writer.write(uint8Array).then(() => writer.close());
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
// INSTANCE
|
||||
options: WebDuplexStreamOptions<TInput, TOutput>;
|
||||
|
||||
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
|
||||
super({
|
||||
async transform(chunk, controller) {
|
||||
// Transformation logic remains unchanged
|
||||
if (optionsArg?.writeFunction) {
|
||||
const tools: IStreamToolsWrite<TInput, TOutput> = {
|
||||
truncate: () => controller.terminate(),
|
||||
push: (pushArg: TOutput) => controller.enqueue(pushArg),
|
||||
};
|
||||
|
||||
optionsArg.writeFunction(chunk, tools)
|
||||
.then(writeReturnChunk => {
|
||||
// the write return chunk is optional
|
||||
// just in case the write function returns something other than void.
|
||||
if (writeReturnChunk) {
|
||||
controller.enqueue(writeReturnChunk);
|
||||
}
|
||||
})
|
||||
.catch(err => controller.error(err));
|
||||
} else {
|
||||
controller.error(new Error('No write function provided'));
|
||||
}
|
||||
},
|
||||
async flush(controller) {
|
||||
// Flush logic remains unchanged
|
||||
if (optionsArg?.finalFunction) {
|
||||
const tools: IStreamToolsWrite<TInput, TOutput> = {
|
||||
truncate: () => controller.terminate(),
|
||||
push: (pipeObject) => controller.enqueue(pipeObject),
|
||||
};
|
||||
|
||||
optionsArg.finalFunction(tools)
|
||||
.then(finalChunk => {
|
||||
if (finalChunk) {
|
||||
controller.enqueue(finalChunk);
|
||||
}
|
||||
})
|
||||
.catch(err => controller.error(err))
|
||||
.finally(() => controller.terminate());
|
||||
} else {
|
||||
controller.terminate();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.options = optionsArg;
|
||||
}
|
||||
|
||||
// Method to create a custom readable stream that integrates the readFunction
|
||||
// readFunction is executed whenever the stream is being read from and nothing is enqueued
|
||||
getCustomReadableStream() {
|
||||
const readableStream = this.readable;
|
||||
const options = this.options;
|
||||
const customReadable = new ReadableStream({
|
||||
async pull(controller) {
|
||||
const reader = readableStream.getReader();
|
||||
|
||||
// Check the current state of the original stream
|
||||
const { value, done } = await reader.read();
|
||||
reader.releaseLock();
|
||||
|
||||
if (done) {
|
||||
// If the original stream is done, close the custom readable stream
|
||||
controller.close();
|
||||
} else {
|
||||
if (value) {
|
||||
// If there is data in the original stream, enqueue it and do not execute the readFunction
|
||||
controller.enqueue(value);
|
||||
} else if (options.readFunction) {
|
||||
// If the original stream is empty, execute the readFunction and read again
|
||||
await options.readFunction({
|
||||
done: () => controller.close(),
|
||||
write: (writeArg) => controller.enqueue(writeArg),
|
||||
});
|
||||
|
||||
const newReader = readableStream.getReader();
|
||||
const { value: newValue, done: newDone } = await newReader.read();
|
||||
newReader.releaseLock();
|
||||
|
||||
if (newDone) {
|
||||
controller.close();
|
||||
} else {
|
||||
controller.enqueue(newValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return customReadable;
|
||||
}
|
||||
}
|
2
ts_web/index.ts
Normal file
2
ts_web/index.ts
Normal file
@ -0,0 +1,2 @@
|
||||
import './plugins.js';
|
||||
export * from './classes.webduplexstream.js';
|
15
ts_web/plugins.ts
Normal file
15
ts_web/plugins.ts
Normal file
@ -0,0 +1,15 @@
|
||||
// @push.rocks scope
|
||||
import * as smartenv from '@push.rocks/smartenv';
|
||||
|
||||
export {
|
||||
smartenv,
|
||||
}
|
||||
|
||||
// lets setup dependencies
|
||||
const smartenvInstance = new smartenv.Smartenv();
|
||||
|
||||
await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
|
||||
globalThis.ReadableStream = moduleArg.ReadableStream;
|
||||
globalThis.WritableStream = moduleArg.WritableStream;
|
||||
globalThis.TransformStream = moduleArg.TransformStream;
|
||||
})
|
Reference in New Issue
Block a user