Compare commits

...

11 Commits

Author SHA1 Message Date
3e062103f8 3.0.40 2024-06-02 23:40:52 +02:00
6451e93c12 fix(smartduplex): now has a .getWebStreams method, that exposes a web streams compatible API 2024-06-02 23:40:52 +02:00
70cf93595c 3.0.39 2024-06-02 16:42:42 +02:00
17e03e9790 fix(core): update 2024-06-02 16:42:42 +02:00
e52ce7af61 update description 2024-05-29 14:16:38 +02:00
f548f4b6cb 3.0.38 2024-05-17 19:21:34 +02:00
23a7a77a73 fix(core): update 2024-05-17 19:21:33 +02:00
13d2fc78b8 3.0.37 2024-05-17 18:40:33 +02:00
898cc0407d fix(core): update 2024-05-17 18:40:32 +02:00
8a3f43a11a 3.0.36 2024-05-17 18:13:52 +02:00
da2191bb96 fix(core): update 2024-05-17 18:13:51 +02:00
20 changed files with 5283 additions and 2990 deletions

View File

@ -1,140 +0,0 @@
# gitzone ci_default
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
cache:
paths:
- .npmci_cache/
key: '$CI_BUILD_STAGE'
stages:
- security
- test
- release
- metadata
before_script:
- npm install -g @shipzone/npmci
# ====================
# security stage
# ====================
mirror:
stage: security
script:
- npmci git mirror
only:
- tags
tags:
- lossless
- docker
- notpriv
auditProductionDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --production --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=prod --production
tags:
- docker
allow_failure: true
auditDevDependencies:
image: registry.gitlab.com/hosttoday/ht-docker-node:npmci
stage: security
script:
- npmci npm prepare
- npmci command npm install --ignore-scripts
- npmci command npm config set registry https://registry.npmjs.org
- npmci command npm audit --audit-level=high --only=dev
tags:
- docker
allow_failure: true
# ====================
# test stage
# ====================
testStable:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci npm test
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
testBuild:
stage: test
script:
- npmci npm prepare
- npmci node install stable
- npmci npm install
- npmci command npm run build
coverage: /\d+.?\d+?\%\s*coverage/
tags:
- docker
release:
stage: release
script:
- npmci node install stable
- npmci npm publish
only:
- tags
tags:
- lossless
- docker
- notpriv
# ====================
# metadata stage
# ====================
codequality:
stage: metadata
allow_failure: true
only:
- tags
script:
- npmci command npm install -g typescript
- npmci npm prepare
- npmci npm install
tags:
- lossless
- docker
- priv
trigger:
stage: metadata
script:
- npmci trigger
only:
- tags
tags:
- lossless
- docker
- notpriv
pages:
stage: metadata
script:
- npmci node install lts
- npmci command npm install -g @git.zone/tsdoc
- npmci npm prepare
- npmci npm install
- npmci command tsdoc
tags:
- lossless
- docker
- notpriv
only:
- tags
artifacts:
expire_in: 1 week
paths:
- public
allow_failure: true

View File

@ -1 +0,0 @@
console.log('Hello from deno');

View File

@ -9,27 +9,27 @@
"githost": "code.foss.global", "githost": "code.foss.global",
"gitscope": "push.rocks", "gitscope": "push.rocks",
"gitrepo": "smartstream", "gitrepo": "smartstream",
"description": "simplifies access to node streams", "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"npmPackagename": "@push.rocks/smartstream", "npmPackagename": "@push.rocks/smartstream",
"license": "MIT", "license": "MIT",
"keywords": [ "keywords": [
"stream",
"node.js", "node.js",
"streams", "typescript",
"stream manipulation", "stream manipulation",
"pipeline",
"data processing", "data processing",
"pipeline",
"async transformation", "async transformation",
"event handling", "event handling",
"backpressure management", "backpressure",
"readable streams", "readable stream",
"writable streams", "writable stream",
"duplex streams", "duplex stream",
"transform streams", "transform stream",
"file streaming", "file streaming",
"buffer streams", "buffer",
"stream utilities", "stream utilities",
"stream intake", "esm"
"stream output"
] ]
} }
}, },

View File

@ -1,39 +1,40 @@
{ {
"name": "@push.rocks/smartstream", "name": "@push.rocks/smartstream",
"version": "3.0.35", "version": "3.0.40",
"private": false, "private": false,
"description": "simplifies access to node streams", "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"type": "module", "type": "module",
"exports": {
".": "./dist_ts/index.js",
"./web": "./dist_ts_web/index.js"
},
"scripts": { "scripts": {
"test": "(tstest test/)", "test": "(tstest test/)",
"build": "(tsbuild)", "build": "(tsbuild tsfolders --web --allowimplicitany)"
"buildDocs": "tsdoc"
}, },
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git+https://gitlab.com/push.rocks/smartstream.git" "url": "https://code.foss.global/push.rocks/smartstream.git"
}, },
"author": "Lossless GmbH", "author": "Lossless GmbH",
"license": "MIT", "license": "MIT",
"bugs": { "bugs": {
"url": "https://gitlab.com/push.rocks/smartstream/issues" "url": "https://gitlab.com/push.rocks/smartstream/issues"
}, },
"homepage": "https://gitlab.com/push.rocks/smartstream#readme", "homepage": "https://code.foss.global/push.rocks/smartstream",
"devDependencies": { "devDependencies": {
"@git.zone/tsbuild": "^2.1.72", "@git.zone/tsbuild": "^2.1.80",
"@git.zone/tsrun": "^1.2.44", "@git.zone/tsrun": "^1.2.44",
"@git.zone/tstest": "^1.0.88", "@git.zone/tstest": "^1.0.90",
"@push.rocks/smartfile": "^11.0.4", "@push.rocks/smartfile": "^11.0.15",
"@push.rocks/tapbundle": "^5.0.17", "@push.rocks/tapbundle": "^5.0.23",
"@types/node": "^20.11.28" "@types/node": "^20.12.12"
}, },
"dependencies": { "dependencies": {
"@push.rocks/lik": "^6.0.14", "@push.rocks/lik": "^6.0.15",
"@push.rocks/smartenv": "^5.0.12",
"@push.rocks/smartpromise": "^4.0.3", "@push.rocks/smartpromise": "^4.0.3",
"@push.rocks/smartrx": "^3.0.7", "@push.rocks/smartrx": "^3.0.7"
"@push.rocks/webstream": "^1.0.8"
}, },
"browserslist": [ "browserslist": [
"last 1 chrome versions" "last 1 chrome versions"
@ -51,22 +52,22 @@
"readme.md" "readme.md"
], ],
"keywords": [ "keywords": [
"stream",
"node.js", "node.js",
"streams", "typescript",
"stream manipulation", "stream manipulation",
"pipeline",
"data processing", "data processing",
"pipeline",
"async transformation", "async transformation",
"event handling", "event handling",
"backpressure management", "backpressure",
"readable streams", "readable stream",
"writable streams", "writable stream",
"duplex streams", "duplex stream",
"transform streams", "transform stream",
"file streaming", "file streaming",
"buffer streams", "buffer",
"stream utilities", "stream utilities",
"stream intake", "esm"
"stream output"
] ]
} }

7505
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@ -1 +1 @@
- make sure to respect backpressure handling.

242
readme.md
View File

@ -1,5 +1,6 @@
```markdown
# @push.rocks/smartstream # @push.rocks/smartstream
simplifies access to node streams A TypeScript library to simplify the creation and manipulation of Node.js streams, providing utilities for transform, duplex, and readable/writable stream handling while managing backpressure effectively.
## Install ## Install
To install `@push.rocks/smartstream`, you can use npm or yarn as follows: To install `@push.rocks/smartstream`, you can use npm or yarn as follows:
@ -14,7 +15,7 @@ This will add `@push.rocks/smartstream` to your project's dependencies.
## Usage ## Usage
The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes heavy use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples. The `@push.rocks/smartstream` module is designed to simplify working with Node.js streams by providing a set of utilities for creating and manipulating streams. This module makes extensive use of TypeScript for improved code quality, readability, and maintenance. ESM syntax is utilized throughout the examples.
### Importing the Module ### Importing the Module
@ -24,6 +25,12 @@ Start by importing the module into your TypeScript file:
import * as smartstream from '@push.rocks/smartstream'; import * as smartstream from '@push.rocks/smartstream';
``` ```
For a more specific import, you may do the following:
```typescript
import { SmartDuplex, StreamWrapper, StreamIntake, createTransformFunction, createPassThrough } from '@push.rocks/smartstream';
```
### Creating Basic Transform Streams ### Creating Basic Transform Streams
The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility: The module provides utilities for creating transform streams. For example, to create a transform stream that modifies chunks of data, you can use the `createTransformFunction` utility:
@ -58,7 +65,7 @@ const processDataDuplex = new SmartDuplex({
sourceStream.pipe(processDataDuplex).pipe(destinationStream); sourceStream.pipe(processDataDuplex).pipe(destinationStream);
``` ```
### Stream Combiners ### Combining Multiple Streams
`Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams: `Smartstream` facilitates easy combining of multiple streams into a single pipeline, handling errors and cleanup automatically. Here's how you can combine multiple streams:
@ -101,7 +108,7 @@ Consider a scenario where you need to process a large CSV file, transform the da
```typescript ```typescript
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream'; import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
import fs from 'fs'; import fs from 'fs';
import csvParser from 'csv-parser'; // Assume this is a CSV parsing library import csvParser from 'csv-parser';
const csvReadTransform = createTransformFunction<any, any>(async (row) => { const csvReadTransform = createTransformFunction<any, any>(async (row) => {
// Process row // Process row
@ -121,11 +128,232 @@ fs.createReadStream('path/to/largeFile.csv')
This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues. This example demonstrates reading a large CSV file, transforming each row with `createTransformFunction`, and using a `SmartDuplex` to manage the processed data flow efficiently, ensuring no data is lost due to backpressure issues.
### Conclusion ### Advanced Use Case: Backpressure Handling
`@push.rocks/smartstream` offers a robust set of tools for working with Node.js streams, providing a more intuitive and reliable way to create, manipulate, and combine streams. By leveraging TypeScript and ESM syntax, `smartstream` enables developers to build more maintainable and type-safe stream-based solutions. Effective backpressure handling is crucial when working with streams to avoid overwhelming the downstream consumers. Heres a comprehensive example that demonstrates handling backpressure in a pipeline with multiple `SmartDuplex` instances:
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
// Define the first SmartDuplex, which writes data slowly to simulate backpressure
const slowProcessingStream = new SmartDuplex({
name: 'SlowProcessor',
objectMode: true,
writeFunction: async (chunk, { push }) => {
await new Promise(resolve => setTimeout(resolve, 100)); // Simulated delay
console.log('Processed chunk:', chunk);
push(chunk);
}
});
// Define the second SmartDuplex as a fast processor
const fastProcessingStream = new SmartDuplex({
name: 'FastProcessor',
objectMode: true,
writeFunction: async (chunk, { push }) => {
console.log('Fast processing chunk:', chunk);
push(chunk);
}
});
// Create a StreamIntake to dynamically handle incoming data
const streamIntake = new StreamIntake<string>();
// Chain the streams together and handle the backpressure scenario
streamIntake
.pipe(fastProcessingStream)
.pipe(slowProcessingStream)
.pipe(createPassThrough()) // Use Pass-Through to provide intermediary handling
.on('data', data => console.log('Final output:', data))
.on('error', error => console.error('Stream encountered an error:', error));
// Simulate data pushing with intervals to observe backpressure handling
let counter = 0;
const interval = setInterval(() => {
if (counter >= 10) {
streamIntake.signalEnd();
clearInterval(interval);
} else {
streamIntake.pushData(`Chunk ${counter}`);
counter++;
}
}, 50);
```
In this advanced use case, a `SlowProcessor` and `FastProcessor` are created using `SmartDuplex`, simulating a situation where one stream is slower than another. The `StreamIntake` dynamically handles incoming chunks of data and the intermediary Pass-Through handles any potential interruptions.
### Transform Streams in Parallel
For scenarios where you need to process data in parallel:
```typescript
import { SmartDuplex, createTransformFunction } from '@push.rocks/smartstream';
const parallelTransform = createTransformFunction<any, any>(async (chunk) => {
// Parallel Processing
const results = await Promise.all(chunk.map(async item => await processItem(item)));
return results;
});
const streamIntake = new StreamIntake<any[]>();
streamIntake
.pipe(parallelTransform)
.pipe(new SmartDuplex({
async writeFunction(chunk, { push }) {
console.log('Processed parallel chunk:', chunk);
push(chunk);
}
}))
.on('finish', () => console.log('Parallel processing completed.'));
// Simulate data pushing
streamIntake.pushData([1, 2, 3, 4]);
streamIntake.pushData([5, 6, 7, 8]);
streamIntake.signalEnd();
```
### Error Handling in Stream Pipelines
Error handling is an essential part of working with streams. The `StreamWrapper` assists in combining multiple streams while managing errors seamlessly:
```typescript
import { StreamWrapper } from '@push.rocks/smartstream';
const faultyStream = new SmartDuplex({
async writeFunction(chunk, { push }) {
if (chunk === 'bad data') {
throw new Error('Faulty data encountered');
}
push(chunk);
}
});
const readStream = new StreamIntake<string>();
const writeStream = new SmartDuplex({
async writeFunction(chunk) {
console.log('Written chunk:', chunk);
}
});
const combinedStream = new StreamWrapper([readStream, faultyStream, writeStream]);
combinedStream.run()
.then(() => console.log('Stream processing completed.'))
.catch(err => console.error('Stream error:', err.message));
// Push Data
readStream.pushData('good data');
readStream.pushData('bad data'); // This will throw an error
readStream.pushData('more good data');
readStream.signalEnd();
```
### Testing Streams
Here's an example test case using the `tap` testing framework to verify the integrity of the `SmartDuplex` from a buffer:
```typescript
import { expect, tap } from '@push.rocks/tapbundle';
import { SmartDuplex } from '@push.rocks/smartstream';
tap.test('should create a SmartStream from a Buffer', async () => {
const bufferData = Buffer.from('This is a test buffer');
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
let receivedData = Buffer.alloc(0);
return new Promise<void>((resolve) => {
smartStream.on('data', (chunk: Buffer) => {
receivedData = Buffer.concat([receivedData, chunk]);
});
smartStream.on('end', () => {
expect(receivedData.toString()).toEqual(bufferData.toString());
resolve();
});
});
});
tap.start();
```
### Working with Files and Buffers
You can easily stream files and buffers with `smartstream`. Heres a test illustrating reading and writing with file streams using `smartfile` combined with `smartstream` utilities:
```typescript
import { tap } from '@push.rocks/tapbundle';
import * as smartfile from '@push.rocks/smartfile';
import { SmartDuplex, StreamWrapper } from '@push.rocks/smartstream';
tap.test('should handle file read and write streams', async () => {
const readStream = smartfile.fsStream.createReadStream('./test/assets/readabletext.txt');
const writeStream = smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt');
const transformStream = new SmartDuplex({
async writeFunction(chunk, { push }) {
const transformedChunk = chunk.toString().toUpperCase();
push(transformedChunk);
}
});
const streamWrapper = new StreamWrapper([readStream, transformStream, writeStream]);
await streamWrapper.run();
const outputContent = await smartfile.fs.promises.readFile('./test/assets/writabletext.txt', 'utf-8');
console.log('Output Content:', outputContent);
});
tap.start();
```
### Modular and Scoped Transformations
Creating modular and scoped transformations is straightforward with `SmartDuplex`:
```typescript
import { SmartDuplex } from '@push.rocks/smartstream';
type DataChunk = {
id: number;
data: string;
};
const transformationStream1 = new SmartDuplex<DataChunk, DataChunk>({
async writeFunction(chunk, { push }) {
chunk.data = chunk.data.toUpperCase();
push(chunk);
}
})
const transformationStream2 = new SmartDuplex<DataChunk, DataChunk>({
async writeFunction(chunk, { push }) {
chunk.data = `${chunk.data} processed with transformation 2`;
push(chunk);
}
});
const initialData: DataChunk[] = [
{ id: 1, data: 'first' },
{ id: 2, data: 'second' }
];
const intakeStream = new StreamIntake<DataChunk>();
intakeStream
.pipe(transformationStream1)
.pipe(transformationStream2)
.on('data', data => console.log('Transformed Data:', data));
initialData.forEach(item => intakeStream.pushData(item));
intakeStream.signalEnd();
```
By leveraging `SmartDuplex`, `StreamWrapper`, and `StreamIntake`, you can streamline and enhance your data transformation pipelines in Node.js with a clear, efficient, and backpressure-friendly approach.
```
For more detailed examples and documentation, visit the [GitLab Repository](https://gitlab.com/push.rocks/smartstream) or the [GitHub Mirror](https://github.com/pushrocks/smartstream).
## License and Legal Information ## License and Legal Information

70
test/test.ts_web.both.ts Normal file
View File

@ -0,0 +1,70 @@
import { expect, expectAsync, tap } from '@push.rocks/tapbundle';
import * as webstream from '../ts_web/index.js';
tap.test('WebDuplexStream', async (toolsArg) => {
const testDone = toolsArg.defer(); // Create a deferred object to control test completion.
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
const reader = stream.readable.getReader();
let readUint8Array = new Uint8Array();
reader.read().then(function processText({ done, value }) {
if (done) {
expect(readUint8Array).toEqual(inputUint8Array);
testDone.resolve(); // Correctly signal that the test is done.
return;
}
readUint8Array = new Uint8Array([...readUint8Array, ...value]);
return reader.read().then(processText);
});
return testDone.promise; // Return the promise to properly wait for the test to complete.
});
tap.test('should handle transform with a write function', async (toolsArg) => {
const testDone = toolsArg.defer();
const input = [1, 2, 3, 4, 5];
const expectedOutput = [2, 4, 6, 8, 10];
const transformStream = new webstream.WebDuplexStream<number, number>({
writeFunction: (chunk, { push }) => {
push(chunk * 2); // Push the doubled number into the stream
return Promise.resolve(); // Resolve the promise immediately
},
});
const writableStream = transformStream.writable.getWriter();
const readableStream = transformStream.readable.getReader();
const output: number[] = [];
// Process the text and resolve the test once done.
const processText = async ({ done, value }) => {
if (done) {
expect(output).toEqual(expectedOutput);
testDone.resolve(); // Resolve the deferred test once all values have been read.
return;
}
if (value !== undefined) {
output.push(value);
}
// Continue reading and processing.
await readableStream.read().then(processText);
};
// Start the read process before writing to the stream.
readableStream.read().then(processText);
// Sequentially write to the stream and close when done.
for (const num of input) {
await writableStream.write(num);
}
await writableStream.close();
return testDone.promise; // This will wait until the testDone is resolved before completing the test.
});
tap.start();

View File

@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstream', name: '@push.rocks/smartstream',
version: '3.0.35', version: '3.0.40',
description: 'simplifies access to node streams' description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
} }

View File

@ -157,4 +157,53 @@ export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
this.backpressuredArray.push(null); this.backpressuredArray.push(null);
callback(); callback();
} }
public getWebStreams(): { readable: ReadableStream, writable: WritableStream } {
const duplex = this;
const readable = new ReadableStream({
start(controller) {
duplex.on('readable', () => {
let chunk;
while (null !== (chunk = duplex.read())) {
controller.enqueue(chunk);
}
});
duplex.on('end', () => {
controller.close();
});
},
cancel(reason) {
duplex.destroy(new Error(reason));
}
});
const writable = new WritableStream({
write(chunk) {
return new Promise<void>((resolve, reject) => {
const isBackpressured = !duplex.write(chunk, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
if (isBackpressured) {
duplex.once('drain', resolve);
}
});
},
close() {
return new Promise<void>((resolve, reject) => {
duplex.end(resolve);
});
},
abort(reason) {
duplex.destroy(new Error(reason));
}
});
return { readable, writable };
}
} }

View File

@ -14,7 +14,7 @@ export class StreamIntake<T> extends plugins.stream.Readable {
_read(size: number): void { _read(size: number): void {
// console.log('get next'); // console.log('get next');
const pushChunk = (): void => { const pushChunk = (): void => {
if (this.chunkStore.length > 0) { while (this.chunkStore.length > 0) {
// If push returns false, then we should stop reading // If push returns false, then we should stop reading
if (!this.push(this.chunkStore.shift())) { if (!this.push(this.chunkStore.shift())) {
return; return;

View File

@ -7,7 +7,7 @@ export { stream };
import * as lik from '@push.rocks/lik'; import * as lik from '@push.rocks/lik';
import * as smartpromise from '@push.rocks/smartpromise'; import * as smartpromise from '@push.rocks/smartpromise';
import * as smartrx from '@push.rocks/smartrx'; import * as smartrx from '@push.rocks/smartrx';
import * as webstream from '@push.rocks/webstream'; import * as webstream from '../dist_ts_web/index.js';
export { lik, smartpromise, smartrx, webstream }; export { lik, smartpromise, smartrx, webstream };

View File

@ -0,0 +1,8 @@
/**
* autocreated commitinfo by @pushrocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/smartstream',
version: '3.0.40',
description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.'
}

View File

@ -0,0 +1,156 @@
import * as plugins from './plugins.js';
// ========================================
// READ
// ========================================
export interface IStreamToolsRead<TInput, TOutput> {
done: () => void;
write: (writeArg: TInput) => void;
}
/**
* the read function is called anytime
* -> the WebDuplexStream is being read from
* and at the same time if nothing is enqueued
*/
export interface IStreamReadFunction<TInput, TOutput> {
(toolsArg: IStreamToolsRead<TInput, TOutput>): Promise<void>;
}
// ========================================
// WRITE
// ========================================
export interface IStreamToolsWrite<TInput, TOutput> {
truncate: () => void;
push: (pushArg: TOutput) => void;
}
/**
* the write function can return something.
* It is called anytime a chunk is written to the stream.
*/
export interface IStreamWriteFunction<TInput, TOutput> {
(chunkArg: TInput, toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<any>;
}
export interface IStreamFinalFunction<TInput, TOutput> {
(toolsArg: IStreamToolsWrite<TInput, TOutput>): Promise<TOutput>;
}
export interface WebDuplexStreamOptions<TInput, TOutput> {
readFunction?: IStreamReadFunction<TInput, TOutput>;
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
finalFunction?: IStreamFinalFunction<TInput, TOutput>;
}
export class WebDuplexStream<TInput = any, TOutput = any> extends TransformStream<TInput, TOutput> {
static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream<Uint8Array, Uint8Array> {
const stream = new WebDuplexStream<Uint8Array, Uint8Array>({
writeFunction: async (chunk, { push }) => {
push(chunk); // Directly push the chunk as is
return null;
}
});
const writer = stream.writable.getWriter();
writer.write(uint8Array).then(() => writer.close());
return stream;
}
// INSTANCE
options: WebDuplexStreamOptions<TInput, TOutput>;
constructor(optionsArg: WebDuplexStreamOptions<TInput, TOutput>) {
super({
async transform(chunk, controller) {
// Transformation logic remains unchanged
if (optionsArg?.writeFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pushArg: TOutput) => controller.enqueue(pushArg),
};
optionsArg.writeFunction(chunk, tools)
.then(writeReturnChunk => {
// the write return chunk is optional
// just in case the write function returns something other than void.
if (writeReturnChunk) {
controller.enqueue(writeReturnChunk);
}
})
.catch(err => controller.error(err));
} else {
controller.error(new Error('No write function provided'));
}
},
async flush(controller) {
// Flush logic remains unchanged
if (optionsArg?.finalFunction) {
const tools: IStreamToolsWrite<TInput, TOutput> = {
truncate: () => controller.terminate(),
push: (pipeObject) => controller.enqueue(pipeObject),
};
optionsArg.finalFunction(tools)
.then(finalChunk => {
if (finalChunk) {
controller.enqueue(finalChunk);
}
})
.catch(err => controller.error(err))
.finally(() => controller.terminate());
} else {
controller.terminate();
}
}
});
this.options = optionsArg;
}
// Method to create a custom readable stream that integrates the readFunction
// readFunction is executed whenever the stream is being read from and nothing is enqueued
getCustomReadableStream() {
const readableStream = this.readable;
const options = this.options;
const customReadable = new ReadableStream({
async pull(controller) {
const reader = readableStream.getReader();
// Check the current state of the original stream
const { value, done } = await reader.read();
reader.releaseLock();
if (done) {
// If the original stream is done, close the custom readable stream
controller.close();
} else {
if (value) {
// If there is data in the original stream, enqueue it and do not execute the readFunction
controller.enqueue(value);
} else if (options.readFunction) {
// If the original stream is empty, execute the readFunction and read again
await options.readFunction({
done: () => controller.close(),
write: (writeArg) => controller.enqueue(writeArg),
});
const newReader = readableStream.getReader();
const { value: newValue, done: newDone } = await newReader.read();
newReader.releaseLock();
if (newDone) {
controller.close();
} else {
controller.enqueue(newValue);
}
}
}
}
});
return customReadable;
}
}

2
ts_web/index.ts Normal file
View File

@ -0,0 +1,2 @@
import './plugins.js';
export * from './classes.webduplexstream.js';

15
ts_web/plugins.ts Normal file
View File

@ -0,0 +1,15 @@
// @push.rocks scope
import * as smartenv from '@push.rocks/smartenv';
export {
smartenv,
}
// lets setup dependencies
const smartenvInstance = new smartenv.Smartenv();
await smartenvInstance.getSafeNodeModule<typeof import('stream/web')>('stream/web', async (moduleArg) => {
globalThis.ReadableStream = moduleArg.ReadableStream;
globalThis.WritableStream = moduleArg.WritableStream;
globalThis.TransformStream = moduleArg.TransformStream;
})