smartstream/ts/smartstream.classes.smartduplex.ts
2023-11-11 18:53:38 +01:00

185 lines
5.5 KiB
TypeScript

import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';
export interface IStreamTools {
truncate: () => void;
push: (pipeObject: any) => void;
}
export interface IStreamWriteFunction<T, rT> {
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
}
export interface IStreamFinalFunction<rT> {
(toolsArg: IStreamTools): Promise<rT>;
}
export interface SmartStreamOptions<TInput, TOutput> extends DuplexOptions {
handleBackpressure?: boolean;
readFunction?: () => Promise<void>;
writeFunction?: IStreamWriteFunction<TInput, TOutput>;
finalFunction?: IStreamFinalFunction<TOutput>;
// Add other custom options if necessary
}
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
// STATIC
static fromBuffer(buffer: Buffer, options?: DuplexOptions): SmartDuplex {
const smartStream = new SmartDuplex(options);
process.nextTick(() => {
smartStream.push(buffer);
smartStream.push(null); // Signal the end of the data
});
return smartStream;
}
static fromObservable(
observable: plugins.smartrx.rxjs.Observable<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
smartStream.observableSubscription = observable.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
// Pause the observable if the stream buffer is full
smartStream.observableSubscription?.unsubscribe();
smartStream.once('drain', () => {
// Resume the observable when the stream buffer is drained
smartStream.observableSubscription?.unsubscribe();
smartStream.observableSubscription = observable.subscribe((data) => {
smartStream.push(data);
});
});
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // Signal the end of the data
},
});
return smartStream;
}
static fromReplaySubject(
replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
options?: DuplexOptions
): SmartDuplex {
const smartStream = new SmartDuplex(options);
let isBackpressured = false;
// Subscribe to the ReplaySubject
const subscription = replaySubject.subscribe({
next: (data) => {
const canPush = smartStream.push(data);
if (!canPush) {
// If push returns false, pause the subscription because of backpressure
isBackpressured = true;
subscription.unsubscribe();
}
},
error: (err) => {
smartStream.emit('error', err);
},
complete: () => {
smartStream.push(null); // End the stream when the ReplaySubject completes
},
});
// Listen for 'drain' event to resume the subscription if it was paused
smartStream.on('drain', () => {
if (isBackpressured) {
isBackpressured = false;
// Resubscribe to the ReplaySubject since we previously paused
smartStream.observableSubscription = replaySubject.subscribe({
next: (data) => {
if (!smartStream.push(data)) {
smartStream.observableSubscription?.unsubscribe();
isBackpressured = true;
}
},
// No need to repeat error and complete handling here because it's already set up above
});
}
});
return smartStream;
}
// INSTANCE
private readFunction?: () => Promise<void>;
private handleBackpressure: boolean;
private writeFunction?: IStreamWriteFunction<TInput, TOutput>;
private finalFunction?: IStreamFinalFunction<TOutput>;
private observableSubscription?: plugins.smartrx.rxjs.Subscription;
constructor(optionsArg?: SmartStreamOptions<TInput, TOutput>) {
super(optionsArg);
this.readFunction = optionsArg?.readFunction;
this.writeFunction = optionsArg?.writeFunction;
this.finalFunction = optionsArg?.finalFunction;
this.handleBackpressure = optionsArg?.handleBackpressure ?? true;
}
public async _read(size: number): Promise<void> {
if (this.readFunction) {
await this.readFunction();
}
}
// Ensure the _write method types the chunk as TInput and encodes TOutput
public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
if (!this.writeFunction) {
return callback(new Error('No stream function provided'));
}
const tools: IStreamTools = {
truncate: () => {
this.push(null);
callback();
},
push: (pushArg: TOutput) => this.push(pushArg),
};
try {
const modifiedChunk = await this.writeFunction(chunk, tools);
if (modifiedChunk) {
if (!this.push(modifiedChunk) && this.handleBackpressure) {
this.pause();
// Listen for 'drain' event to resume
this.once('drain', () => {
this.resume(); // Resume the source of data
});
}
}
callback();
} catch (err) {
callback(err);
}
}
public async _final(callback: (error?: Error | null) => void) {
if (this.finalFunction) {
const tools: IStreamTools = {
truncate: () => callback(),
push: (pipeObject) => this.push(pipeObject),
};
try {
const finalChunk = await this.finalFunction(tools);
if (finalChunk) {
this.push(finalChunk);
}
} catch (err) {
callback(err);
}
} else {
// nothing here
}
this.push(null);
callback();
}
}