diff --git a/changelog.md b/changelog.md index 8b6f77a..7d6ce1f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-02 - 3.4.0 - feat(smartduplex) +improve backpressure handling and web/node stream interoperability + +- Refactored SmartDuplex to use synchronous _read/_write/_final (avoids async pitfalls), added internal backpressured buffer draining and consumer signaling +- Implemented pull-based backpressure for Node <-> Web stream conversions (nodewebhelpers and convertNodeReadableToWebReadable/convertWebReadableToNodeReadable) +- StreamIntake.fromStream now reads from 'readable' and drains properly; StreamWrapper resolves safely on end/close/finish +- Improved getWebStreams / WebDuplexStream behavior (safer enqueue/close/abort handling, final/readFunction improvements) +- Added many new unit tests covering backpressure, web/node helpers, StreamIntake, StreamWrapper, WebDuplexStream; bumped @push.rocks/lik and @types/node versions + ## 2026-02-28 - 3.3.0 - feat(smartstream) bump dependencies, update build/publish config, refactor tests, and overhaul documentation diff --git a/package.json b/package.json index 81eacd4..a45c128 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "./web": "./dist_ts_web/index.js" }, "scripts": { - "test": "(tstest test/)", + "test": "(tstest test/ --verbose --logfile --timeout 60)", "build": "(tsbuild tsfolders --allowimplicitany)" }, "repository": { @@ -27,10 +27,10 @@ "@git.zone/tsrun": "^2.0.1", "@git.zone/tstest": "^3.1.8", "@push.rocks/tapbundle": "^6.0.3", - "@types/node": "^25.3.2" + "@types/node": "^25.3.3" }, "dependencies": { - "@push.rocks/lik": "^6.2.2", + "@push.rocks/lik": "^6.3.1", "@push.rocks/smartenv": "^6.0.0", "@push.rocks/smartpromise": "^4.2.3", "@push.rocks/smartrx": "^3.0.10" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4841b8b..e4a6131 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: .: dependencies: '@push.rocks/lik': - specifier: ^6.2.2 - version: 6.2.2 + specifier: ^6.3.1 + version: 6.3.1 '@push.rocks/smartenv': specifier: ^6.0.0 version: 6.0.0 @@ -37,8 +37,8 @@ importers: specifier: ^6.0.3 version: 6.0.3(socks@2.8.7) '@types/node': - specifier: ^25.3.2 - version: 25.3.2 + specifier: ^25.3.3 + version: 25.3.3 packages: @@ -672,8 +672,8 @@ packages: '@push.rocks/levelcache@3.2.0': resolution: {integrity: sha512-Ch0Oguta2I0SVi704kHghhBcgfyfS92ua1elRu9d8X1/9LMRYuqvvBAnyXyFxQzI3S8q8QC6EkRdd8CAAYSzRg==} - '@push.rocks/lik@6.2.2': - resolution: {integrity: sha512-j64FFPPyMXeeUorjKJVF6PWaJUfiIrF3pc41iJH4lOh0UUpBAHpcNzHVxTR58orwbVA/h3Hz+DQd4b1Rq0dFDQ==} + '@push.rocks/lik@6.3.1': + resolution: {integrity: sha512-UWDwGBaVx5yPtAFXqDDBtQZCzETUOA/7myQIXb+YBsuiIw4yQuhNZ23uY2ChQH2Zn6DLqdNSgQcYC0WywMZBNQ==} '@push.rocks/mongodump@1.1.0': resolution: {integrity: sha512-kW0ZUGyf1e4nwloVwBQjNId+MzgTcNS834C+RxH21i1NqyOubbpWZtJtPP+K+s35nSJRyCTy3ICfBMdDBTAm2w==} @@ -1595,14 +1595,11 @@ packages: '@types/node-forge@1.3.14': resolution: {integrity: sha512-mhVF2BnD4BO+jtOp7z1CdzaK4mbuK0LLQYAvdOLqHTavxFNq4zA1EmYkpnFjP8HOUzedfQkRnp0E2ulSAYSzAw==} - '@types/node@20.12.12': - resolution: {integrity: sha512-eWLDGF/FOSPtAvEqeRAQ4C8LSA7M1I7i0ky1I8U7kD1J5ITyW3AsRhQrKVoWf5pFKZ2kILsEGJhsI9r93PYnOw==} - '@types/node@22.19.13': resolution: {integrity: sha512-akNQMv0wW5uyRpD2v2IEyRSZiR+BeGuoB6L310EgGObO44HSMNT8z1xzio28V8qOrgYaopIDNA18YgdXd+qTiw==} - '@types/node@25.3.2': - resolution: {integrity: sha512-RpV6r/ij22zRRdyBPcxDeKAzH43phWVKEjL2iksqo1Vz3CuBUrgmPpPhALKiRfU7OMCmeeO9vECBMsV0hMTG8Q==} + '@types/node@25.3.3': + resolution: {integrity: sha512-DpzbrH7wIcBaJibpKo9nnSQL0MTRdnWttGyE5haGwK86xgMOkFLp7vEyfQPGLOJh5wNYiJ3V9PmUMDhV9u8kkQ==} '@types/parse5@6.0.3': resolution: {integrity: sha512-SuT16Q1K51EAVPz1K29DJ/sXjhSQ0zjvsypYJ6tlwVsRV9jwW5Adq2ch8Dq8kDBCkYnELS7N7VNCSB5nC56t/g==} @@ -4281,9 +4278,6 @@ packages: unbzip2-stream@1.4.3: resolution: {integrity: sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg==} - undici-types@5.26.5: - resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==} - undici-types@6.21.0: resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} @@ -4511,7 +4505,7 @@ snapshots: dependencies: '@api.global/typedrequest-interfaces': 3.0.19 '@push.rocks/isounique': 1.0.5 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartbuffer': 3.0.4 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartguard': 3.1.0 @@ -4526,7 +4520,7 @@ snapshots: '@api.global/typedsocket': 3.0.1 '@cloudflare/workers-types': 4.20260305.0 '@design.estate/dees-comms': 1.0.30 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartchok': 1.2.0 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartenv': 5.0.13 @@ -5071,7 +5065,7 @@ snapshots: dependencies: '@api.global/typedrequest': 3.2.6 '@design.estate/dees-comms': 1.0.30 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartjson': 5.2.0 '@push.rocks/smartmarkdown': 3.0.3 @@ -5439,7 +5433,7 @@ snapshots: '@jest/schemas': 29.6.3 '@types/istanbul-lib-coverage': 2.0.6 '@types/istanbul-reports': 3.0.4 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/yargs': 17.0.35 chalk: 4.1.2 @@ -5704,7 +5698,7 @@ snapshots: '@push.rocks/levelcache@3.2.0': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartbucket': 3.3.10 '@push.rocks/smartcache': 1.0.18 '@push.rocks/smartenv': 5.0.13 @@ -5724,7 +5718,7 @@ snapshots: - supports-color - vue - '@push.rocks/lik@6.2.2': + '@push.rocks/lik@6.3.1': dependencies: '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartmatch': 2.0.0 @@ -5737,7 +5731,7 @@ snapshots: '@push.rocks/mongodump@1.1.0(socks@2.8.7)': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartfile': 11.2.7 '@push.rocks/smartjson': 5.2.0 '@push.rocks/smartpath': 6.0.0 @@ -5864,7 +5858,7 @@ snapshots: '@push.rocks/smartchok@1.2.0': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartpromise': 4.2.3 '@push.rocks/smartrx': 3.0.10 chokidar: 5.0.0 @@ -5872,7 +5866,7 @@ snapshots: '@push.rocks/smartcli@4.0.20': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartlog': 3.2.1 '@push.rocks/smartobject': 1.0.12 '@push.rocks/smartpromise': 4.2.3 @@ -5896,7 +5890,7 @@ snapshots: '@push.rocks/smartdata@5.16.7(socks@2.8.7)': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartlog': 3.2.1 '@push.rocks/smartmongo': 2.2.0(socks@2.8.7) @@ -5954,7 +5948,7 @@ snapshots: '@push.rocks/smartexit@1.0.23': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartpromise': 4.2.3 tree-kill: 1.2.2 @@ -5974,7 +5968,7 @@ snapshots: '@push.rocks/smartfile@11.2.7': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartfile-interfaces': 1.0.7 '@push.rocks/smarthash': 3.2.6 @@ -5992,7 +5986,7 @@ snapshots: '@push.rocks/smartfile@13.1.2': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartfile-interfaces': 1.0.7 '@push.rocks/smartfs': 1.3.1 @@ -6033,7 +6027,7 @@ snapshots: '@push.rocks/smartinteract@2.0.16': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartobject': 1.0.12 '@push.rocks/smartpromise': 4.2.3 inquirer: 11.1.0 @@ -6286,7 +6280,7 @@ snapshots: '@push.rocks/smartrouter@1.3.3': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartrx': 3.0.10 path-to-regexp: 8.3.0 @@ -6354,7 +6348,7 @@ snapshots: '@api.global/typedserver': 3.0.80 '@push.rocks/isohash': 2.0.1 '@push.rocks/isounique': 1.0.5 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartenv': 5.0.13 '@push.rocks/smartjson': 5.2.0 @@ -6392,7 +6386,7 @@ snapshots: '@push.rocks/smartstream@3.2.5': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartenv': 5.0.13 '@push.rocks/smartpromise': 4.2.3 '@push.rocks/smartrx': 3.0.10 @@ -6414,7 +6408,7 @@ snapshots: '@push.rocks/smarttime@4.0.6': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartpromise': 4.2.3 croner: 7.0.7 @@ -6424,7 +6418,7 @@ snapshots: '@push.rocks/smarttime@4.2.3': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartpromise': 4.2.3 croner: 10.0.1 @@ -6483,24 +6477,21 @@ snapshots: transitivePeerDependencies: - '@aws-sdk/credential-providers' - '@mongodb-js/zstd' - - '@nuxt/kit' - aws-crt - bare-abort-controller - bufferutil - gcp-metadata - kerberos - mongodb-client-encryption - - react - react-native-b4a - snappy - socks - supports-color - utf-8-validate - - vue '@push.rocks/taskbuffer@3.1.7': dependencies: - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartlog': 3.2.1 '@push.rocks/smartpromise': 4.2.3 @@ -6511,7 +6502,7 @@ snapshots: '@push.rocks/taskbuffer@3.5.0': dependencies: '@design.estate/dees-element': 2.1.6 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartlog': 3.2.1 '@push.rocks/smartpromise': 4.2.3 @@ -6549,7 +6540,7 @@ snapshots: '@push.rocks/webstore@2.0.17': dependencies: '@apiglobal/typedrequest-interfaces': 2.0.1 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartenv': 5.0.13 '@push.rocks/smartjson': 5.2.0 '@push.rocks/smartpromise': 4.2.3 @@ -6560,7 +6551,7 @@ snapshots: '@push.rocks/webstore@2.0.20': dependencies: '@api.global/typedrequest-interfaces': 3.0.19 - '@push.rocks/lik': 6.2.2 + '@push.rocks/lik': 6.3.1 '@push.rocks/smartenv': 5.0.13 '@push.rocks/smartjson': 5.2.0 '@push.rocks/smartpromise': 4.2.3 @@ -7190,14 +7181,14 @@ snapshots: '@types/accepts@1.3.7': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/babel__code-frame@7.0.6': {} '@types/body-parser@1.19.5': dependencies: '@types/connect': 3.4.38 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/buffer-json@2.0.3': {} @@ -7209,17 +7200,17 @@ snapshots: '@types/clean-css@4.2.11': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 source-map: 0.6.1 '@types/co-body@6.1.3': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/qs': 6.9.15 '@types/connect@3.4.38': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/content-disposition@0.5.8': {} @@ -7232,11 +7223,11 @@ snapshots: '@types/connect': 3.4.38 '@types/express': 4.17.21 '@types/keygrip': 1.0.6 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/cors@2.8.17': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/debounce@1.2.4': {} @@ -7246,14 +7237,14 @@ snapshots: '@types/express-serve-static-core@4.19.0': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/qs': 6.9.15 '@types/range-parser': 1.2.7 '@types/send': 0.17.4 '@types/express-serve-static-core@5.1.1': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/qs': 6.9.15 '@types/range-parser': 1.2.7 '@types/send': 0.17.4 @@ -7274,7 +7265,7 @@ snapshots: '@types/fs-extra@11.0.4': dependencies: '@types/jsonfile': 6.1.4 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/hast@3.0.4': dependencies: @@ -7308,7 +7299,7 @@ snapshots: '@types/jsonfile@6.1.4': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/keygrip@1.0.6': {} @@ -7325,7 +7316,7 @@ snapshots: '@types/http-errors': 2.0.4 '@types/keygrip': 1.0.6 '@types/koa-compose': 3.2.8 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/mdast@4.0.4': dependencies: @@ -7343,21 +7334,17 @@ snapshots: '@types/mute-stream@0.0.4': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/node-forge@1.3.14': dependencies: - '@types/node': 20.12.12 - - '@types/node@20.12.12': - dependencies: - undici-types: 5.26.5 + '@types/node': 25.3.3 '@types/node@22.19.13': dependencies: undici-types: 6.21.0 - '@types/node@25.3.2': + '@types/node@25.3.3': dependencies: undici-types: 7.18.2 @@ -7375,25 +7362,25 @@ snapshots: '@types/s3rver@3.7.4': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/semver@7.7.1': {} '@types/send@0.17.4': dependencies: '@types/mime': 1.3.5 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/serve-static@1.15.7': dependencies: '@types/http-errors': 2.0.4 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/send': 0.17.4 '@types/serve-static@2.2.0': dependencies: '@types/http-errors': 2.0.4 - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/sinon-chai@3.2.12': dependencies: @@ -7412,11 +7399,11 @@ snapshots: '@types/tar-stream@3.1.4': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/through2@2.0.41': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/triple-beam@1.3.5': {} @@ -7448,11 +7435,11 @@ snapshots: '@types/ws@7.4.7': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/ws@8.18.1': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 '@types/yargs-parser@21.0.3': {} @@ -7462,7 +7449,7 @@ snapshots: '@types/yauzl@2.10.3': dependencies: - '@types/node': 20.12.12 + '@types/node': 25.3.3 optional: true '@ungap/structured-clone@1.2.0': {} @@ -8133,7 +8120,7 @@ snapshots: dependencies: '@types/cookie': 0.4.1 '@types/cors': 2.8.17 - '@types/node': 20.12.12 + '@types/node': 25.3.3 accepts: 1.3.8 base64id: 2.0.0 cookie: 0.4.2 @@ -8928,7 +8915,7 @@ snapshots: jest-util@29.7.0: dependencies: '@jest/types': 29.6.3 - '@types/node': 20.12.12 + '@types/node': 25.3.3 chalk: 4.1.2 ci-info: 3.9.0 graceful-fs: 4.2.11 @@ -10547,8 +10534,6 @@ snapshots: buffer: 5.7.1 through: 2.3.8 - undici-types@5.26.5: {} - undici-types@6.21.0: {} undici-types@7.18.2: {} diff --git a/test/assets/writabletext.txt b/test/assets/writabletext.txt index e922c03..09b8ad4 100644 --- a/test/assets/writabletext.txt +++ b/test/assets/writabletext.txt @@ -1,50 +1,10 @@ -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow -hi+wow +data +data +data +data +data +data +data +data +data +data diff --git a/test/test.backpressure.ts b/test/test.backpressure.ts new file mode 100644 index 0000000..22234fc --- /dev/null +++ b/test/test.backpressure.ts @@ -0,0 +1,56 @@ +import { tap, expect } from '@push.rocks/tapbundle'; +import { SmartDuplex } from '../ts/index.js'; + +tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => { + const done = toolsArg.defer(); + + const stream1 = new SmartDuplex({ + name: 'stream1', + objectMode: true, + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 10)); + return chunk; + }, + }); + + const stream2 = new SmartDuplex({ + name: 'stream2', + objectMode: true, + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 20)); + await tools.push(chunk); + }, + }); + + const stream3 = new SmartDuplex({ + objectMode: true, + name: 'stream3', + writeFunction: async (chunk, tools) => { + await new Promise((resolve) => setTimeout(resolve, 200)); + }, + }); + + stream1.pipe(stream2).pipe(stream3); + + let backpressured = false; + for (let i = 0; i < 200; i++) { + const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); + if (!canContinue) { + backpressured = true; + } + } + + stream1.end(); + + stream3.on('finish', () => { + if (!backpressured) { + throw new Error('No backpressure was observed.'); + } else { + done.resolve(); + } + }); + + await done.promise; +}); + +export default tap.start(); diff --git a/test/test.nodewebhelpers.ts b/test/test.nodewebhelpers.ts new file mode 100644 index 0000000..fa83a7f --- /dev/null +++ b/test/test.nodewebhelpers.ts @@ -0,0 +1,152 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as fs from 'fs'; +import * as stream from 'stream'; +import { nodewebhelpers } from '../ts/index.js'; + +// ============================================= +// createWebReadableStreamFromFile +// ============================================= + +tap.test('nodewebhelpers: createWebReadableStreamFromFile should read a file', async () => { + const webStream = nodewebhelpers.createWebReadableStreamFromFile('./test/assets/readabletext.txt'); + const reader = webStream.getReader(); + + const chunks: Uint8Array[] = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + + expect(chunks.length).toBeGreaterThan(0); + const content = Buffer.concat(chunks).toString(); + expect(content.length).toBeGreaterThan(0); +}); + +// ============================================= +// convertNodeReadableToWebReadable +// ============================================= + +tap.test('nodewebhelpers: convertNodeReadableToWebReadable should convert', async () => { + const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt'); + const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable); + + const reader = webReadable.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + chunks.push(value); + } + + expect(chunks.length).toBeGreaterThan(0); + const content = Buffer.concat(chunks).toString(); + expect(content.length).toBeGreaterThan(0); +}); + +// ============================================= +// convertWebReadableToNodeReadable +// ============================================= + +tap.test('nodewebhelpers: convertWebReadableToNodeReadable should convert', async (tools) => { + const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello" + const webReadable = new ReadableStream({ + start(controller) { + controller.enqueue(data); + controller.close(); + }, + }); + + const nodeReadable = nodewebhelpers.convertWebReadableToNodeReadable(webReadable); + + const chunks: Buffer[] = []; + const done = tools.defer(); + + nodeReadable.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + + nodeReadable.on('end', () => { + const result = Buffer.concat(chunks).toString(); + expect(result).toEqual('Hello'); + done.resolve(); + }); + + await done.promise; +}); + +// ============================================= +// convertNodeWritableToWebWritable +// ============================================= + +tap.test('nodewebhelpers: convertNodeWritableToWebWritable should convert', async () => { + const chunks: Buffer[] = []; + const nodeWritable = new stream.Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk); + callback(); + }, + }); + + const webWritable = nodewebhelpers.convertNodeWritableToWebWritable(nodeWritable); + const writer = webWritable.getWriter(); + + await writer.write(new Uint8Array([65, 66, 67])); // "ABC" + await writer.close(); + + const result = Buffer.concat(chunks).toString(); + expect(result).toEqual('ABC'); +}); + +// ============================================= +// convertWebWritableToNodeWritable +// ============================================= + +tap.test('nodewebhelpers: convertWebWritableToNodeWritable should convert', async (tools) => { + const chunks: Uint8Array[] = []; + + const webWritable = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + }); + + const nodeWritable = nodewebhelpers.convertWebWritableToNodeWritable(webWritable); + + const done = tools.defer(); + nodeWritable.write(Buffer.from('Hello'), (err) => { + expect(err).toBeFalsy(); + nodeWritable.end(() => { + expect(chunks.length).toBeGreaterThan(0); + done.resolve(); + }); + }); + + await done.promise; +}); + +// ============================================= +// Round-trip: Node → Web → Node +// ============================================= + +tap.test('nodewebhelpers: round-trip Node → Web → Node readable', async (tools) => { + const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt'); + const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable); + const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable); + + const chunks: Buffer[] = []; + const done = tools.defer(); + + nodeReadable2.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + + nodeReadable2.on('end', () => { + expect(chunks.length).toBeGreaterThan(0); + done.resolve(); + }); + + await done.promise; +}); + +export default tap.start(); diff --git a/test/test.smartduplex.ts b/test/test.smartduplex.ts new file mode 100644 index 0000000..2b73cf7 --- /dev/null +++ b/test/test.smartduplex.ts @@ -0,0 +1,379 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as fs from 'fs'; +import * as smartstream from '../ts/index.js'; +import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; + +// ============================================= +// Constructor +// ============================================= + +tap.test('SmartDuplex: should construct with no options', async () => { + const duplex = new SmartDuplex(); + expect(duplex).toBeInstanceOf(SmartDuplex); +}); + +tap.test('SmartDuplex: should construct with options', async () => { + const duplex = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk) => chunk, + }); + expect(duplex).toBeInstanceOf(SmartDuplex); +}); + +// ============================================= +// fromBuffer +// ============================================= + +tap.test('SmartDuplex: should create from a Buffer', async () => { + const bufferData = Buffer.from('This is a test buffer'); + const stream = SmartDuplex.fromBuffer(bufferData, {}); + + let receivedData = Buffer.alloc(0); + + return new Promise((resolve) => { + stream.on('data', (chunk: Buffer) => { + receivedData = Buffer.concat([receivedData, chunk]); + }); + stream.on('end', () => { + expect(receivedData.toString()).toEqual(bufferData.toString()); + resolve(); + }); + }); +}); + +// ============================================= +// writeFunction +// ============================================= + +tap.test('SmartDuplex: should transform chunks via writeFunction', async (tools) => { + const results: string[] = []; + const transform = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk) => { + return chunk.toUpperCase(); + }, + }); + + const done = tools.defer(); + + transform.on('data', (chunk: string) => { + results.push(chunk); + }); + + transform.on('end', () => { + expect(results).toContain('HELLO'); + expect(results).toContain('WORLD'); + done.resolve(); + }); + + transform.write('hello'); + transform.write('world'); + transform.end(); + await done.promise; +}); + +tap.test('SmartDuplex: writeFunction returning undefined should not push', async (tools) => { + const results: any[] = []; + const transform = new SmartDuplex({ + objectMode: true, + writeFunction: async () => { + return undefined; + }, + }); + + const done = tools.defer(); + + transform.on('data', (chunk: any) => { + results.push(chunk); + }); + + transform.on('end', () => { + expect(results.length).toEqual(0); + done.resolve(); + }); + + transform.write('hello'); + transform.end(); + await done.promise; +}); + +// ============================================= +// tools.push — multiple outputs +// ============================================= + +tap.test('SmartDuplex: should emit multiple chunks via tools.push', async (tools) => { + const results: string[] = []; + const splitter = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk, streamTools) => { + const words = chunk.split(' '); + for (const word of words) { + await streamTools.push(word); + } + }, + }); + + const done = tools.defer(); + + splitter.on('data', (chunk: string) => results.push(chunk)); + + splitter.on('end', () => { + expect(results).toContain('hello'); + expect(results).toContain('beautiful'); + expect(results).toContain('world'); + done.resolve(); + }); + + splitter.write('hello beautiful world'); + splitter.end(); + await done.promise; +}); + +// ============================================= +// finalFunction +// ============================================= + +tap.test('SmartDuplex: should emit final chunk via finalFunction', async (tools) => { + const results: string[] = []; + let count = 0; + + const aggregator = new SmartDuplex({ + objectMode: true, + writeFunction: async () => { + count++; + return undefined; + }, + finalFunction: async () => { + return `total: ${count}`; + }, + }); + + const done = tools.defer(); + + aggregator.on('data', (chunk: string) => results.push(chunk)); + + aggregator.on('end', () => { + expect(results.length).toEqual(1); + expect(results[0]).toEqual('total: 2'); + done.resolve(); + }); + + aggregator.write('a'); + aggregator.write('b'); + aggregator.end(); + await done.promise; +}); + +tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async (tools) => { + const results: string[] = []; + + const stream = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk) => chunk, + finalFunction: async (streamTools) => { + await streamTools.push('final1'); + await streamTools.push('final2'); + }, + }); + + const done = tools.defer(); + + stream.on('data', (chunk: string) => results.push(chunk)); + + stream.on('end', () => { + expect(results).toContain('hello'); + expect(results).toContain('final1'); + expect(results).toContain('final2'); + done.resolve(); + }); + + stream.write('hello'); + stream.end(); + await done.promise; +}); + +// ============================================= +// truncate +// ============================================= + +tap.test('SmartDuplex: should truncate stream early', async (tools) => { + const results: string[] = []; + + const limiter = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk, streamTools) => { + if (chunk === 'STOP') { + streamTools.truncate(); + return undefined; + } + return chunk; + }, + }); + + const done = tools.defer(); + + limiter.on('data', (chunk: string) => results.push(chunk)); + + limiter.on('end', () => { + expect(results).toContain('a'); + expect(results).toContain('b'); + expect(results).not.toContain('STOP'); + done.resolve(); + }); + + limiter.write('a'); + limiter.write('b'); + // Write STOP on next tick to allow previous writes to flush + process.nextTick(() => { + limiter.write('STOP'); + }); + await done.promise; +}); + +// ============================================= +// Error handling +// ============================================= + +tap.test('SmartDuplex: should emit error when writeFunction throws', async (tools) => { + const stream = new SmartDuplex({ + objectMode: true, + writeFunction: async () => { + throw new Error('write error'); + }, + }); + + const done = tools.defer(); + stream.on('error', (err) => { + expect(err.message).toEqual('write error'); + done.resolve(); + }); + + stream.write('test'); + await done.promise; +}); + +tap.test('SmartDuplex: should error when no writeFunction and data is written', async (tools) => { + const stream = new SmartDuplex({ + objectMode: true, + }); + + const done = tools.defer(); + stream.on('error', (err) => { + expect(err.message).toEqual('No stream function provided'); + done.resolve(); + }); + + stream.write('test'); + await done.promise; +}); + +// ============================================= +// fromWebReadableStream +// ============================================= + +tap.test('SmartDuplex: should create from a Web ReadableStream', async (tools) => { + const chunks = ['hello', 'world', 'foo']; + const webReadable = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + } + }); + + const duplex = SmartDuplex.fromWebReadableStream(webReadable); + const results: string[] = []; + + const done = tools.defer(); + duplex.on('data', (chunk: string) => results.push(chunk)); + duplex.on('end', () => { + expect(results).toEqual(chunks); + done.resolve(); + }); + await done.promise; +}); + +// ============================================= +// getWebStreams +// ============================================= + +tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => { + const duplex = new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk) => { + return chunk.toUpperCase(); + }, + }); + + const { readable, writable } = await duplex.getWebStreams(); + + const writer = writable.getWriter(); + const reader = readable.getReader(); + + await writer.write('hello'); + await writer.write('world'); + await writer.close(); + + const results: string[] = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + results.push(value); + } + + expect(results).toContain('HELLO'); + expect(results).toContain('WORLD'); +}); + +// ============================================= +// Debug mode +// ============================================= + +tap.test('SmartDuplex: debug mode should not crash', async (tools) => { + const stream = new SmartDuplex({ + name: 'DebugStream', + debug: true, + objectMode: true, + writeFunction: async (chunk) => chunk, + }); + + const done = tools.defer(); + stream.on('data', () => {}); + stream.on('end', () => done.resolve()); + + stream.write('test'); + stream.end(); + await done.promise; +}); + +// ============================================= +// Pipe with file read +// ============================================= + +tap.test('SmartDuplex: should handle a read stream pipeline', async () => { + const streamWrapper = new smartstream.StreamWrapper([ + fs.createReadStream('./test/assets/readabletext.txt'), + new smartstream.SmartDuplex({ + writeFunction: async (chunkStringArg: Buffer, streamTools) => { + const result = chunkStringArg.toString().substr(0, 100); + streamTools.push('wow =========== \n'); + return Buffer.from(result); + }, + finalFunction: async () => { + return Buffer.from('this is the end'); + }, + }), + new smartstream.SmartDuplex({ + writeFunction: async (chunkStringArg) => { + // consume data + }, + finalFunction: async (streamTools) => { + streamTools.push(null); + }, + }) + ]); + await streamWrapper.run(); +}); + +export default tap.start(); diff --git a/test/test.streamintake.ts b/test/test.streamintake.ts new file mode 100644 index 0000000..b2c0082 --- /dev/null +++ b/test/test.streamintake.ts @@ -0,0 +1,128 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as fs from 'fs'; +import { StreamIntake, SmartDuplex } from '../ts/index.js'; +import * as stream from 'stream'; + +// ============================================= +// Basic StreamIntake +// ============================================= + +tap.test('StreamIntake: should push data and signal end', async (tools) => { + const intake = new StreamIntake(); + const results: string[] = []; + + intake.pipe( + new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk) => { + results.push(chunk); + return chunk; + }, + }) + ); + + const done = tools.defer(); + let counter = 0; + intake.pushNextObservable.subscribe(() => { + if (counter < 5) { + counter++; + intake.pushData(`item-${counter}`); + } else { + intake.signalEnd(); + done.resolve(); + } + }); + + await done.promise; + // Give streams time to flush + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(results.length).toBeGreaterThan(0); +}); + +tap.test('StreamIntake: should pipe to a writable', async (tools) => { + const intake = new StreamIntake(); + + intake + .pipe( + new SmartDuplex({ + objectMode: true, + writeFunction: async (chunk: string) => { + return chunk; + }, + }) + ) + .pipe(fs.createWriteStream('./test/assets/writabletext.txt')); + + const done = tools.defer(); + let counter = 0; + intake.pushNextObservable.subscribe(() => { + if (counter < 10) { + counter++; + intake.pushData('data\n'); + } else { + intake.signalEnd(); + done.resolve(); + } + }); + + await done.promise; +}); + +// ============================================= +// StreamIntake.fromStream (Node Readable) +// ============================================= + +tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => { + const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt'); + const intake = await StreamIntake.fromStream(nodeReadable); + + const chunks: Buffer[] = []; + const done = tools.defer(); + + intake.on('data', (chunk: Buffer) => { + chunks.push(chunk); + }); + + intake.on('end', () => { + expect(chunks.length).toBeGreaterThan(0); + const content = Buffer.concat(chunks).toString(); + expect(content.length).toBeGreaterThan(0); + done.resolve(); + }); + + await done.promise; +}); + +// ============================================= +// StreamIntake.fromStream (Web ReadableStream) +// ============================================= + +tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => { + const data = ['chunk1', 'chunk2', 'chunk3']; + const webReadable = new ReadableStream({ + start(controller) { + for (const item of data) { + controller.enqueue(item); + } + controller.close(); + }, + }); + + const intake = await StreamIntake.fromStream(webReadable); + + const results: string[] = []; + const done = tools.defer(); + + intake.on('data', (chunk: string) => { + results.push(chunk); + }); + + intake.on('end', () => { + expect(results).toEqual(data); + done.resolve(); + }); + + await done.promise; +}); + +export default tap.start(); diff --git a/test/test.streamwrapper.ts b/test/test.streamwrapper.ts new file mode 100644 index 0000000..fe1e8f8 --- /dev/null +++ b/test/test.streamwrapper.ts @@ -0,0 +1,70 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import * as fs from 'fs'; +import { StreamWrapper, SmartDuplex } from '../ts/index.js'; + +tap.test('StreamWrapper: should pipe read to write', async () => { + const wrapper = new StreamWrapper([ + fs.createReadStream('./test/assets/test.md'), + fs.createWriteStream('./test/assets/testCopy.md'), + ]); + await wrapper.run(); +}); + +tap.test('StreamWrapper: should propagate errors', async (tools) => { + const failingStream = new SmartDuplex({ + writeFunction: async () => { + throw new Error('intentional error'); + }, + }); + + const wrapper = new StreamWrapper([ + fs.createReadStream('./test/assets/test.md'), + failingStream, + ]); + + let errorCaught = false; + try { + await wrapper.run(); + } catch (err) { + errorCaught = true; + expect(err.message).toEqual('intentional error'); + } + expect(errorCaught).toBeTrue(); +}); + +tap.test('StreamWrapper: streamStarted should resolve', async () => { + const wrapper = new StreamWrapper([ + fs.createReadStream('./test/assets/test.md'), + fs.createWriteStream('./test/assets/testCopy.md'), + ]); + + const runPromise = wrapper.run(); + await wrapper.streamStarted(); + await runPromise; +}); + +tap.test('StreamWrapper: onCustomEvent should fire', async (tools) => { + const results: string[] = []; + + const emitter = new SmartDuplex({ + writeFunction: async (chunk, streamTools) => { + (emitter as any).emit('custom-progress', 'progress'); + return chunk; + }, + }); + + const wrapper = new StreamWrapper([ + fs.createReadStream('./test/assets/test.md'), + emitter, + fs.createWriteStream('./test/assets/testCopy.md'), + ]); + + wrapper.onCustomEvent('custom-progress', () => { + results.push('fired'); + }); + + await wrapper.run(); + expect(results.length).toBeGreaterThan(0); +}); + +export default tap.start(); diff --git a/test/test.ts.backpressure.ts b/test/test.ts.backpressure.ts deleted file mode 100644 index dc79960..0000000 --- a/test/test.ts.backpressure.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { tap, expect } from '@push.rocks/tapbundle'; -import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js'; - -tap.test('should run backpressure test', async (toolsArg) => { - const done = toolsArg.defer(); - async function testBackpressure() { - const stream1 = new SmartDuplex({ - name: 'stream1', - objectMode: true, - writeFunction: async (chunk, tools) => { - await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing - console.log(`processed chunk ${chunk} in stream 1`); - return chunk; // Fast processing - }, - }); - const stream2 = new SmartDuplex({ - name: 'stream2', - objectMode: true, - writeFunction: async (chunk, tools) => { - await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing - console.log(`processed chunk ${chunk} in stream 2`); - await tools.push(chunk); - // return chunk, optionally return ; - }, - }); // This stream processes data more slowly - const stream3 = new SmartDuplex({ - objectMode: true, - name: 'stream3', - writeFunction: async (chunk, tools) => { - await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing - console.log(`processed chunk ${chunk} in stream 3`); - }, - }); - - stream1.pipe(stream2).pipe(stream3); - - let backpressured = false; - for (let i = 0; i < 200; i++) { - const canContinue = stream1.write(`Chunk ${i}`, 'utf8'); - if (!canContinue) { - backpressured = true; - console.log(`Backpressure at chunk ${i}`); - } - } - - stream1.end(); - - stream1.on('finish', () => { - console.log('Stream 1 finished processing.'); - }); - stream2.on('finish', () => { - console.log('Stream 2 finished processing.'); - }); - stream3.on('finish', () => { - console.log('Stream 3 finished processing.'); - if (!backpressured) { - throw new Error('No backpressure was observed.'); - } else { - done.resolve(); - } - }); - } - - testBackpressure(); - await done.promise; -}); - -export default tap.start(); \ No newline at end of file diff --git a/test/test.ts.smartstream.ts b/test/test.ts.smartstream.ts deleted file mode 100644 index 988a63a..0000000 --- a/test/test.ts.smartstream.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { expect, tap } from '@push.rocks/tapbundle'; -import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure -import * as smartrx from '@push.rocks/smartrx'; -import * as fs from 'fs'; - -tap.test('should create a SmartStream from a Buffer', async () => { - const bufferData = Buffer.from('This is a test buffer'); - const smartStream = SmartDuplex.fromBuffer(bufferData, {}); - - let receivedData = Buffer.alloc(0); - - return new Promise((resolve) => { - smartStream.on('data', (chunk: Buffer) => { - receivedData = Buffer.concat([receivedData, chunk]); - }); - - smartStream.on('end', () => { - expect(receivedData.toString()).toEqual(bufferData.toString()); - resolve(); - }); - }); -}); - -export default tap.start(); diff --git a/test/test.ts.streamfunction.ts b/test/test.ts.streamfunction.ts deleted file mode 100644 index bd5a375..0000000 --- a/test/test.ts.streamfunction.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { expect, tap } from '@push.rocks/tapbundle'; -import * as fs from 'fs'; - -import * as smartstream from '../ts/index.js'; - -let testIntake: smartstream.StreamIntake; - -tap.test('should handle a read stream', async (tools) => { - const counter = 0; - const streamWrapper = new smartstream.StreamWrapper([ - fs.createReadStream('./test/assets/readabletext.txt'), - new smartstream.SmartDuplex({ - writeFunction: async (chunkStringArg: Buffer, streamTools) => { - // do something with the stream here - const result = chunkStringArg.toString().substr(0, 100); - streamTools.push('wow =========== \n'); - return Buffer.from(result); - }, - finalFunction: async (tools) => { - return Buffer.from('this is the end'); - }, - }), - new smartstream.SmartDuplex({ - writeFunction: async (chunkStringArg) => { - console.log(chunkStringArg.toString()); - }, - finalFunction: async (tools) => { - tools.push(null); - }, - }) - ]); - await streamWrapper.run(); -}); - -tap.test('should create a valid Intake', async (tools) => { - testIntake = new smartstream.StreamIntake(); - testIntake.pipe( - new smartstream.SmartDuplex({ - objectMode: true, - writeFunction: async (chunkStringArg: string, streamTools) => { - await tools.delayFor(100); - console.log(chunkStringArg); - return chunkStringArg; - } - }) - ) - .pipe(fs.createWriteStream('./test/assets/writabletext.txt')); - const testFinished = tools.defer(); - let counter = 0; - testIntake.pushNextObservable.subscribe(() => { - if (counter < 50) { - counter++; - testIntake.pushData('hi'); - testIntake.pushData('+wow'); - testIntake.pushData('\n'); - } else { - testIntake.signalEnd(); - testFinished.resolve(); - } - }); - await testFinished.promise; - testIntake.signalEnd(); -}); - -export default tap.start(); diff --git a/test/test.ts.ts b/test/test.ts.ts deleted file mode 100644 index 883dfa1..0000000 --- a/test/test.ts.ts +++ /dev/null @@ -1,15 +0,0 @@ -import * as fs from 'fs'; -import { expect, tap } from '@push.rocks/tapbundle'; - -import * as smartstream from '../ts/smartstream.classes.streamwrapper.js'; - -let testSmartstream: smartstream.StreamWrapper; -tap.test('should combine a stream', async () => { - testSmartstream = new smartstream.StreamWrapper([ - fs.createReadStream('./test/assets/test.md'), - fs.createWriteStream('./test/assets/testCopy.md'), - ]); - await testSmartstream.run(); -}); - -export default tap.start(); diff --git a/test/test.ts_web.both.ts b/test/test.ts_web.both.ts deleted file mode 100644 index 3b23b27..0000000 --- a/test/test.ts_web.both.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { expect, tap } from '@push.rocks/tapbundle'; -import * as webstream from '../ts_web/index.js'; - -tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => { - const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]); - const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array); - - const reader = stream.readable.getReader(); - let readUint8Array = new Uint8Array(); - - // Read from the stream - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (value) { - // Concatenate value to readUint8Array - const tempArray = new Uint8Array(readUint8Array.length + value.length); - tempArray.set(readUint8Array, 0); - tempArray.set(value, readUint8Array.length); - readUint8Array = tempArray; - } - } - - expect(readUint8Array).toEqual(inputUint8Array); -}); - -tap.test('WebDuplexStream should handle transform with a write function', async () => { - const input = [1, 2, 3, 4, 5]; - const expectedOutput = [2, 4, 6, 8, 10]; - - const webDuplexStream = new webstream.WebDuplexStream({ - writeFunction: async (chunk, { push }) => { - // Push the doubled number into the stream - push(chunk * 2); - }, - }); - - const writer = webDuplexStream.writable.getWriter(); - const reader = webDuplexStream.readable.getReader(); - - const output: number[] = []; - - // Read from the stream asynchronously - const readPromise = (async () => { - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (value !== undefined) { - output.push(value); - } - } - })(); - - // Write to the stream - for (const num of input) { - await writer.write(num); - } - await writer.close(); - - // Wait for the reading to complete - await readPromise; - - // Assert that the output matches the expected transformed data - expect(output).toEqual(expectedOutput); -}); - -export default tap.start(); \ No newline at end of file diff --git a/test/test.utilities.ts b/test/test.utilities.ts new file mode 100644 index 0000000..6a3868c --- /dev/null +++ b/test/test.utilities.ts @@ -0,0 +1,51 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js'; + +// ============================================= +// createTransformFunction +// ============================================= + +tap.test('createTransformFunction: should create a transform stream', async (tools) => { + const doubler = createTransformFunction(async (n) => n * 2, { objectMode: true }); + const results: number[] = []; + + doubler.on('data', (chunk: number) => results.push(chunk)); + + const done = tools.defer(); + doubler.on('end', () => { + expect(results).toContain(10); + expect(results).toContain(20); + expect(results).toContain(30); + done.resolve(); + }); + + doubler.write(5); + doubler.write(10); + doubler.write(15); + doubler.end(); + await done.promise; +}); + +// ============================================= +// createPassThrough +// ============================================= + +tap.test('createPassThrough: should pass data through unchanged', async (tools) => { + const passThrough = createPassThrough(); + const results: string[] = []; + + passThrough.on('data', (chunk: string) => results.push(chunk)); + + const done = tools.defer(); + passThrough.on('end', () => { + expect(results).toEqual(['hello', 'world']); + done.resolve(); + }); + + passThrough.write('hello'); + passThrough.write('world'); + passThrough.end(); + await done.promise; +}); + +export default tap.start(); diff --git a/test/test.webduplexstream.both.ts b/test/test.webduplexstream.both.ts new file mode 100644 index 0000000..af10269 --- /dev/null +++ b/test/test.webduplexstream.both.ts @@ -0,0 +1,144 @@ +import { expect, tap } from '@push.rocks/tapbundle'; +import { WebDuplexStream } from '../ts_web/index.js'; + +// Helper: collect all chunks from a readable +async function collectAll(reader: ReadableStreamDefaultReader): Promise { + const results: T[] = []; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + results.push(value); + } + return results; +} + +// ============================================= +// Basic transform +// ============================================= + +tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => { + const stream = new WebDuplexStream({ + writeFunction: async (chunk, { push }) => { + push(chunk * 2); + }, + }); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + // Read and write concurrently to avoid backpressure deadlock + const readPromise = collectAll(reader); + await writer.write(5); + await writer.write(10); + await writer.close(); + const results = await readPromise; + + expect(results).toContain(10); + expect(results).toContain(20); +}); + +// ============================================= +// writeFunction return value +// ============================================= + +tap.test('WebDuplexStream: should enqueue returned non-null values', async () => { + const stream = new WebDuplexStream({ + writeFunction: async (chunk) => { + return chunk.toUpperCase(); + }, + }); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const readPromise = collectAll(reader); + await writer.write('hello'); + await writer.close(); + const results = await readPromise; + + expect(results[0]).toEqual('HELLO'); +}); + +// ============================================= +// fromUInt8Array +// ============================================= + +tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => { + const data = new Uint8Array([1, 2, 3, 4, 5]); + const stream = WebDuplexStream.fromUInt8Array(data); + const reader = stream.readable.getReader(); + + const { value } = await reader.read(); + expect(value).toBeTruthy(); + expect(value.length).toEqual(5); +}); + +// ============================================= +// readFunction +// ============================================= + +tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => { + const stream = new WebDuplexStream({ + readFunction: async (tools) => { + await tools.write('chunk1'); + await tools.write('chunk2'); + tools.done(); + }, + writeFunction: async (chunk, { push }) => { + push(chunk.toUpperCase()); + }, + }); + + const reader = stream.readable.getReader(); + const results = await collectAll(reader); + + expect(results).toContain('CHUNK1'); + expect(results).toContain('CHUNK2'); +}); + +// ============================================= +// finalFunction +// ============================================= + +tap.test('WebDuplexStream: finalFunction should emit final data', async () => { + const stream = new WebDuplexStream({ + writeFunction: async (chunk) => { + return chunk; + }, + finalFunction: async (tools) => { + tools.push('final'); + return undefined; + }, + }); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const readPromise = collectAll(reader); + await writer.write('hello'); + await writer.close(); + const results = await readPromise; + + expect(results).toContain('hello'); + expect(results).toContain('final'); +}); + +// ============================================= +// No writeFunction = passthrough +// ============================================= + +tap.test('WebDuplexStream: no writeFunction should passthrough', async () => { + const stream = new WebDuplexStream({}); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const readPromise = collectAll(reader); + await writer.write('pass'); + await writer.close(); + const results = await readPromise; + + expect(results[0]).toEqual('pass'); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 54e1a2a..8051f85 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.3.0', + version: '3.4.0', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts/smartstream.classes.smartduplex.ts b/ts/smartstream.classes.smartduplex.ts index a6325a1..043fb26 100644 --- a/ts/smartstream.classes.smartduplex.ts +++ b/ts/smartstream.classes.smartduplex.ts @@ -56,67 +56,116 @@ export class SmartDuplex extends Duplex { readableStream: ReadableStream ): SmartDuplex { const smartDuplex = new SmartDuplex({ - /** - * this function is called whenever the stream is being read from and at the same time if nothing is enqueued - * therefor it is important to always unlock the reader after reading - */ - readFunction: async () => { - const reader = readableStream.getReader(); - const { value, done } = await reader.read(); - if (value !== undefined) { - smartDuplex.push(value); - } - reader.releaseLock(); - if (done) { - smartDuplex.push(null); - } - }, + objectMode: true, }); + + // Acquire reader ONCE + const reader = readableStream.getReader(); + let reading = false; + + // Override _read to pull from the web reader + smartDuplex._read = function (_size: number) { + if (reading) return; + reading = true; + reader.read().then( + ({ value, done }) => { + reading = false; + if (done) { + smartDuplex.push(null); + } else { + smartDuplex.push(value); + } + }, + (err) => { + reading = false; + smartDuplex.destroy(err); + } + ); + }; + + // Cancel reader on destroy + smartDuplex.on('close', () => { + reader.cancel().catch(() => {}); + }); + return smartDuplex; } // INSTANCE - private backpressuredArray: plugins.lik.BackpressuredArray; // an array that only takes a defined amount of items + private backpressuredArray: plugins.lik.BackpressuredArray; public options: ISmartDuplexOptions; - private observableSubscription?: plugins.smartrx.rxjs.Subscription; + private _consumerWantsData = false; + private _readFunctionRunning = false; + private debugLog(messageArg: string) { - // optional debug log if (this.options.debug) { console.log(messageArg); } } constructor(optionsArg?: ISmartDuplexOptions) { + const safeOptions = optionsArg || {} as ISmartDuplexOptions; super( Object.assign( { highWaterMark: 1, }, - optionsArg + safeOptions ) ); - this.options = optionsArg; + this.options = safeOptions; this.backpressuredArray = new plugins.lik.BackpressuredArray( this.options.highWaterMark || 1 ); } - public async _read(size: number): Promise { - this.debugLog(`${this.options.name}: read was called`); - if (this.options.readFunction) { - await this.options.readFunction(); - } - await this.backpressuredArray.waitForItems(); - this.debugLog(`${this.options.name}: successfully waited for items.`); - let canPushMore = true; - while (this.backpressuredArray.data.length > 0 && canPushMore) { + /** + * Synchronously drains items from the backpressuredArray into the readable side. + * Stops when push() returns false (consumer is full) or array is empty. + */ + private _drainBackpressuredArray(): void { + while (this.backpressuredArray.data.length > 0) { const nextChunk = this.backpressuredArray.shift(); - canPushMore = this.push(nextChunk); + if (nextChunk === null) { + // EOF signal — push null to end readable side + this.push(null); + this._consumerWantsData = false; + return; + } + const canPushMore = this.push(nextChunk); + if (!canPushMore) { + this._consumerWantsData = false; + return; + } + } + } + + // _read must NOT be async — Node.js ignores the return value + public _read(size: number): void { + this.debugLog(`${this.options.name}: read was called`); + this._consumerWantsData = true; + + // Drain any buffered items first + if (this.backpressuredArray.data.length > 0) { + this._drainBackpressuredArray(); + } + + // If readFunction exists and is not already running, start it + if (this.options.readFunction && !this._readFunctionRunning) { + this._readFunctionRunning = true; + this.options.readFunction().then( + () => { this._readFunctionRunning = false; }, + (err) => { this._readFunctionRunning = false; this.destroy(err); } + ); } } public async backpressuredPush(pushArg: TOutput) { const canPushMore = this.backpressuredArray.push(pushArg); + // Try to drain if the consumer wants data + if (this._consumerWantsData) { + this._drainBackpressuredArray(); + } if (!canPushMore) { this.debugLog(`${this.options.name}: cannot push more`); await this.backpressuredArray.waitForSpace(); @@ -126,83 +175,151 @@ export class SmartDuplex extends Duplex { } private asyncWritePromiseObjectmap = new plugins.lik.ObjectMap>(); - // Ensure the _write method types the chunk as TInput and encodes TOutput - public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { + + // _write must NOT be async — Node.js ignores the return value + public _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) { if (!this.options.writeFunction) { return callback(new Error('No stream function provided')); } + let callbackCalled = false; + const safeCallback = (err?: Error | null) => { + if (!callbackCalled) { + callbackCalled = true; + callback(err); + } + }; + let isTruncated = false; const tools: IStreamTools = { truncate: () => { - this.push(null); isTruncated = true; - callback(); + safeCallback(); + this.push(null); }, push: async (pushArg: TOutput) => { return await this.backpressuredPush(pushArg); }, }; - try { - const writeDeferred = plugins.smartpromise.defer(); - this.asyncWritePromiseObjectmap.add(writeDeferred.promise); - const modifiedChunk = await this.options.writeFunction(chunk, tools); - if (isTruncated) { - return; - } - if (modifiedChunk) { - await tools.push(modifiedChunk); - } - callback(); - writeDeferred.resolve(); - writeDeferred.promise.then(() => { + const writeDeferred = plugins.smartpromise.defer(); + this.asyncWritePromiseObjectmap.add(writeDeferred.promise); + + this.options.writeFunction(chunk, tools).then( + (modifiedChunk) => { + if (isTruncated) { + writeDeferred.resolve(); + this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); + return; + } + const finish = () => { + safeCallback(); + writeDeferred.resolve(); + this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); + }; + if (modifiedChunk !== undefined && modifiedChunk !== null) { + this.backpressuredPush(modifiedChunk).then(finish, (err) => { + safeCallback(err); + writeDeferred.resolve(); + this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); + }); + } else { + finish(); + } + }, + (err) => { + safeCallback(err); + writeDeferred.resolve(); this.asyncWritePromiseObjectmap.remove(writeDeferred.promise); - }); - } catch (err) { - callback(err); - } + } + ); } - public async _final(callback: (error?: Error | null) => void) { - await Promise.all(this.asyncWritePromiseObjectmap.getArray()); - if (this.options.finalFunction) { - const tools: IStreamTools = { - truncate: () => callback(), - push: async (pipeObject) => { - return this.backpressuredArray.push(pipeObject); - }, - }; - - try { - const finalChunk = await this.options.finalFunction(tools); - if (finalChunk) { - this.backpressuredArray.push(finalChunk); - } - } catch (err) { - this.backpressuredArray.push(null); + // _final must NOT be async — Node.js ignores the return value + public _final(callback: (error?: Error | null) => void) { + let callbackCalled = false; + const safeCallback = (err?: Error | null) => { + if (!callbackCalled) { + callbackCalled = true; callback(err); - return; } - } - this.backpressuredArray.push(null); - callback(); + }; + + Promise.all(this.asyncWritePromiseObjectmap.getArray()).then(() => { + if (this.options.finalFunction) { + const tools: IStreamTools = { + truncate: () => safeCallback(), + push: async (pipeObject) => { + return await this.backpressuredPush(pipeObject); + }, + }; + + this.options.finalFunction(tools).then( + (finalChunk) => { + const pushNull = () => { + this.backpressuredArray.push(null); + if (this._consumerWantsData) { + this._drainBackpressuredArray(); + } + safeCallback(); + }; + + if (finalChunk !== undefined && finalChunk !== null) { + this.backpressuredPush(finalChunk).then(pushNull, (err) => { + safeCallback(err); + }); + } else { + pushNull(); + } + }, + (err) => { + this.backpressuredArray.push(null); + if (this._consumerWantsData) { + this._drainBackpressuredArray(); + } + safeCallback(err); + } + ); + } else { + this.backpressuredArray.push(null); + if (this._consumerWantsData) { + this._drainBackpressuredArray(); + } + safeCallback(); + } + }, (err) => { + safeCallback(err); + }); } public async getWebStreams(): Promise<{ readable: ReadableStream; writable: WritableStream }> { const duplex = this; + let readableClosed = false; + const readable = new ReadableStream({ start(controller) { - duplex.on('readable', () => { + const onReadable = () => { let chunk; while (null !== (chunk = duplex.read())) { controller.enqueue(chunk); } - }); + }; - duplex.on('end', () => { - controller.close(); - }); + const onEnd = () => { + if (!readableClosed) { + readableClosed = true; + controller.close(); + } + cleanup(); + }; + + const cleanup = () => { + duplex.removeListener('readable', onReadable); + duplex.removeListener('end', onEnd); + }; + + duplex.on('readable', onReadable); + duplex.on('end', onEnd); }, cancel(reason) { duplex.destroy(new Error(reason)); @@ -212,22 +329,38 @@ export class SmartDuplex extends Duplex { const writable = new WritableStream({ write(chunk) { return new Promise((resolve, reject) => { + let resolved = false; + const onDrain = () => { + if (!resolved) { + resolved = true; + resolve(); + } + }; + const isBackpressured = !duplex.write(chunk, (error) => { if (error) { - reject(error); - } else { + if (!resolved) { + resolved = true; + duplex.removeListener('drain', onDrain); + reject(error); + } + } else if (!isBackpressured && !resolved) { + resolved = true; resolve(); } }); if (isBackpressured) { - duplex.once('drain', resolve); + duplex.once('drain', onDrain); } }); }, close() { return new Promise((resolve, reject) => { - duplex.end(resolve); + duplex.end((err: Error | null) => { + if (err) reject(err); + else resolve(); + }); }); }, abort(reason) { diff --git a/ts/smartstream.classes.streamintake.ts b/ts/smartstream.classes.streamintake.ts index 5f89898..496549c 100644 --- a/ts/smartstream.classes.streamintake.ts +++ b/ts/smartstream.classes.streamintake.ts @@ -6,8 +6,11 @@ export class StreamIntake extends plugins.stream.Readable { const intakeStream = new StreamIntake(options); if (inputStream instanceof plugins.stream.Readable) { - inputStream.on('data', (chunk: U) => { - intakeStream.pushData(chunk); + inputStream.on('readable', () => { + let chunk: U; + while (null !== (chunk = inputStream.read() as U)) { + intakeStream.pushData(chunk); + } }); inputStream.on('end', () => { diff --git a/ts/smartstream.classes.streamwrapper.ts b/ts/smartstream.classes.streamwrapper.ts index 624ad64..e73a471 100644 --- a/ts/smartstream.classes.streamwrapper.ts +++ b/ts/smartstream.classes.streamwrapper.ts @@ -1,8 +1,5 @@ import * as plugins from './smartstream.plugins.js'; -// interfaces -import { Transform } from 'stream'; - export interface IErrorFunction { (err: Error): any; } @@ -82,15 +79,17 @@ export class StreamWrapper { this.streamStartedDeferred.resolve(); - finalStream.on('end', () => { - done.resolve(); - }); - finalStream.on('close', () => { - done.resolve(); - }); - finalStream.on('finish', () => { - done.resolve(); - }); + let resolved = false; + const safeResolve = () => { + if (!resolved) { + resolved = true; + done.resolve(); + } + }; + + finalStream.on('end', safeResolve); + finalStream.on('close', safeResolve); + finalStream.on('finish', safeResolve); return done.promise; } } diff --git a/ts/smartstream.nodewebhelpers.ts b/ts/smartstream.nodewebhelpers.ts index 046c494..053fced 100644 --- a/ts/smartstream.nodewebhelpers.ts +++ b/ts/smartstream.nodewebhelpers.ts @@ -1,7 +1,7 @@ import * as plugins from './smartstream.plugins.js'; /** - * Creates a Web ReadableStream from a file. + * Creates a Web ReadableStream from a file using pull-based backpressure. * * @param filePath - The path to the file to be read * @returns A Web ReadableStream that reads the file in chunks @@ -11,23 +11,53 @@ export function createWebReadableStreamFromFile(filePath: string): ReadableStrea return new ReadableStream({ start(controller) { - // When data is available, enqueue it into the Web ReadableStream - fileStream.on('data', (chunk) => { - controller.enqueue(chunk as Uint8Array); + fileStream.on('error', (err) => { + controller.error(err); }); - // When the file stream ends, close the Web ReadableStream fileStream.on('end', () => { controller.close(); }); - // If there's an error, error the Web ReadableStream - fileStream.on('error', (err) => { - controller.error(err); + // Pause immediately — pull() will drive reads + fileStream.pause(); + }, + pull(controller) { + return new Promise((resolve, reject) => { + const chunk = fileStream.read(); + if (chunk !== null) { + controller.enqueue(chunk as Uint8Array); + resolve(); + return; + } + // No data available yet — wait for 'readable' or 'end' + const onReadable = () => { + cleanup(); + const data = fileStream.read(); + if (data !== null) { + controller.enqueue(data as Uint8Array); + } + resolve(); + }; + const onEnd = () => { + cleanup(); + resolve(); + }; + const onError = (err: Error) => { + cleanup(); + reject(err); + }; + const cleanup = () => { + fileStream.removeListener('readable', onReadable); + fileStream.removeListener('end', onEnd); + fileStream.removeListener('error', onError); + }; + fileStream.once('readable', onReadable); + fileStream.once('end', onEnd); + fileStream.once('error', onError); }); }, cancel() { - // If the Web ReadableStream is canceled, destroy the file stream fileStream.destroy(); } }); @@ -43,23 +73,25 @@ export function convertWebReadableToNodeReadable(webStream: ReadableStream { + if (done) { + this.push(null); + } else { + this.push(Buffer.from(value)); + } + }, + (err) => { + this.destroy(err); } - } catch (err) { - this.destroy(err); // Handle errors by destroying the stream - } + ); } }); } /** - * Converts a Node.js Readable stream to a Web ReadableStream. + * Converts a Node.js Readable stream to a Web ReadableStream using pull-based backpressure. * * @param nodeStream - The Node.js Readable stream to convert * @returns A Web ReadableStream that reads data from the Node.js Readable stream @@ -67,16 +99,50 @@ export function convertWebReadableToNodeReadable(webStream: ReadableStream { return new ReadableStream({ start(controller) { - nodeStream.on('data', (chunk) => { - controller.enqueue(new Uint8Array(chunk)); + nodeStream.on('error', (err) => { + controller.error(err); }); nodeStream.on('end', () => { controller.close(); }); - nodeStream.on('error', (err) => { - controller.error(err); + // Pause immediately — pull() will drive reads + nodeStream.pause(); + }, + pull(controller) { + return new Promise((resolve, reject) => { + const chunk = nodeStream.read(); + if (chunk !== null) { + controller.enqueue(new Uint8Array(chunk)); + resolve(); + return; + } + // No data available yet — wait for 'readable' or 'end' + const onReadable = () => { + cleanup(); + const data = nodeStream.read(); + if (data !== null) { + controller.enqueue(new Uint8Array(data)); + } + resolve(); + }; + const onEnd = () => { + cleanup(); + resolve(); + }; + const onError = (err: Error) => { + cleanup(); + reject(err); + }; + const cleanup = () => { + nodeStream.removeListener('readable', onReadable); + nodeStream.removeListener('end', onEnd); + nodeStream.removeListener('error', onError); + }; + nodeStream.once('readable', onReadable); + nodeStream.once('end', onEnd); + nodeStream.once('error', onError); }); }, cancel() { @@ -95,19 +161,23 @@ export function convertWebWritableToNodeWritable(webWritable: WritableStream callback(), + (err) => callback(err) + ); }, final(callback) { writer.close().then(() => callback()).catch(callback); }, destroy(err, callback) { - writer.abort(err).then(() => callback(err)).catch(callback); + if (err) { + writer.abort(err).then(() => callback(err)).catch(() => callback(err)); + } else { + // Clean destroy — just release the lock + writer.releaseLock(); + callback(null); + } } }); } @@ -133,7 +203,7 @@ export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Wr }, close() { return new Promise((resolve, reject) => { - nodeWritable.end((err) => { + nodeWritable.end((err: Error | null) => { if (err) { reject(err); } else { @@ -143,9 +213,7 @@ export function convertNodeWritableToWebWritable(nodeWritable: plugins.stream.Wr }); }, abort(reason) { - return new Promise((resolve, reject) => { - nodeWritable.destroy(reason); - }); + nodeWritable.destroy(reason instanceof Error ? reason : new Error(String(reason))); } }); -} \ No newline at end of file +} diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 54e1a2a..8051f85 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.3.0', + version: '3.4.0', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts_web/classes.webduplexstream.ts b/ts_web/classes.webduplexstream.ts index 9a62ccc..72e7e9a 100644 --- a/ts_web/classes.webduplexstream.ts +++ b/ts_web/classes.webduplexstream.ts @@ -78,17 +78,14 @@ export class WebDuplexStream extends TransformStrea try { const finalChunk = await optionsArg.finalFunction(tools); - if (finalChunk) { - controller.enqueue(finalChunk); + if (finalChunk !== undefined && finalChunk !== null) { + controller.enqueue(finalChunk as TOutput); } } catch (err) { controller.error(err); - } finally { - controller.terminate(); } - } else { - controller.terminate(); } + // TransformStream auto-closes readable after flush resolves — no terminate() needed }, }); @@ -96,7 +93,9 @@ export class WebDuplexStream extends TransformStrea // Start producing data if readFunction is provided if (this.options.readFunction) { - this._startReading(); + this._startReading().catch((err) => { + // Prevent unhandled rejection — the error is propagated through the writable side + }); } } @@ -104,17 +103,25 @@ export class WebDuplexStream extends TransformStrea const writable = this.writable; const writer = writable.getWriter(); + let doneSignaled = false; const tools: IStreamToolsRead = { - done: () => writer.close(), + done: () => { + doneSignaled = true; + }, write: async (writeArg) => await writer.write(writeArg), }; try { await this.options.readFunction(tools); + if (doneSignaled) { + await writer.close(); + } } catch (err) { - writer.abort(err); - } finally { - writer.releaseLock(); + try { + await writer.abort(err); + } catch (_) { + // Writer may already be in error state + } } } @@ -128,8 +135,8 @@ export class WebDuplexStream extends TransformStrea }); const writer = stream.writable.getWriter(); - writer.write(uint8Array).then(() => writer.close()); + writer.write(uint8Array).then(() => writer.close()).catch(() => {}); return stream; } -} \ No newline at end of file +}