diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index c3c7ce1..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,140 +0,0 @@ -# gitzone ci_default -image: registry.gitlab.com/hosttoday/ht-docker-node:npmci - -cache: - paths: - - .npmci_cache/ - key: '$CI_BUILD_STAGE' - -stages: - - security - - test - - release - - metadata - -before_script: - - npm install -g @shipzone/npmci - -# ==================== -# security stage -# ==================== -mirror: - stage: security - script: - - npmci git mirror - only: - - tags - tags: - - lossless - - docker - - notpriv - -auditProductionDependencies: - image: registry.gitlab.com/hosttoday/ht-docker-node:npmci - stage: security - script: - - npmci npm prepare - - npmci command npm install --production --ignore-scripts - - npmci command npm config set registry https://registry.npmjs.org - - npmci command npm audit --audit-level=high --only=prod --production - tags: - - docker - allow_failure: true - -auditDevDependencies: - image: registry.gitlab.com/hosttoday/ht-docker-node:npmci - stage: security - script: - - npmci npm prepare - - npmci command npm install --ignore-scripts - - npmci command npm config set registry https://registry.npmjs.org - - npmci command npm audit --audit-level=high --only=dev - tags: - - docker - allow_failure: true - -# ==================== -# test stage -# ==================== - -testStable: - stage: test - script: - - npmci npm prepare - - npmci node install stable - - npmci npm install - - npmci npm test - coverage: /\d+.?\d+?\%\s*coverage/ - tags: - - docker - -testBuild: - stage: test - script: - - npmci npm prepare - - npmci node install stable - - npmci npm install - - npmci command npm run build - coverage: /\d+.?\d+?\%\s*coverage/ - tags: - - docker - -release: - stage: release - script: - - npmci node install stable - - npmci npm publish - only: - - tags - tags: - - lossless - - docker - - notpriv - -# ==================== -# metadata stage -# ==================== -codequality: - stage: metadata - allow_failure: true - only: - - tags - script: - - npmci command npm install -g typescript - - npmci npm prepare - - npmci npm install - tags: - - lossless - - docker - - priv - -trigger: - stage: metadata - script: - - npmci trigger - only: - - tags - tags: - - lossless - - docker - - notpriv - -pages: - stage: metadata - script: - - npmci node install lts - - npmci command npm install -g @git.zone/tsdoc - - npmci npm prepare - - npmci npm install - - npmci command tsdoc - tags: - - lossless - - docker - - notpriv - only: - - tags - artifacts: - expire_in: 1 week - paths: - - public - allow_failure: true diff --git a/assets/denoentry.ts b/assets/denoentry.ts deleted file mode 100644 index 379250e..0000000 --- a/assets/denoentry.ts +++ /dev/null @@ -1 +0,0 @@ -console.log('Hello from deno'); diff --git a/package.json b/package.json index a402c35..782033b 100644 --- a/package.json +++ b/package.json @@ -3,13 +3,14 @@ "version": "3.0.38", "private": false, "description": "A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.", - "main": "dist_ts/index.js", - "typings": "dist_ts/index.d.ts", "type": "module", + "exports": { + ".": "./dist_ts/index.js", + "./web": "./dist_ts_web/index.js" + }, "scripts": { "test": "(tstest test/)", - "build": "(tsbuild)", - "buildDocs": "tsdoc" + "build": "(tsbuild tsfolders --web --allowimplicitany)" }, "repository": { "type": "git", @@ -31,9 +32,9 @@ }, "dependencies": { "@push.rocks/lik": "^6.0.15", + "@push.rocks/smartenv": "^5.0.12", "@push.rocks/smartpromise": "^4.0.3", - "@push.rocks/smartrx": "^3.0.7", - "@push.rocks/webstream": "^1.0.8" + "@push.rocks/smartrx": "^3.0.7" }, "browserslist": [ "last 1 chrome versions" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a344f34..07ca72a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -11,15 +11,15 @@ importers: '@push.rocks/lik': specifier: ^6.0.15 version: 6.0.15 + '@push.rocks/smartenv': + specifier: ^5.0.12 + version: 5.0.12 '@push.rocks/smartpromise': specifier: ^4.0.3 version: 4.0.3 '@push.rocks/smartrx': specifier: ^3.0.7 version: 3.0.7 - '@push.rocks/webstream': - specifier: ^1.0.8 - version: 1.0.8 devDependencies: '@git.zone/tsbuild': specifier: ^2.1.80 @@ -349,9 +349,6 @@ packages: '@push.rocks/smartdelay@3.0.5': resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==} - '@push.rocks/smartenv@5.0.10': - resolution: {integrity: sha512-yJURKoz5awTPyN+5MO1A9maf5+Zh2qutHlbZPx2y5P6rwGPFWThippLicqXUSs2rM49ES6RKAD48Q8w5GQkWFw==} - '@push.rocks/smartenv@5.0.12': resolution: {integrity: sha512-tDEFwywzq0FNzRYc9qY2dRl2pgQuZG0G2/yml2RLWZWSW+Fn1EHshnKOGHz8o77W7zvu4hTgQQX42r/JY5XHTg==} @@ -3505,10 +3502,6 @@ snapshots: dependencies: '@push.rocks/smartpromise': 4.0.3 - '@push.rocks/smartenv@5.0.10': - dependencies: - '@push.rocks/smartpromise': 4.0.3 - '@push.rocks/smartenv@5.0.12': dependencies: '@push.rocks/smartpromise': 4.0.3 @@ -3827,7 +3820,7 @@ snapshots: '@push.rocks/smartstring@4.0.9': dependencies: '@push.rocks/isounique': 1.0.5 - '@push.rocks/smartenv': 5.0.10 + '@push.rocks/smartenv': 5.0.12 '@types/randomatic': 3.1.4 buffer: 6.0.3 crypto-random-string: 5.0.0 diff --git a/test/test.backpressure.ts b/test/test.ts.backpressure.ts similarity index 100% rename from test/test.backpressure.ts rename to test/test.ts.backpressure.ts diff --git a/test/test.smartstream.ts b/test/test.ts.smartstream.ts similarity index 100% rename from test/test.smartstream.ts rename to test/test.ts.smartstream.ts diff --git a/test/test.streamfunction.ts b/test/test.ts.streamfunction.ts similarity index 100% rename from test/test.streamfunction.ts rename to test/test.ts.streamfunction.ts diff --git a/test/test.ts b/test/test.ts.ts similarity index 100% rename from test/test.ts rename to test/test.ts.ts diff --git a/test/test.ts_web.both.ts b/test/test.ts_web.both.ts new file mode 100644 index 0000000..952a6bc --- /dev/null +++ b/test/test.ts_web.both.ts @@ -0,0 +1,70 @@ +import { expect, expectAsync, tap } from '@push.rocks/tapbundle'; +import * as webstream from '../ts_web/index.js'; + +tap.test('WebDuplexStream', async (toolsArg) => { + const testDone = toolsArg.defer(); // Create a deferred object to control test completion. + const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]); + const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array); + + const reader = stream.readable.getReader(); + let readUint8Array = new Uint8Array(); + + reader.read().then(function processText({ done, value }) { + if (done) { + expect(readUint8Array).toEqual(inputUint8Array); + testDone.resolve(); // Correctly signal that the test is done. + return; + } + readUint8Array = new Uint8Array([...readUint8Array, ...value]); + return reader.read().then(processText); + }); + + return testDone.promise; // Return the promise to properly wait for the test to complete. +}); + + +tap.test('should handle transform with a write function', async (toolsArg) => { + const testDone = toolsArg.defer(); + const input = [1, 2, 3, 4, 5]; + const expectedOutput = [2, 4, 6, 8, 10]; + + const transformStream = new webstream.WebDuplexStream({ + writeFunction: (chunk, { push }) => { + push(chunk * 2); // Push the doubled number into the stream + return Promise.resolve(); // Resolve the promise immediately + }, + }); + + const writableStream = transformStream.writable.getWriter(); + const readableStream = transformStream.readable.getReader(); + + const output: number[] = []; + + // Process the text and resolve the test once done. + const processText = async ({ done, value }) => { + if (done) { + expect(output).toEqual(expectedOutput); + testDone.resolve(); // Resolve the deferred test once all values have been read. + return; + } + if (value !== undefined) { + output.push(value); + } + // Continue reading and processing. + await readableStream.read().then(processText); + }; + + // Start the read process before writing to the stream. + readableStream.read().then(processText); + + // Sequentially write to the stream and close when done. + for (const num of input) { + await writableStream.write(num); + } + await writableStream.close(); + + return testDone.promise; // This will wait until the testDone is resolved before completing the test. +}); + + +tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 48f3114..8c25f90 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstream', - version: '3.0.38', + version: '3.0.39', description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' } diff --git a/ts/smartstream.plugins.ts b/ts/smartstream.plugins.ts index 5424c85..e210f10 100644 --- a/ts/smartstream.plugins.ts +++ b/ts/smartstream.plugins.ts @@ -7,7 +7,7 @@ export { stream }; import * as lik from '@push.rocks/lik'; import * as smartpromise from '@push.rocks/smartpromise'; import * as smartrx from '@push.rocks/smartrx'; -import * as webstream from '@push.rocks/webstream'; +import * as webstream from '../dist_ts_web/index.js'; export { lik, smartpromise, smartrx, webstream }; diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts new file mode 100644 index 0000000..8c25f90 --- /dev/null +++ b/ts_web/00_commitinfo_data.ts @@ -0,0 +1,8 @@ +/** + * autocreated commitinfo by @pushrocks/commitinfo + */ +export const commitinfo = { + name: '@push.rocks/smartstream', + version: '3.0.39', + description: 'A library to simplify the creation and manipulation of Node.js streams, providing utilities for handling transform, duplex, and readable/writable streams effectively in TypeScript.' +} diff --git a/ts_web/classes.webduplexstream.ts b/ts_web/classes.webduplexstream.ts new file mode 100644 index 0000000..d6e7e7e --- /dev/null +++ b/ts_web/classes.webduplexstream.ts @@ -0,0 +1,156 @@ +import * as plugins from './plugins.js'; + + +// ======================================== +// READ +// ======================================== +export interface IStreamToolsRead { + done: () => void; + write: (writeArg: TInput) => void; +} + +/** + * the read function is called anytime + * -> the WebDuplexStream is being read from + * and at the same time if nothing is enqueued + */ +export interface IStreamReadFunction { + (toolsArg: IStreamToolsRead): Promise; +} + +// ======================================== +// WRITE +// ======================================== +export interface IStreamToolsWrite { + truncate: () => void; + push: (pushArg: TOutput) => void; +} + +/** + * the write function can return something. + * It is called anytime a chunk is written to the stream. + */ +export interface IStreamWriteFunction { + (chunkArg: TInput, toolsArg: IStreamToolsWrite): Promise; +} + +export interface IStreamFinalFunction { + (toolsArg: IStreamToolsWrite): Promise; +} + +export interface WebDuplexStreamOptions { + readFunction?: IStreamReadFunction; + writeFunction?: IStreamWriteFunction; + finalFunction?: IStreamFinalFunction; +} + +export class WebDuplexStream extends TransformStream { + static fromUInt8Array(uint8Array: Uint8Array): WebDuplexStream { + const stream = new WebDuplexStream({ + writeFunction: async (chunk, { push }) => { + push(chunk); // Directly push the chunk as is + return null; + } + }); + + const writer = stream.writable.getWriter(); + writer.write(uint8Array).then(() => writer.close()); + + return stream; + } + + // INSTANCE + options: WebDuplexStreamOptions; + + constructor(optionsArg: WebDuplexStreamOptions) { + super({ + async transform(chunk, controller) { + // Transformation logic remains unchanged + if (optionsArg?.writeFunction) { + const tools: IStreamToolsWrite = { + truncate: () => controller.terminate(), + push: (pushArg: TOutput) => controller.enqueue(pushArg), + }; + + optionsArg.writeFunction(chunk, tools) + .then(writeReturnChunk => { + // the write return chunk is optional + // just in case the write function returns something other than void. + if (writeReturnChunk) { + controller.enqueue(writeReturnChunk); + } + }) + .catch(err => controller.error(err)); + } else { + controller.error(new Error('No write function provided')); + } + }, + async flush(controller) { + // Flush logic remains unchanged + if (optionsArg?.finalFunction) { + const tools: IStreamToolsWrite = { + truncate: () => controller.terminate(), + push: (pipeObject) => controller.enqueue(pipeObject), + }; + + optionsArg.finalFunction(tools) + .then(finalChunk => { + if (finalChunk) { + controller.enqueue(finalChunk); + } + }) + .catch(err => controller.error(err)) + .finally(() => controller.terminate()); + } else { + controller.terminate(); + } + } + }); + + this.options = optionsArg; + } + + // Method to create a custom readable stream that integrates the readFunction + // readFunction is executed whenever the stream is being read from and nothing is enqueued + getCustomReadableStream() { + const readableStream = this.readable; + const options = this.options; + const customReadable = new ReadableStream({ + async pull(controller) { + const reader = readableStream.getReader(); + + // Check the current state of the original stream + const { value, done } = await reader.read(); + reader.releaseLock(); + + if (done) { + // If the original stream is done, close the custom readable stream + controller.close(); + } else { + if (value) { + // If there is data in the original stream, enqueue it and do not execute the readFunction + controller.enqueue(value); + } else if (options.readFunction) { + // If the original stream is empty, execute the readFunction and read again + await options.readFunction({ + done: () => controller.close(), + write: (writeArg) => controller.enqueue(writeArg), + }); + + const newReader = readableStream.getReader(); + const { value: newValue, done: newDone } = await newReader.read(); + newReader.releaseLock(); + + if (newDone) { + controller.close(); + } else { + controller.enqueue(newValue); + } + } + } + } + }); + + return customReadable; + } +} diff --git a/ts_web/convert.ts b/ts_web/convert.ts new file mode 100644 index 0000000..a88e297 --- /dev/null +++ b/ts_web/convert.ts @@ -0,0 +1,64 @@ +export interface IDuplexStream { + read(): any; + write(chunk: any, callback?: (error?: Error | null) => void): boolean; + on(event: string, listener: (...args: any[]) => void): this; + once(event: string, listener: (...args: any[]) => void): this; + end(callback?: () => void): void; + destroy(error?: Error): void; +} + +export interface IReadableStreamOptions { + highWaterMark?: number; +} + +export interface IWritableStreamOptions { + highWaterMark?: number; +} + +export function convertDuplexToWebStream(duplex: IDuplexStream): { readable: ReadableStream, writable: WritableStream } { + const readable = new ReadableStream({ + start(controller) { + duplex.on('readable', () => { + let chunk; + while (null !== (chunk = duplex.read())) { + controller.enqueue(chunk); + } + }); + + duplex.on('end', () => { + controller.close(); + }); + }, + cancel(reason) { + duplex.destroy(new Error(reason)); + } + }); + + const writable = new WritableStream({ + write(chunk) { + return new Promise((resolve, reject) => { + const isBackpressured = !duplex.write(chunk, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + + if (isBackpressured) { + duplex.once('drain', resolve); + } + }); + }, + close() { + return new Promise((resolve, reject) => { + duplex.end(resolve); + }); + }, + abort(reason) { + duplex.destroy(new Error(reason)); + } + }); + + return { readable, writable }; +} diff --git a/ts_web/index.ts b/ts_web/index.ts new file mode 100644 index 0000000..b49b5d7 --- /dev/null +++ b/ts_web/index.ts @@ -0,0 +1,5 @@ +import './plugins.js'; +export * from './classes.webduplexstream.js'; +export { + convertDuplexToWebStream, +} from './convert.js'; \ No newline at end of file diff --git a/ts_web/plugins.ts b/ts_web/plugins.ts new file mode 100644 index 0000000..95d4a18 --- /dev/null +++ b/ts_web/plugins.ts @@ -0,0 +1,15 @@ +// @push.rocks scope +import * as smartenv from '@push.rocks/smartenv'; + +export { + smartenv, +} + +// lets setup dependencies +const smartenvInstance = new smartenv.Smartenv(); + +await smartenvInstance.getSafeNodeModule('stream/web', async (moduleArg) => { + globalThis.ReadableStream = moduleArg.ReadableStream; + globalThis.WritableStream = moduleArg.WritableStream; + globalThis.TransformStream = moduleArg.TransformStream; +})