Compare commits
32 Commits
Author | SHA1 | Date | |
---|---|---|---|
f13db1e422 | |||
42a90e804a | |||
413e2af717 | |||
267a76af13 | |||
7834b7e6d2 | |||
ae643708e7 | |||
d9d96b8bb7 | |||
a961eea431 | |||
edb58ade28 | |||
753a481765 | |||
bbbd1b73b9 | |||
271d0be106 | |||
0ceeacd5a0 | |||
287695e445 | |||
60f9e541a5 | |||
96ea67e135 | |||
ba0a2023ad | |||
a09c359847 | |||
e2b4d772b3 | |||
0f46b62b2d | |||
9bf37469c6 | |||
12bb125bdc | |||
703dc11c6c | |||
28725d1723 | |||
c77e0f2ba6 | |||
196fb6d396 | |||
df0ddf04b3 | |||
2e1aa4a8ff | |||
bc09033af0 | |||
22df9dfd94 | |||
d48ef6eb43 | |||
9421c652a2 |
1
assets/denoentry.ts
Normal file
1
assets/denoentry.ts
Normal file
@ -0,0 +1 @@
|
|||||||
|
console.log('Hello from deno');
|
@ -6,12 +6,34 @@
|
|||||||
"gitzone": {
|
"gitzone": {
|
||||||
"projectType": "npm",
|
"projectType": "npm",
|
||||||
"module": {
|
"module": {
|
||||||
"githost": "gitlab.com",
|
"githost": "code.foss.global",
|
||||||
"gitscope": "push.rocks",
|
"gitscope": "push.rocks",
|
||||||
"gitrepo": "smartstream",
|
"gitrepo": "smartstream",
|
||||||
"description": "simplifies access to node streams",
|
"description": "simplifies access to node streams",
|
||||||
"npmPackagename": "@push.rocks/smartstream",
|
"npmPackagename": "@push.rocks/smartstream",
|
||||||
"license": "MIT"
|
"license": "MIT",
|
||||||
|
"keywords": [
|
||||||
|
"node.js",
|
||||||
|
"streams",
|
||||||
|
"stream manipulation",
|
||||||
|
"pipeline",
|
||||||
|
"data processing",
|
||||||
|
"async transformation",
|
||||||
|
"event handling",
|
||||||
|
"backpressure management",
|
||||||
|
"readable streams",
|
||||||
|
"writable streams",
|
||||||
|
"duplex streams",
|
||||||
|
"transform streams",
|
||||||
|
"file streaming",
|
||||||
|
"buffer streams",
|
||||||
|
"stream utilities",
|
||||||
|
"stream intake",
|
||||||
|
"stream output"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"tsdoc": {
|
||||||
|
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
|
||||||
}
|
}
|
||||||
}
|
}
|
36
package.json
36
package.json
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartstream",
|
"name": "@push.rocks/smartstream",
|
||||||
"version": "3.0.21",
|
"version": "3.0.35",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "simplifies access to node streams",
|
"description": "simplifies access to node streams",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
@ -22,17 +22,18 @@
|
|||||||
},
|
},
|
||||||
"homepage": "https://gitlab.com/push.rocks/smartstream#readme",
|
"homepage": "https://gitlab.com/push.rocks/smartstream#readme",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@git.zone/tsbuild": "^2.1.66",
|
"@git.zone/tsbuild": "^2.1.72",
|
||||||
"@git.zone/tsrun": "^1.2.44",
|
"@git.zone/tsrun": "^1.2.44",
|
||||||
"@git.zone/tstest": "^1.0.84",
|
"@git.zone/tstest": "^1.0.88",
|
||||||
"@push.rocks/smartfile": "^11.0.0",
|
"@push.rocks/smartfile": "^11.0.4",
|
||||||
"@push.rocks/tapbundle": "^5.0.15",
|
"@push.rocks/tapbundle": "^5.0.17",
|
||||||
"@types/node": "^20.9.0"
|
"@types/node": "^20.11.28"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@push.rocks/lik": "^6.0.12",
|
"@push.rocks/lik": "^6.0.14",
|
||||||
"@push.rocks/smartpromise": "^4.0.3",
|
"@push.rocks/smartpromise": "^4.0.3",
|
||||||
"@push.rocks/smartrx": "^3.0.7"
|
"@push.rocks/smartrx": "^3.0.7",
|
||||||
|
"@push.rocks/webstream": "^1.0.8"
|
||||||
},
|
},
|
||||||
"browserslist": [
|
"browserslist": [
|
||||||
"last 1 chrome versions"
|
"last 1 chrome versions"
|
||||||
@ -48,5 +49,24 @@
|
|||||||
"cli.js",
|
"cli.js",
|
||||||
"npmextra.json",
|
"npmextra.json",
|
||||||
"readme.md"
|
"readme.md"
|
||||||
|
],
|
||||||
|
"keywords": [
|
||||||
|
"node.js",
|
||||||
|
"streams",
|
||||||
|
"stream manipulation",
|
||||||
|
"pipeline",
|
||||||
|
"data processing",
|
||||||
|
"async transformation",
|
||||||
|
"event handling",
|
||||||
|
"backpressure management",
|
||||||
|
"readable streams",
|
||||||
|
"writable streams",
|
||||||
|
"duplex streams",
|
||||||
|
"transform streams",
|
||||||
|
"file streaming",
|
||||||
|
"buffer streams",
|
||||||
|
"stream utilities",
|
||||||
|
"stream intake",
|
||||||
|
"stream output"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
2149
pnpm-lock.yaml
generated
2149
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
1
readme.hints.md
Normal file
1
readme.hints.md
Normal file
@ -0,0 +1 @@
|
|||||||
|
|
171
readme.md
171
readme.md
@ -1,54 +1,147 @@
|
|||||||
# @pushrocks/smartstream
|
# @push.rocks/smartstream
|
||||||
simplifies access to node streams
|
simplifies access to node streams
|
||||||
|
|
||||||
## Availabililty and Links
|
## Install
|
||||||
* [npmjs.org (npm package)](https://www.npmjs.com/package/@pushrocks/smartstream)
|
To install `@push.rocks/smartstream`, you can use npm or yarn as follows:
|
||||||
* [gitlab.com (source)](https://gitlab.com/pushrocks/smartstream)
|
|
||||||
* [github.com (source mirror)](https://github.com/pushrocks/smartstream)
|
|
||||||
* [docs (typedoc)](https://pushrocks.gitlab.io/smartstream/)
|
|
||||||
|
|
||||||
## Status for master
|
```bash
|
||||||
|
npm install @push.rocks/smartstream --save
|
||||||
|
# OR
|
||||||
|
yarn add @push.rocks/smartstream
|
||||||
|
```
|
||||||
|
|
||||||
Status Category | Status Badge
|
This will add `@push.rocks/smartstream` to your project's dependencies.
|
||||||
-- | --
|
|
||||||
GitLab Pipelines | [](https://lossless.cloud)
|
|
||||||
GitLab Pipline Test Coverage | [](https://lossless.cloud)
|
|
||||||
npm | [](https://lossless.cloud)
|
|
||||||
Snyk | [](https://lossless.cloud)
|
|
||||||
TypeScript Support | [](https://lossless.cloud)
|
|
||||||
node Support | [](https://nodejs.org/dist/latest-v10.x/docs/api/)
|
|
||||||
Code Style | [](https://lossless.cloud)
|
|
||||||
PackagePhobia (total standalone install weight) | [](https://lossless.cloud)
|
|
||||||
PackagePhobia (package size on registry) | [](https://lossless.cloud)
|
|
||||||
BundlePhobia (total size when bundled) | [](https://lossless.cloud)
|
|
||||||
Platform support | [](https://lossless.cloud) [](https://lossless.cloud)
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
Use TypeScript for best in class instellisense.
|
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes heavy use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
|
||||||
|
|
||||||
|
### Importing the Module
|
||||||
|
|
||||||
|
Start by importing the module into your TypeScript file:
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { Smartstream } from 'smartstream'
|
import * as smartstream from '@push.rocks/smartstream';
|
||||||
import * as gUglify from 'gulp-uglify'
|
|
||||||
|
|
||||||
let mySmartstream = new Smartstream([
|
|
||||||
gulp.src(['./file1.js','./file2.js']),
|
|
||||||
gUglify(),
|
|
||||||
gulp.dest('./some/output/path')
|
|
||||||
])
|
|
||||||
|
|
||||||
mySmartstream.onError((err) => { /* handle error */ }) // handles all errors in stream
|
|
||||||
myStream.onCustomEvent('myeventname', (args...) => { /* Do something */ }) // emit an custom event anywhere in your stream
|
|
||||||
mySmartstream.run().then(() => {/* do something when stream is finished */})
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Contribution
|
### Creating Basic Transform Streams
|
||||||
|
|
||||||
We are always happy for code contributions. If you are not the code contributing type that is ok. Still, maintaining Open Source repositories takes considerable time and thought. If you like the quality of what we do and our modules are useful to you we would appreciate a little monthly contribution: You can [contribute one time](https://lossless.link/contribute-onetime) or [contribute monthly](https://lossless.link/contribute). :)
|
The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility:
|
||||||
|
|
||||||
For further information read the linked docs at the top of this readme.
|
```typescript
|
||||||
|
import { createTransformFunction } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
> MIT licensed | **©** [Lossless GmbH](https://lossless.gmbh)
|
const upperCaseTransform = createTransformFunction<string, string>(async (chunk) => {
|
||||||
| By using this npm module you agree to our [privacy policy](https://lossless.gmbH/privacy)
|
return chunk.toUpperCase();
|
||||||
|
});
|
||||||
|
|
||||||
[](https://maintainedby.lossless.com)
|
// Usage with pipe
|
||||||
|
readableStream
|
||||||
|
.pipe(upperCaseTransform)
|
||||||
|
.pipe(writableStream);
|
||||||
|
```
|
||||||
|
|
||||||
|
### Handling Backpressure with SmartDuplex
|
||||||
|
|
||||||
|
`SmartDuplex` is a powerful part of the `smartstream` module designed to handle backpressure effectively. Here's an example of how to create a `SmartDuplex` stream that processes data and respects the consumer's pace:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { SmartDuplex } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const processDataDuplex = new SmartDuplex({
|
||||||
|
async writeFunction(chunk, { push }) {
|
||||||
|
const processedChunk = await processChunk(chunk); // Assume this is a defined asynchronous function
|
||||||
|
push(processedChunk);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
sourceStream.pipe(processDataDuplex).pipe(destinationStream);
|
||||||
|
```
|
||||||
|
|
||||||
|
### Stream Combiners
|
||||||
|
|
||||||
|
`Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { StreamWrapper } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const combinedStream = new StreamWrapper([
|
||||||
|
readStream, // Source stream
|
||||||
|
transformStream1, // Transformation
|
||||||
|
transformStream2, // Another transformation
|
||||||
|
writeStream // Destination stream
|
||||||
|
]);
|
||||||
|
|
||||||
|
combinedStream.run()
|
||||||
|
.then(() => console.log('Processing completed.'))
|
||||||
|
.catch(err => console.error('An error occurred:', err));
|
||||||
|
```
|
||||||
|
|
||||||
|
### Working with StreamIntake
|
||||||
|
|
||||||
|
`StreamIntake` allows for more dynamic control of the reading process, facilitating scenarios where data is not continuously available:
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { StreamIntake } from '@push.rocks/smartstream';
|
||||||
|
|
||||||
|
const streamIntake = new StreamIntake<string>();
|
||||||
|
|
||||||
|
// Dynamically push data into the intake
|
||||||
|
streamIntake.pushData('Hello, World!');
|
||||||
|
streamIntake.pushData('Another message');
|
||||||
|
|
||||||
|
// Signal end when no more data is to be pushed
|
||||||
|
streamIntake.signalEnd();
|
||||||
|
```
|
||||||
|
|
||||||
|
### Real-world Scenario: Processing Large Files
|
||||||
|
|
||||||
|
Consider a scenario where you need to process a large CSV file, transform the data row-by-row, and then write the results to a database or another file. With `smartstream`, you could create a pipe that reads the CSV, processes each row, and handles backpressure, ensuring efficient use of resources.
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
|
||||||
|
import fs from 'fs';
|
||||||
|
import csvParser from 'csv-parser'; // Assume this is a CSV parsing library
|
||||||
|
|
||||||
|
const csvReadTransform = createTransformFunction<any, any>(async (row) => {
|
||||||
|
// Process row
|
||||||
|
return processedRow;
|
||||||
|
});
|
||||||
|
|
||||||
|
fs.createReadStream('path/to/largeFile.csv')
|
||||||
|
.pipe(csvParser())
|
||||||
|
.pipe(csvReadTransform)
|
||||||
|
.pipe(new SmartDuplex({
|
||||||
|
async writeFunction(chunk, { push }) {
|
||||||
|
await writeToDatabase(chunk); // Assume this writes to a database
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.on('finish', () => console.log('File processed successfully.'));
|
||||||
|
```
|
||||||
|
|
||||||
|
This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.
|
||||||
|
|
||||||
|
### Conclusion
|
||||||
|
|
||||||
|
`@push.rocks/smartstream` offers a robust set of tools for working with Node.js streams, providing a more intuitive and reliable way to create, manipulate, and combine streams. By leveraging TypeScript and ESM syntax, `smartstream` enables developers to build more maintainable and type-safe stream-based solutions.
|
||||||
|
|
||||||
|
For more detailed examples and documentation, visit the [GitLab Repository](https://gitlab.com/push.rocks/smartstream) or the [GitHub Mirror](https://github.com/pushrocks/smartstream).
|
||||||
|
|
||||||
|
## License and Legal Information
|
||||||
|
|
||||||
|
This repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository.
|
||||||
|
|
||||||
|
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||||
|
|
||||||
|
### Trademarks
|
||||||
|
|
||||||
|
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.
|
||||||
|
|
||||||
|
### Company Information
|
||||||
|
|
||||||
|
Task Venture Capital GmbH
|
||||||
|
Registered at District court Bremen HRB 35230 HB, Germany
|
||||||
|
|
||||||
|
For any legal inquiries or if you require further information, please contact us via email at hello@task.vc.
|
||||||
|
|
||||||
|
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.
|
||||||
|
@ -7,7 +7,6 @@ tap.test('should run backpressure test', async (toolsArg) => {
|
|||||||
const stream1 = new SmartDuplex({
|
const stream1 = new SmartDuplex({
|
||||||
name: 'stream1',
|
name: 'stream1',
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
handleBackpressure: true,
|
|
||||||
writeFunction: async (chunk, tools) => {
|
writeFunction: async (chunk, tools) => {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
||||||
console.log(`processed chunk ${chunk} in stream 1`);
|
console.log(`processed chunk ${chunk} in stream 1`);
|
||||||
@ -17,19 +16,18 @@ tap.test('should run backpressure test', async (toolsArg) => {
|
|||||||
const stream2 = new SmartDuplex({
|
const stream2 = new SmartDuplex({
|
||||||
name: 'stream2',
|
name: 'stream2',
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
handleBackpressure: true,
|
|
||||||
writeFunction: async (chunk, tools) => {
|
writeFunction: async (chunk, tools) => {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||||
console.log(`processed chunk ${chunk} in stream 2`);
|
console.log(`processed chunk ${chunk} in stream 2`);
|
||||||
return chunk;
|
await tools.push(chunk);
|
||||||
|
// return chunk, optionally return ;
|
||||||
},
|
},
|
||||||
}); // This stream processes data more slowly
|
}); // This stream processes data more slowly
|
||||||
const stream3 = new SmartDuplex({
|
const stream3 = new SmartDuplex({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
name: 'stream3',
|
name: 'stream3',
|
||||||
handleBackpressure: true,
|
|
||||||
writeFunction: async (chunk, tools) => {
|
writeFunction: async (chunk, tools) => {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 100)); // Slow processing
|
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||||
console.log(`processed chunk ${chunk} in stream 3`);
|
console.log(`processed chunk ${chunk} in stream 3`);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
@ -5,9 +5,7 @@ import * as fs from 'fs';
|
|||||||
|
|
||||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||||
const bufferData = Buffer.from('This is a test buffer');
|
const bufferData = Buffer.from('This is a test buffer');
|
||||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {
|
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||||
handleBackpressure: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
let receivedData = Buffer.alloc(0);
|
let receivedData = Buffer.alloc(0);
|
||||||
|
|
||||||
|
@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartstream',
|
name: '@push.rocks/smartstream',
|
||||||
version: '3.0.21',
|
version: '3.0.35',
|
||||||
description: 'simplifies access to node streams'
|
description: 'simplifies access to node streams'
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
export * from './smartstream.classes.passthrough.js';
|
|
||||||
export * from './smartstream.classes.smartduplex.js';
|
export * from './smartstream.classes.smartduplex.js';
|
||||||
export * from './smartstream.classes.streamwrapper.js';
|
export * from './smartstream.classes.streamwrapper.js';
|
||||||
export * from './smartstream.classes.streamintake.js';
|
export * from './smartstream.classes.streamintake.js';
|
||||||
|
|
||||||
export * from './smartstream.functions.js'
|
export * from './smartstream.functions.js';
|
||||||
|
|
||||||
|
import * as plugins from './smartstream.plugins.js';
|
||||||
|
export const webstream = plugins.webstream;
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
import * as plugins from './smartstream.plugins.js';
|
|
||||||
|
|
||||||
export class PassThrough extends plugins.stream.Duplex {
|
|
||||||
constructor(options?: plugins.stream.DuplexOptions) {
|
|
||||||
super(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
_read(size: number): void {
|
|
||||||
// No-op: Data written will be automatically available for reading.
|
|
||||||
}
|
|
||||||
|
|
||||||
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
|
|
||||||
if (this.push(chunk, encoding)) {
|
|
||||||
callback();
|
|
||||||
} else {
|
|
||||||
this.once('drain', () => {
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -3,7 +3,7 @@ import { Duplex, type DuplexOptions } from 'stream';
|
|||||||
|
|
||||||
export interface IStreamTools {
|
export interface IStreamTools {
|
||||||
truncate: () => void;
|
truncate: () => void;
|
||||||
push: (pipeObject: any) => void;
|
push: (pipeObject: any) => Promise<boolean>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IStreamWriteFunction<T, rT> {
|
export interface IStreamWriteFunction<T, rT> {
|
||||||
@ -15,13 +15,30 @@ export interface IStreamFinalFunction<rT> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
|
||||||
|
/**
|
||||||
|
* wether to print debug logs
|
||||||
|
*/
|
||||||
debug?: boolean;
|
debug?: boolean;
|
||||||
|
/**
|
||||||
|
* the name of the stream
|
||||||
|
*/
|
||||||
name?: string;
|
name?: string;
|
||||||
handleBackpressure?: boolean;
|
/**
|
||||||
|
* a function that is being called to read more stuff from whereever to be processed by the stream
|
||||||
|
* @returns
|
||||||
|
*/
|
||||||
readFunction?: () => Promise<void>;
|
readFunction?: () => Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the write function is called for every chunk that is being written to the stream
|
||||||
|
* it can push or return chunks (but does not have to) to be written to the readable side of the stream
|
||||||
|
*/
|
||||||
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a final function that is run at the end of the stream
|
||||||
|
*/
|
||||||
finalFunction?: IStreamFinalFunction<TOutput>;
|
finalFunction?: IStreamFinalFunction<TOutput>;
|
||||||
// Add other custom options if necessary
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
||||||
@ -36,7 +53,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// INSTANCE
|
// INSTANCE
|
||||||
private backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>();
|
private backpressuredArray: plugins.lik.BackpressuredArray<TOutput>;
|
||||||
public options: ISmartDuplexOptions<TInput, TOutput>;
|
public options: ISmartDuplexOptions<TInput, TOutput>;
|
||||||
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
|
||||||
private debugLog(messageArg: string) {
|
private debugLog(messageArg: string) {
|
||||||
@ -46,8 +63,11 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
|
||||||
super(optionsArg);
|
super(Object.assign({
|
||||||
|
highWaterMark: 1,
|
||||||
|
}, optionsArg));
|
||||||
this.options = optionsArg;
|
this.options = optionsArg;
|
||||||
|
this.backpressuredArray = new plugins.lik.BackpressuredArray<TOutput>(this.options.highWaterMark || 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
public async _read(size: number): Promise<void> {
|
public async _read(size: number): Promise<void> {
|
||||||
@ -60,12 +80,20 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
let canPushMore = true;
|
let canPushMore = true;
|
||||||
while(this.backpressuredArray.data.length > 0 && canPushMore) {
|
while(this.backpressuredArray.data.length > 0 && canPushMore) {
|
||||||
const nextChunk = this.backpressuredArray.shift();
|
const nextChunk = this.backpressuredArray.shift();
|
||||||
if (nextChunk) {
|
canPushMore = this.push(nextChunk);
|
||||||
canPushMore = this.push(nextChunk);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async backpressuredPush (pushArg: TOutput) {
|
||||||
|
const canPushMore = this.backpressuredArray.push(pushArg);
|
||||||
|
if (!canPushMore) {
|
||||||
|
this.debugLog(`${this.options.name}: cannot push more`);
|
||||||
|
await this.backpressuredArray.waitForSpace();
|
||||||
|
this.debugLog(`${this.options.name}: can push more again`);
|
||||||
|
}
|
||||||
|
return canPushMore;
|
||||||
|
};
|
||||||
|
|
||||||
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap<Promise<any>>();
|
||||||
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
// Ensure the _write method types the chunk as TInput and encodes TOutput
|
||||||
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
|
||||||
@ -80,9 +108,9 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
isTruncated = true;
|
isTruncated = true;
|
||||||
callback();
|
callback();
|
||||||
},
|
},
|
||||||
push: (pushArg: TOutput) => {
|
push: async (pushArg: TOutput) => {
|
||||||
this.backpressuredArray.push(pushArg);
|
return await this.backpressuredPush(pushArg);
|
||||||
},
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -93,12 +121,7 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (modifiedChunk) {
|
if (modifiedChunk) {
|
||||||
const canPushMore = this.backpressuredArray.push(modifiedChunk);
|
await tools.push(modifiedChunk);
|
||||||
if (!canPushMore) {
|
|
||||||
this.debugLog(`${this.options.name}: cannot push more`);
|
|
||||||
await this.backpressuredArray.waitForSpace();
|
|
||||||
this.debugLog(`${this.options.name}: can push more again`);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
callback();
|
callback();
|
||||||
writeDeferred.resolve();
|
writeDeferred.resolve();
|
||||||
@ -115,21 +138,23 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
|
|||||||
if (this.options.finalFunction) {
|
if (this.options.finalFunction) {
|
||||||
const tools: IStreamTools = {
|
const tools: IStreamTools = {
|
||||||
truncate: () => callback(),
|
truncate: () => callback(),
|
||||||
push: (pipeObject) => this.push(pipeObject),
|
push: async (pipeObject) => {
|
||||||
|
return this.backpressuredArray.push(pipeObject);
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const finalChunk = await this.options.finalFunction(tools);
|
const finalChunk = await this.options.finalFunction(tools);
|
||||||
if (finalChunk) {
|
if (finalChunk) {
|
||||||
this.push(finalChunk);
|
this.backpressuredArray.push(finalChunk);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.push(null);
|
this.backpressuredArray.push(null);
|
||||||
callback(err);
|
callback(err);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.push(null);
|
this.backpressuredArray.push(null);
|
||||||
callback();
|
callback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,4 +18,13 @@ export function createTransformFunction<TInput, TOutput>(
|
|||||||
});
|
});
|
||||||
|
|
||||||
return smartDuplexStream;
|
return smartDuplexStream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const createPassThrough = () => {
|
||||||
|
return new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunkArg, toolsArg) => {
|
||||||
|
return chunkArg;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -7,6 +7,7 @@ export { stream };
|
|||||||
import * as lik from '@push.rocks/lik';
|
import * as lik from '@push.rocks/lik';
|
||||||
import * as smartpromise from '@push.rocks/smartpromise';
|
import * as smartpromise from '@push.rocks/smartpromise';
|
||||||
import * as smartrx from '@push.rocks/smartrx';
|
import * as smartrx from '@push.rocks/smartrx';
|
||||||
|
import * as webstream from '@push.rocks/webstream';
|
||||||
|
|
||||||
export { lik, smartpromise, smartrx };
|
export { lik, smartpromise, smartrx, webstream };
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user