smartstream/test/test.streamfunction.ts

66 lines
1.9 KiB
TypeScript
Raw Permalink Normal View History

2023-07-12 09:27:46 +00:00
import { expect, tap } from '@push.rocks/tapbundle';
import * as smartfile from '@push.rocks/smartfile';
2022-06-07 14:16:14 +00:00
import * as smartstream from '../ts/index.js';
let testIntake: smartstream.StreamIntake<string>;
tap.test('should handle a read stream', async (tools) => {
const counter = 0;
2023-11-03 20:32:24 +00:00
const streamWrapper = new smartstream.StreamWrapper([
2022-06-07 14:16:14 +00:00
smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
2023-11-03 20:32:24 +00:00
new smartstream.SmartDuplex({
2023-11-07 20:46:46 +00:00
writeFunction: async (chunkStringArg: Buffer, streamTools) => {
2022-06-07 14:16:14 +00:00
// do something with the stream here
const result = chunkStringArg.toString().substr(0, 100);
2023-11-03 20:32:24 +00:00
streamTools.push('wow =========== \n');
2022-06-07 14:16:14 +00:00
return Buffer.from(result);
},
2023-11-03 21:26:15 +00:00
finalFunction: async (tools) => {
2022-06-07 14:16:14 +00:00
return Buffer.from('this is the end');
},
2023-11-03 20:32:24 +00:00
}),
new smartstream.SmartDuplex({
2023-11-07 20:46:46 +00:00
writeFunction: async (chunkStringArg) => {
2023-11-03 20:32:24 +00:00
console.log(chunkStringArg.toString());
},
2023-11-03 21:26:15 +00:00
finalFunction: async (tools) => {
2023-11-03 20:32:24 +00:00
tools.push(null);
},
2023-11-03 20:36:10 +00:00
})
2022-06-07 14:16:14 +00:00
]);
2023-11-03 20:32:24 +00:00
// await streamWrapper.run();
2022-06-07 14:16:14 +00:00
});
tap.test('should create a valid Intake', async (tools) => {
testIntake = new smartstream.StreamIntake<string>();
2023-11-03 12:55:56 +00:00
testIntake.pipe(
2023-11-03 20:32:24 +00:00
new smartstream.SmartDuplex({
objectMode: true,
2023-11-07 20:46:46 +00:00
writeFunction: async (chunkStringArg: string, streamTools) => {
2022-06-07 14:16:14 +00:00
await tools.delayFor(100);
2023-11-03 20:32:24 +00:00
console.log(chunkStringArg);
return chunkStringArg;
2022-06-07 14:16:14 +00:00
}
2023-11-03 20:32:24 +00:00
})
2022-06-07 14:16:14 +00:00
)
.pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'));
const testFinished = tools.defer();
let counter = 0;
testIntake.pushNextObservable.subscribe(() => {
if (counter < 50) {
counter++;
testIntake.pushData('hi');
testIntake.pushData('+wow');
testIntake.pushData('\n');
} else {
testIntake.signalEnd();
testFinished.resolve();
}
});
await testFinished.promise;
testIntake.signalEnd();
});
tap.start();