feat(smartduplex): improve backpressure handling and web/node stream interoperability
This commit is contained in:
@@ -1,50 +1,10 @@
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
hi+wow
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
data
|
||||
|
||||
56
test/test.backpressure.ts
Normal file
56
test/test.backpressure.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('Backpressure: should apply backpressure across piped streams', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
await tools.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream3.on('finish', () => {
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
152
test/test.nodewebhelpers.ts
Normal file
152
test/test.nodewebhelpers.ts
Normal file
@@ -0,0 +1,152 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as stream from 'stream';
|
||||
import { nodewebhelpers } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createWebReadableStreamFromFile
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: createWebReadableStreamFromFile should read a file', async () => {
|
||||
const webStream = nodewebhelpers.createWebReadableStreamFromFile('./test/assets/readabletext.txt');
|
||||
const reader = webStream.getReader();
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeReadableToWebReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeReadableToWebReadable should convert', async () => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
|
||||
const reader = webReadable.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
chunks.push(value);
|
||||
}
|
||||
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebReadableToNodeReadable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebReadableToNodeReadable should convert', async (tools) => {
|
||||
const data = new Uint8Array([72, 101, 108, 108, 111]); // "Hello"
|
||||
const webReadable = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(data);
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const nodeReadable = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable.on('end', () => {
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('Hello');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertNodeWritableToWebWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertNodeWritableToWebWritable should convert', async () => {
|
||||
const chunks: Buffer[] = [];
|
||||
const nodeWritable = new stream.Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
chunks.push(chunk);
|
||||
callback();
|
||||
},
|
||||
});
|
||||
|
||||
const webWritable = nodewebhelpers.convertNodeWritableToWebWritable(nodeWritable);
|
||||
const writer = webWritable.getWriter();
|
||||
|
||||
await writer.write(new Uint8Array([65, 66, 67])); // "ABC"
|
||||
await writer.close();
|
||||
|
||||
const result = Buffer.concat(chunks).toString();
|
||||
expect(result).toEqual('ABC');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// convertWebWritableToNodeWritable
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: convertWebWritableToNodeWritable should convert', async (tools) => {
|
||||
const chunks: Uint8Array[] = [];
|
||||
|
||||
const webWritable = new WritableStream<Uint8Array>({
|
||||
write(chunk) {
|
||||
chunks.push(chunk);
|
||||
},
|
||||
});
|
||||
|
||||
const nodeWritable = nodewebhelpers.convertWebWritableToNodeWritable(webWritable);
|
||||
|
||||
const done = tools.defer();
|
||||
nodeWritable.write(Buffer.from('Hello'), (err) => {
|
||||
expect(err).toBeFalsy();
|
||||
nodeWritable.end(() => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Round-trip: Node → Web → Node
|
||||
// =============================================
|
||||
|
||||
tap.test('nodewebhelpers: round-trip Node → Web → Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const webReadable = nodewebhelpers.convertNodeReadableToWebReadable(nodeReadable);
|
||||
const nodeReadable2 = nodewebhelpers.convertWebReadableToNodeReadable(webReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
nodeReadable2.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
nodeReadable2.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
379
test/test.smartduplex.ts
Normal file
379
test/test.smartduplex.ts
Normal file
@@ -0,0 +1,379 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import * as smartstream from '../ts/index.js';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js';
|
||||
|
||||
// =============================================
|
||||
// Constructor
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should construct with no options', async () => {
|
||||
const duplex = new SmartDuplex();
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should construct with options', async () => {
|
||||
const duplex = new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
expect(duplex).toBeInstanceOf(SmartDuplex);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromBuffer
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const stream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
stream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
stream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should transform chunks via writeFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.write('world');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: writeFunction returning undefined should not push', async (tools) => {
|
||||
const results: any[] = [];
|
||||
const transform = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
transform.on('data', (chunk: any) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
transform.on('end', () => {
|
||||
expect(results.length).toEqual(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
transform.write('hello');
|
||||
transform.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// tools.push — multiple outputs
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
const splitter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
const words = chunk.split(' ');
|
||||
for (const word of words) {
|
||||
await streamTools.push(word);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
splitter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
splitter.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('beautiful');
|
||||
expect(results).toContain('world');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
splitter.write('hello beautiful world');
|
||||
splitter.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit final chunk via finalFunction', async (tools) => {
|
||||
const results: string[] = [];
|
||||
let count = 0;
|
||||
|
||||
const aggregator = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
count++;
|
||||
return undefined;
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return `total: ${count}`;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
aggregator.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
aggregator.on('end', () => {
|
||||
expect(results.length).toEqual(1);
|
||||
expect(results[0]).toEqual('total: 2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
aggregator.write('a');
|
||||
aggregator.write('b');
|
||||
aggregator.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: finalFunction can push multiple chunks via tools.push', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
finalFunction: async (streamTools) => {
|
||||
await streamTools.push('final1');
|
||||
await streamTools.push('final2');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
stream.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
stream.on('end', () => {
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final1');
|
||||
expect(results).toContain('final2');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('hello');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// truncate
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should truncate stream early', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const limiter = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
if (chunk === 'STOP') {
|
||||
streamTools.truncate();
|
||||
return undefined;
|
||||
}
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
|
||||
limiter.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
limiter.on('end', () => {
|
||||
expect(results).toContain('a');
|
||||
expect(results).toContain('b');
|
||||
expect(results).not.toContain('STOP');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
limiter.write('a');
|
||||
limiter.write('b');
|
||||
// Write STOP on next tick to allow previous writes to flush
|
||||
process.nextTick(() => {
|
||||
limiter.write('STOP');
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Error handling
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should emit error when writeFunction throws', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async () => {
|
||||
throw new Error('write error');
|
||||
},
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('write error');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
tap.test('SmartDuplex: should error when no writeFunction and data is written', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('error', (err) => {
|
||||
expect(err.message).toEqual('No stream function provided');
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
stream.write('test');
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromWebReadableStream
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should create from a Web ReadableStream', async (tools) => {
|
||||
const chunks = ['hello', 'world', 'foo'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const chunk of chunks) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
controller.close();
|
||||
}
|
||||
});
|
||||
|
||||
const duplex = SmartDuplex.fromWebReadableStream(webReadable);
|
||||
const results: string[] = [];
|
||||
|
||||
const done = tools.defer();
|
||||
duplex.on('data', (chunk: string) => results.push(chunk));
|
||||
duplex.on('end', () => {
|
||||
expect(results).toEqual(chunks);
|
||||
done.resolve();
|
||||
});
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// getWebStreams
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should provide web streams via getWebStreams()', async () => {
|
||||
const duplex = new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const { readable, writable } = await duplex.getWebStreams();
|
||||
|
||||
const writer = writable.getWriter();
|
||||
const reader = readable.getReader();
|
||||
|
||||
await writer.write('hello');
|
||||
await writer.write('world');
|
||||
await writer.close();
|
||||
|
||||
const results: string[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
|
||||
expect(results).toContain('HELLO');
|
||||
expect(results).toContain('WORLD');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Debug mode
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: debug mode should not crash', async (tools) => {
|
||||
const stream = new SmartDuplex<string, string>({
|
||||
name: 'DebugStream',
|
||||
debug: true,
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => chunk,
|
||||
});
|
||||
|
||||
const done = tools.defer();
|
||||
stream.on('data', () => {});
|
||||
stream.on('end', () => done.resolve());
|
||||
|
||||
stream.write('test');
|
||||
stream.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// Pipe with file read
|
||||
// =============================================
|
||||
|
||||
tap.test('SmartDuplex: should handle a read stream pipeline', async () => {
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async () => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
// consume data
|
||||
},
|
||||
finalFunction: async (streamTools) => {
|
||||
streamTools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
128
test/test.streamintake.ts
Normal file
128
test/test.streamintake.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamIntake, SmartDuplex } from '../ts/index.js';
|
||||
import * as stream from 'stream';
|
||||
|
||||
// =============================================
|
||||
// Basic StreamIntake
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: should push data and signal end', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
const results: string[] = [];
|
||||
|
||||
intake.pipe(
|
||||
new SmartDuplex<string, string>({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk) => {
|
||||
results.push(chunk);
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 5) {
|
||||
counter++;
|
||||
intake.pushData(`item-${counter}`);
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
// Give streams time to flush
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
tap.test('StreamIntake: should pipe to a writable', async (tools) => {
|
||||
const intake = new StreamIntake<string>();
|
||||
|
||||
intake
|
||||
.pipe(
|
||||
new SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk: string) => {
|
||||
return chunk;
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||
|
||||
const done = tools.defer();
|
||||
let counter = 0;
|
||||
intake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 10) {
|
||||
counter++;
|
||||
intake.pushData('data\n');
|
||||
} else {
|
||||
intake.signalEnd();
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Node Readable)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Node readable', async (tools) => {
|
||||
const nodeReadable = fs.createReadStream('./test/assets/readabletext.txt');
|
||||
const intake = await StreamIntake.fromStream<Buffer>(nodeReadable);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: Buffer) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(chunks.length).toBeGreaterThan(0);
|
||||
const content = Buffer.concat(chunks).toString();
|
||||
expect(content.length).toBeGreaterThan(0);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// StreamIntake.fromStream (Web ReadableStream)
|
||||
// =============================================
|
||||
|
||||
tap.test('StreamIntake: fromStream should wrap a Web ReadableStream', async (tools) => {
|
||||
const data = ['chunk1', 'chunk2', 'chunk3'];
|
||||
const webReadable = new ReadableStream<string>({
|
||||
start(controller) {
|
||||
for (const item of data) {
|
||||
controller.enqueue(item);
|
||||
}
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const intake = await StreamIntake.fromStream<string>(webReadable);
|
||||
|
||||
const results: string[] = [];
|
||||
const done = tools.defer();
|
||||
|
||||
intake.on('data', (chunk: string) => {
|
||||
results.push(chunk);
|
||||
});
|
||||
|
||||
intake.on('end', () => {
|
||||
expect(results).toEqual(data);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
70
test/test.streamwrapper.ts
Normal file
70
test/test.streamwrapper.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
import { StreamWrapper, SmartDuplex } from '../ts/index.js';
|
||||
|
||||
tap.test('StreamWrapper: should pipe read to write', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await wrapper.run();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: should propagate errors', async (tools) => {
|
||||
const failingStream = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async () => {
|
||||
throw new Error('intentional error');
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
failingStream,
|
||||
]);
|
||||
|
||||
let errorCaught = false;
|
||||
try {
|
||||
await wrapper.run();
|
||||
} catch (err) {
|
||||
errorCaught = true;
|
||||
expect(err.message).toEqual('intentional error');
|
||||
}
|
||||
expect(errorCaught).toBeTrue();
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: streamStarted should resolve', async () => {
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
const runPromise = wrapper.run();
|
||||
await wrapper.streamStarted();
|
||||
await runPromise;
|
||||
});
|
||||
|
||||
tap.test('StreamWrapper: onCustomEvent should fire', async (tools) => {
|
||||
const results: string[] = [];
|
||||
|
||||
const emitter = new SmartDuplex<Buffer, Buffer>({
|
||||
writeFunction: async (chunk, streamTools) => {
|
||||
(emitter as any).emit('custom-progress', 'progress');
|
||||
return chunk;
|
||||
},
|
||||
});
|
||||
|
||||
const wrapper = new StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
emitter,
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
|
||||
wrapper.onCustomEvent('custom-progress', () => {
|
||||
results.push('fired');
|
||||
});
|
||||
|
||||
await wrapper.run();
|
||||
expect(results.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,68 +0,0 @@
|
||||
import { tap, expect } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex, type ISmartDuplexOptions, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
tap.test('should run backpressure test', async (toolsArg) => {
|
||||
const done = toolsArg.defer();
|
||||
async function testBackpressure() {
|
||||
const stream1 = new SmartDuplex({
|
||||
name: 'stream1',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 1`);
|
||||
return chunk; // Fast processing
|
||||
},
|
||||
});
|
||||
const stream2 = new SmartDuplex({
|
||||
name: 'stream2',
|
||||
objectMode: true,
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 20)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 2`);
|
||||
await tools.push(chunk);
|
||||
// return chunk, optionally return ;
|
||||
},
|
||||
}); // This stream processes data more slowly
|
||||
const stream3 = new SmartDuplex({
|
||||
objectMode: true,
|
||||
name: 'stream3',
|
||||
writeFunction: async (chunk, tools) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow processing
|
||||
console.log(`processed chunk ${chunk} in stream 3`);
|
||||
},
|
||||
});
|
||||
|
||||
stream1.pipe(stream2).pipe(stream3);
|
||||
|
||||
let backpressured = false;
|
||||
for (let i = 0; i < 200; i++) {
|
||||
const canContinue = stream1.write(`Chunk ${i}`, 'utf8');
|
||||
if (!canContinue) {
|
||||
backpressured = true;
|
||||
console.log(`Backpressure at chunk ${i}`);
|
||||
}
|
||||
}
|
||||
|
||||
stream1.end();
|
||||
|
||||
stream1.on('finish', () => {
|
||||
console.log('Stream 1 finished processing.');
|
||||
});
|
||||
stream2.on('finish', () => {
|
||||
console.log('Stream 2 finished processing.');
|
||||
});
|
||||
stream3.on('finish', () => {
|
||||
console.log('Stream 3 finished processing.');
|
||||
if (!backpressured) {
|
||||
throw new Error('No backpressure was observed.');
|
||||
} else {
|
||||
done.resolve();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
testBackpressure();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,24 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { SmartDuplex } from '../ts/smartstream.classes.smartduplex.js'; // Adjust the import to your file structure
|
||||
import * as smartrx from '@push.rocks/smartrx';
|
||||
import * as fs from 'fs';
|
||||
|
||||
tap.test('should create a SmartStream from a Buffer', async () => {
|
||||
const bufferData = Buffer.from('This is a test buffer');
|
||||
const smartStream = SmartDuplex.fromBuffer(bufferData, {});
|
||||
|
||||
let receivedData = Buffer.alloc(0);
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
smartStream.on('data', (chunk: Buffer) => {
|
||||
receivedData = Buffer.concat([receivedData, chunk]);
|
||||
});
|
||||
|
||||
smartStream.on('end', () => {
|
||||
expect(receivedData.toString()).toEqual(bufferData.toString());
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,65 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as fs from 'fs';
|
||||
|
||||
import * as smartstream from '../ts/index.js';
|
||||
|
||||
let testIntake: smartstream.StreamIntake<string>;
|
||||
|
||||
tap.test('should handle a read stream', async (tools) => {
|
||||
const counter = 0;
|
||||
const streamWrapper = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/readabletext.txt'),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
|
||||
// do something with the stream here
|
||||
const result = chunkStringArg.toString().substr(0, 100);
|
||||
streamTools.push('wow =========== \n');
|
||||
return Buffer.from(result);
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
return Buffer.from('this is the end');
|
||||
},
|
||||
}),
|
||||
new smartstream.SmartDuplex({
|
||||
writeFunction: async (chunkStringArg) => {
|
||||
console.log(chunkStringArg.toString());
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push(null);
|
||||
},
|
||||
})
|
||||
]);
|
||||
await streamWrapper.run();
|
||||
});
|
||||
|
||||
tap.test('should create a valid Intake', async (tools) => {
|
||||
testIntake = new smartstream.StreamIntake<string>();
|
||||
testIntake.pipe(
|
||||
new smartstream.SmartDuplex({
|
||||
objectMode: true,
|
||||
writeFunction: async (chunkStringArg: string, streamTools) => {
|
||||
await tools.delayFor(100);
|
||||
console.log(chunkStringArg);
|
||||
return chunkStringArg;
|
||||
}
|
||||
})
|
||||
)
|
||||
.pipe(fs.createWriteStream('./test/assets/writabletext.txt'));
|
||||
const testFinished = tools.defer();
|
||||
let counter = 0;
|
||||
testIntake.pushNextObservable.subscribe(() => {
|
||||
if (counter < 50) {
|
||||
counter++;
|
||||
testIntake.pushData('hi');
|
||||
testIntake.pushData('+wow');
|
||||
testIntake.pushData('\n');
|
||||
} else {
|
||||
testIntake.signalEnd();
|
||||
testFinished.resolve();
|
||||
}
|
||||
});
|
||||
await testFinished.promise;
|
||||
testIntake.signalEnd();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,15 +0,0 @@
|
||||
import * as fs from 'fs';
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
|
||||
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
|
||||
|
||||
let testSmartstream: smartstream.StreamWrapper;
|
||||
tap.test('should combine a stream', async () => {
|
||||
testSmartstream = new smartstream.StreamWrapper([
|
||||
fs.createReadStream('./test/assets/test.md'),
|
||||
fs.createWriteStream('./test/assets/testCopy.md'),
|
||||
]);
|
||||
await testSmartstream.run();
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
@@ -1,67 +0,0 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import * as webstream from '../ts_web/index.js';
|
||||
|
||||
tap.test('WebDuplexStream fromUInt8Array should read back the same Uint8Array', async () => {
|
||||
const inputUint8Array = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = webstream.WebDuplexStream.fromUInt8Array(inputUint8Array);
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
let readUint8Array = new Uint8Array();
|
||||
|
||||
// Read from the stream
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value) {
|
||||
// Concatenate value to readUint8Array
|
||||
const tempArray = new Uint8Array(readUint8Array.length + value.length);
|
||||
tempArray.set(readUint8Array, 0);
|
||||
tempArray.set(value, readUint8Array.length);
|
||||
readUint8Array = tempArray;
|
||||
}
|
||||
}
|
||||
|
||||
expect(readUint8Array).toEqual(inputUint8Array);
|
||||
});
|
||||
|
||||
tap.test('WebDuplexStream should handle transform with a write function', async () => {
|
||||
const input = [1, 2, 3, 4, 5];
|
||||
const expectedOutput = [2, 4, 6, 8, 10];
|
||||
|
||||
const webDuplexStream = new webstream.WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
// Push the doubled number into the stream
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = webDuplexStream.writable.getWriter();
|
||||
const reader = webDuplexStream.readable.getReader();
|
||||
|
||||
const output: number[] = [];
|
||||
|
||||
// Read from the stream asynchronously
|
||||
const readPromise = (async () => {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (value !== undefined) {
|
||||
output.push(value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
// Write to the stream
|
||||
for (const num of input) {
|
||||
await writer.write(num);
|
||||
}
|
||||
await writer.close();
|
||||
|
||||
// Wait for the reading to complete
|
||||
await readPromise;
|
||||
|
||||
// Assert that the output matches the expected transformed data
|
||||
expect(output).toEqual(expectedOutput);
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
51
test/test.utilities.ts
Normal file
51
test/test.utilities.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { createTransformFunction, createPassThrough, SmartDuplex, StreamWrapper } from '../ts/index.js';
|
||||
|
||||
// =============================================
|
||||
// createTransformFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('createTransformFunction: should create a transform stream', async (tools) => {
|
||||
const doubler = createTransformFunction<number, number>(async (n) => n * 2, { objectMode: true });
|
||||
const results: number[] = [];
|
||||
|
||||
doubler.on('data', (chunk: number) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
doubler.on('end', () => {
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
expect(results).toContain(30);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
doubler.write(5);
|
||||
doubler.write(10);
|
||||
doubler.write(15);
|
||||
doubler.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// createPassThrough
|
||||
// =============================================
|
||||
|
||||
tap.test('createPassThrough: should pass data through unchanged', async (tools) => {
|
||||
const passThrough = createPassThrough();
|
||||
const results: string[] = [];
|
||||
|
||||
passThrough.on('data', (chunk: string) => results.push(chunk));
|
||||
|
||||
const done = tools.defer();
|
||||
passThrough.on('end', () => {
|
||||
expect(results).toEqual(['hello', 'world']);
|
||||
done.resolve();
|
||||
});
|
||||
|
||||
passThrough.write('hello');
|
||||
passThrough.write('world');
|
||||
passThrough.end();
|
||||
await done.promise;
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
144
test/test.webduplexstream.both.ts
Normal file
144
test/test.webduplexstream.both.ts
Normal file
@@ -0,0 +1,144 @@
|
||||
import { expect, tap } from '@push.rocks/tapbundle';
|
||||
import { WebDuplexStream } from '../ts_web/index.js';
|
||||
|
||||
// Helper: collect all chunks from a readable
|
||||
async function collectAll<T>(reader: ReadableStreamDefaultReader<T>): Promise<T[]> {
|
||||
const results: T[] = [];
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
results.push(value);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// =============================================
|
||||
// Basic transform
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should transform chunks via writeFunction', async () => {
|
||||
const stream = new WebDuplexStream<number, number>({
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk * 2);
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
// Read and write concurrently to avoid backpressure deadlock
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write(5);
|
||||
await writer.write(10);
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain(10);
|
||||
expect(results).toContain(20);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// writeFunction return value
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: should enqueue returned non-null values', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk.toUpperCase();
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('HELLO');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// fromUInt8Array
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: fromUInt8Array should produce data', async () => {
|
||||
const data = new Uint8Array([1, 2, 3, 4, 5]);
|
||||
const stream = WebDuplexStream.fromUInt8Array(data);
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const { value } = await reader.read();
|
||||
expect(value).toBeTruthy();
|
||||
expect(value.length).toEqual(5);
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// readFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: readFunction should supply data to the stream', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
readFunction: async (tools) => {
|
||||
await tools.write('chunk1');
|
||||
await tools.write('chunk2');
|
||||
tools.done();
|
||||
},
|
||||
writeFunction: async (chunk, { push }) => {
|
||||
push(chunk.toUpperCase());
|
||||
},
|
||||
});
|
||||
|
||||
const reader = stream.readable.getReader();
|
||||
const results = await collectAll(reader);
|
||||
|
||||
expect(results).toContain('CHUNK1');
|
||||
expect(results).toContain('CHUNK2');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// finalFunction
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: finalFunction should emit final data', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({
|
||||
writeFunction: async (chunk) => {
|
||||
return chunk;
|
||||
},
|
||||
finalFunction: async (tools) => {
|
||||
tools.push('final');
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('hello');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results).toContain('hello');
|
||||
expect(results).toContain('final');
|
||||
});
|
||||
|
||||
// =============================================
|
||||
// No writeFunction = passthrough
|
||||
// =============================================
|
||||
|
||||
tap.test('WebDuplexStream: no writeFunction should passthrough', async () => {
|
||||
const stream = new WebDuplexStream<string, string>({});
|
||||
|
||||
const writer = stream.writable.getWriter();
|
||||
const reader = stream.readable.getReader();
|
||||
|
||||
const readPromise = collectAll(reader);
|
||||
await writer.write('pass');
|
||||
await writer.close();
|
||||
const results = await readPromise;
|
||||
|
||||
expect(results[0]).toEqual('pass');
|
||||
});
|
||||
|
||||
export default tap.start();
|
||||
Reference in New Issue
Block a user