65 lines
1.5 KiB
TypeScript
65 lines
1.5 KiB
TypeScript
export interface IDuplexStream {
|
|
read(): any;
|
|
write(chunk: any, callback?: (error?: Error | null) => void): boolean;
|
|
on(event: string, listener: (...args: any[]) => void): this;
|
|
once(event: string, listener: (...args: any[]) => void): this;
|
|
end(callback?: () => void): void;
|
|
destroy(error?: Error): void;
|
|
}
|
|
|
|
export interface IReadableStreamOptions {
|
|
highWaterMark?: number;
|
|
}
|
|
|
|
export interface IWritableStreamOptions {
|
|
highWaterMark?: number;
|
|
}
|
|
|
|
export function convertDuplexToWebStream(duplex: IDuplexStream): { readable: ReadableStream, writable: WritableStream } {
|
|
const readable = new ReadableStream({
|
|
start(controller) {
|
|
duplex.on('readable', () => {
|
|
let chunk;
|
|
while (null !== (chunk = duplex.read())) {
|
|
controller.enqueue(chunk);
|
|
}
|
|
});
|
|
|
|
duplex.on('end', () => {
|
|
controller.close();
|
|
});
|
|
},
|
|
cancel(reason) {
|
|
duplex.destroy(new Error(reason));
|
|
}
|
|
});
|
|
|
|
const writable = new WritableStream({
|
|
write(chunk) {
|
|
return new Promise<void>((resolve, reject) => {
|
|
const isBackpressured = !duplex.write(chunk, (error) => {
|
|
if (error) {
|
|
reject(error);
|
|
} else {
|
|
resolve();
|
|
}
|
|
});
|
|
|
|
if (isBackpressured) {
|
|
duplex.once('drain', resolve);
|
|
}
|
|
});
|
|
},
|
|
close() {
|
|
return new Promise<void>((resolve, reject) => {
|
|
duplex.end(resolve);
|
|
});
|
|
},
|
|
abort(reason) {
|
|
duplex.destroy(new Error(reason));
|
|
}
|
|
});
|
|
|
|
return { readable, writable };
|
|
}
|