// smartstream/ts/smartstream.classes.smartduplex.ts
// (exported from a repository web view: 206 lines, 6.1 KiB, TypeScript)

2023-11-01 13:16:58 +00:00
import * as plugins from './smartstream.plugins.js';
import { Duplex, type DuplexOptions } from 'stream';
2023-11-03 20:32:24 +00:00
/**
 * Helpers handed to the user-supplied write and final functions of a SmartDuplex.
 */
export interface IStreamTools {
  /**
   * Finishes the current operation early. When called from a write function
   * the stream also signals end-of-data on its readable side.
   */
  truncate: () => void;
  /**
   * Pushes an additional object/chunk to the readable side of the stream.
   */
  push: (pipeObject: any) => void;
}
2023-11-01 13:16:58 +00:00
2023-11-07 20:46:46 +00:00
/**
 * Async handler invoked for every chunk written to a SmartDuplex.
 *
 * @typeParam T - type of the chunk that was written to the stream
 * @typeParam rT - type of the value the handler resolves with
 */
export interface IStreamWriteFunction<T, rT> {
  /**
   * @param chunkArg - the chunk that was written
   * @param toolsArg - helpers to push extra output or to truncate the stream
   * @returns a promise for the transformed chunk
   */
  (chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
}
2023-11-01 13:16:58 +00:00
2023-11-07 20:46:46 +00:00
/**
 * Async handler invoked once when the writable side of a SmartDuplex is finishing.
 *
 * @typeParam rT - type of the value the handler resolves with
 */
export interface IStreamFinalFunction<rT> {
  /**
   * @param toolsArg - helpers to push extra output or to finish early
   * @returns a promise for an optional last chunk
   */
  (toolsArg: IStreamTools): Promise<rT>;
}
2023-11-01 13:16:58 +00:00
2023-11-11 19:30:42 +00:00
/**
 * Constructor options of SmartDuplex: the regular Node.js `DuplexOptions`
 * plus the user-supplied async handlers.
 *
 * @typeParam TInput - type of the chunks written to the stream
 * @typeParam TOutput - type of the chunks the stream emits
 */
export interface ISmartDuplexOptions<TInput, TOutput> extends DuplexOptions {
  /**
   * Whether the stream tracks readable-side backpressure.
   * Treated as `true` when omitted.
   */
  handleBackpressure?: boolean;
  /**
   * Awaited whenever the readable side asks for data.
   */
  readFunction?: () => Promise<void>;
  /**
   * Called for every written chunk; a truthy resolved value is pushed to the
   * readable side.
   */
  writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  /**
   * Called when the writable side is finishing; a truthy resolved value is
   * pushed before the end of the readable side is signalled.
   */
  finalFunction?: IStreamFinalFunction<TOutput>;
}
2023-11-01 13:16:58 +00:00
2023-11-03 20:32:24 +00:00
/**
 * A Duplex stream driven by user-supplied async handlers.
 *
 * Chunks written to the stream are passed to `writeFunction`; whatever it
 * resolves with (when truthy) is pushed to the readable side. `finalFunction`
 * runs once when the writable side finishes, and `readFunction` is awaited
 * whenever the readable side requests data.
 *
 * Backpressure: when `handleBackpressure` is enabled (the default) and a push
 * reports a full readable buffer, the completion of the current write is
 * postponed until the consumer asks for more data (i.e. until `_read` is
 * called). This keeps the writable side from outrunning the reader.
 *
 * @typeParam TInput - type of the chunks written to the stream
 * @typeParam TOutput - type of the chunks emitted by the stream
 */
export class SmartDuplex<TInput = any, TOutput = any> extends Duplex {
  // STATIC

  /**
   * Creates a stream that emits the given buffer and then ends.
   * The data is pushed on the next tick so callers can attach listeners first.
   */
  static fromBuffer(buffer: Buffer, options?: ISmartDuplexOptions<any, any>): SmartDuplex {
    const smartDuplex = new SmartDuplex(options);
    process.nextTick(() => {
      smartDuplex.push(buffer);
      smartDuplex.push(null); // Signal the end of the data
    });
    return smartDuplex;
  }

  /**
   * Creates a stream that emits every value of the observable and ends when
   * the observable completes.
   *
   * The observable is subscribed exactly once. Observables cannot be paused,
   * so values arriving while the consumer is slow are kept in the readable
   * buffer instead of unsubscribing and resubscribing (which would restart
   * cold observables, replay subjects a second time, and lose the error and
   * completion notifications).
   *
   * An observable error destroys the stream with that error; destroying or
   * closing the stream unsubscribes from the observable.
   */
  static fromObservable(
    observable: plugins.smartrx.rxjs.Observable<any>,
    options?: ISmartDuplexOptions<any, any>
  ): SmartDuplex {
    const smartStream = new SmartDuplex(options);
    // Registered before subscribing; 'close' is emitted asynchronously, so the
    // subscription is already stored when this listener runs.
    smartStream.once('close', () => {
      smartStream.observableSubscription?.unsubscribe();
    });
    smartStream.observableSubscription = observable.subscribe({
      next: (data: any) => {
        if (!smartStream.destroyed) {
          smartStream.push(data);
        }
      },
      error: (err: unknown) => {
        smartStream.destroy(SmartDuplex.toError(err));
      },
      complete: () => {
        smartStream.push(null); // Signal the end of the data
      },
    });
    return smartStream;
  }

  /**
   * Creates a stream from a ReplaySubject. A ReplaySubject is an Observable,
   * so this shares the single-subscription behaviour of `fromObservable`;
   * buffered values are replayed into the stream exactly once.
   */
  static fromReplaySubject(
    replaySubject: plugins.smartrx.rxjs.ReplaySubject<any>,
    options?: DuplexOptions
  ): SmartDuplex {
    return SmartDuplex.fromObservable(replaySubject, options);
  }

  /** Normalizes an arbitrary thrown value into an Error instance. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }

  /**
   * Wraps a stream callback so it is invoked at most once. Node.js treats a
   * second invocation of a write/final callback as an error.
   */
  private static callOnce(
    callback: (error?: Error | null) => void
  ): (error?: Error | null) => void {
    let called = false;
    return (error?: Error | null) => {
      if (called) {
        return;
      }
      called = true;
      callback(error);
    };
  }

  // INSTANCE
  private readFunction?: () => Promise<void>;
  private handleBackpressure: boolean;
  private writeFunction?: IStreamWriteFunction<TInput, TOutput>;
  private finalFunction?: IStreamFinalFunction<TOutput>;
  private observableSubscription?: plugins.smartrx.rxjs.Subscription;

  /** Completion of a write that is on hold until the reader wants more data. */
  private pendingWriteCallback: ((error?: Error | null) => void) | null = null;

  /** Set once a write function truncated the stream; later output is dropped. */
  private outputTruncated = false;

  /** `false` while the last push reported a full readable buffer. */
  public notBackpressured = true;

  constructor(optionsArg?: ISmartDuplexOptions<TInput, TOutput>) {
    super(optionsArg);
    this.readFunction = optionsArg?.readFunction;
    this.writeFunction = optionsArg?.writeFunction;
    this.finalFunction = optionsArg?.finalFunction;
    this.handleBackpressure = optionsArg?.handleBackpressure ?? true;
  }

  /** `true` while the readable buffer is full and the reader has not asked for more. */
  public get backpressured(): boolean {
    return !this.notBackpressured;
  }

  /**
   * Called by Node.js when the consumer wants more data. That is the signal
   * that the readable buffer has room again, so the backpressure flag is
   * cleared and a write that was put on hold is completed. Afterwards the
   * optional `readFunction` is awaited; if it rejects, the stream is
   * destroyed with that error instead of leaving an unhandled rejection.
   */
  public async _read(size: number): Promise<void> {
    this.notBackpressured = true;
    const resumeWrite = this.pendingWriteCallback;
    if (resumeWrite) {
      this.pendingWriteCallback = null;
      resumeWrite();
    }
    if (!this.readFunction) {
      return;
    }
    try {
      await this.readFunction();
    } catch (err) {
      this.destroy(SmartDuplex.toError(err));
    }
  }

  /**
   * Pushes a chunk to the readable side and records backpressure.
   *
   * @returns the result of the underlying push: `false` when the readable
   * buffer is full (or the stream has ended)
   */
  public push(chunkArg?: TOutput | null): boolean {
    const result = super.push(chunkArg);
    // push(null) always reports false because the stream ended; that is not
    // backpressure and must not put later writes on hold.
    if (!result && chunkArg !== null && this.handleBackpressure) {
      this.notBackpressured = false;
    }
    return result;
  }

  /**
   * Handles one written chunk: runs `writeFunction` and pushes its truthy
   * result to the readable side.
   *
   * The write callback is guaranteed to be called exactly once: immediately,
   * or - when the readable buffer is full and backpressure handling is
   * enabled - as soon as the consumer requests more data.
   *
   * @returns the value of `notBackpressured` after the chunk was handled
   */
  public async _write(chunk: TInput, encoding: string, callback: (error?: Error | null) => void) {
    if (!this.writeFunction) {
      return callback(new Error('No stream function provided'));
    }
    const done = SmartDuplex.callOnce(callback);
    const tools: IStreamTools = {
      truncate: () => {
        this.outputTruncated = true;
        this.push(null);
        done();
      },
      push: (pushArg: TOutput) => this.push(pushArg),
    };
    try {
      const modifiedChunk = await this.writeFunction(chunk, tools);
      // Falsy results mean "nothing to emit". Nothing may be pushed once the
      // readable side was ended through truncate().
      if (modifiedChunk && !this.outputTruncated) {
        this.push(modifiedChunk);
      }
      if (this.backpressured && this.handleBackpressure && !this.outputTruncated) {
        // Hold the write until _read() reports that the reader drained the buffer.
        this.pendingWriteCallback = done;
      } else {
        done();
      }
    } catch (err) {
      done(SmartDuplex.toError(err));
    }
    return this.notBackpressured;
  }

  /**
   * Runs when the writable side is finishing: awaits the optional
   * `finalFunction`, pushes its truthy result, then signals end-of-data on
   * the readable side. If `finalFunction` rejects, the error is reported
   * through the callback and nothing further is pushed.
   */
  public async _final(callback: (error?: Error | null) => void) {
    const done = SmartDuplex.callOnce(callback);
    if (this.finalFunction) {
      const tools: IStreamTools = {
        truncate: () => done(),
        push: (pipeObject) => this.push(pipeObject),
      };
      try {
        const finalChunk = await this.finalFunction(tools);
        if (finalChunk && !this.outputTruncated) {
          this.push(finalChunk);
        }
      } catch (err) {
        done(SmartDuplex.toError(err));
        return;
      }
    }
    this.push(null);
    done();
  }
}