diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index c41a89e..8696011 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartrx', - version: '3.0.7', + version: '3.0.8', description: 'smart wrapper for rxjs' } diff --git a/ts/index.ts b/ts/index.ts index d022ee9..5c4b143 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,5 +1,6 @@ import * as plugins from './smartrx.plugins.js'; export * from './smartrx.classes.observablemap.js'; export * from './smartrx.classes.observableintake.js'; +export * from './smartrx.functions.js'; import * as rxjs from './smartrx.plugins.rxjs.js'; export { rxjs }; diff --git a/ts/smartrx.functions.ts b/ts/smartrx.functions.ts new file mode 100644 index 0000000..9961183 --- /dev/null +++ b/ts/smartrx.functions.ts @@ -0,0 +1,32 @@ +import { Observable } from 'rxjs'; +import { Readable } from 'stream'; + +export function fromStreamWithBackpressure(stream: Readable): Observable { + return new Observable((subscriber) => { + const pauseStream = () => stream.pause(); + const resumeStream = () => process.nextTick(() => stream.resume()); + + // Handler for each piece of data + const onData = (data: T) => { + // Pause the stream to apply backpressure + pauseStream(); + // Emit data and resume the stream if the subscriber is ready + subscriber.next(data); + resumeStream(); + }; + + // Subscribe to stream events + stream.on('data', onData); + stream.on('error', (error) => subscriber.error(error)); + stream.on('end', () => subscriber.complete()); + stream.on('close', () => subscriber.complete()); + + // If the subscriber unsubscribes, clean up the stream listeners + return () => { + stream.removeListener('data', onData); + stream.removeListener('error', subscriber.error); + stream.removeListener('end', subscriber.complete); + stream.removeListener('close', subscriber.complete); + }; + }); +} \ No newline at end of file