fix(core): update
This commit is contained in:
parent
703dc11c6c
commit
12bb125bdc
@ -19,15 +19,15 @@ tap.test('should run backpressure test', async (toolsArg) => {
|
|||||||
writeFunction: async (chunk, tools) => {
|
writeFunction: async (chunk, tools) => {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||||
console.log(`processed chunk ${chunk} in stream 2`);
|
console.log(`processed chunk ${chunk} in stream 2`);
|
||||||
// await tools.push(chunk);
|
await tools.push(chunk);
|
||||||
return chunk;
|
// return chunk;
|
||||||
},
|
},
|
||||||
}); // This stream processes data more slowly
|
}); // This stream processes data more slowly
|
||||||
const stream3 = new SmartDuplex({
|
const stream3 = new SmartDuplex({
|
||||||
objectMode: true,
|
objectMode: true,
|
||||||
name: 'stream3',
|
name: 'stream3',
|
||||||
writeFunction: async (chunk, tools) => {
|
writeFunction: async (chunk, tools) => {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 100)); // Slow processing
|
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||||
console.log(`processed chunk ${chunk} in stream 3`);
|
console.log(`processed chunk ${chunk} in stream 3`);
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartstream',
|
name: '@push.rocks/smartstream',
|
||||||
version: '3.0.26',
|
version: '3.0.27',
|
||||||
description: 'simplifies access to node streams'
|
description: 'simplifies access to node streams'
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
export * from './smartstream.classes.passthrough.js';
|
|
||||||
export * from './smartstream.classes.smartduplex.js';
|
export * from './smartstream.classes.smartduplex.js';
|
||||||
export * from './smartstream.classes.streamwrapper.js';
|
export * from './smartstream.classes.streamwrapper.js';
|
||||||
export * from './smartstream.classes.streamintake.js';
|
export * from './smartstream.classes.streamintake.js';
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
import * as plugins from './smartstream.plugins.js';
|
|
||||||
|
|
||||||
export class PassThrough extends plugins.stream.Duplex {
|
|
||||||
constructor(options?: plugins.stream.DuplexOptions) {
|
|
||||||
super(options);
|
|
||||||
}
|
|
||||||
|
|
||||||
_read(size: number): void {
|
|
||||||
// No-op: Data written will be automatically available for reading.
|
|
||||||
}
|
|
||||||
|
|
||||||
_write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
|
|
||||||
if (this.push(chunk, encoding)) {
|
|
||||||
callback();
|
|
||||||
} else {
|
|
||||||
this.once('drain', () => {
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -19,3 +19,12 @@ export function createTransformFunction<TInput, TOutput>(
|
|||||||
|
|
||||||
return smartDuplexStream;
|
return smartDuplexStream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const createPassThrough = () => {
|
||||||
|
return new SmartDuplex({
|
||||||
|
objectMode: true,
|
||||||
|
writeFunction: async (chunkArg, toolsArg) => {
|
||||||
|
return chunkArg;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user