import { Observable } from 'rxjs'; import { Readable } from 'stream'; export function fromStreamWithBackpressure(stream: Readable): Observable { return new Observable((subscriber) => { const pauseStream = () => stream.pause(); const resumeStream = () => process.nextTick(() => stream.resume()); // Handler for each piece of data const onData = (data: T) => { // Pause the stream to apply backpressure pauseStream(); // Emit data and resume the stream if the subscriber is ready subscriber.next(data); resumeStream(); }; // Subscribe to stream events stream.on('data', onData); stream.on('error', (error) => subscriber.error(error)); stream.on('end', () => subscriber.complete()); stream.on('close', () => subscriber.complete()); // If the subscriber unsubscribes, clean up the stream listeners return () => { stream.removeListener('data', onData); stream.removeListener('error', subscriber.error); stream.removeListener('end', subscriber.complete); stream.removeListener('close', subscriber.complete); }; }); }