fix(core): update

This commit is contained in:
Philipp Kunz 2022-06-07 16:16:14 +02:00
parent 2aff46eb0e
commit 05e9067a34
13 changed files with 7731 additions and 8332 deletions

9327
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -21,15 +21,19 @@
},
"homepage": "https://gitlab.com/pushrocks/smartstream#README",
"devDependencies": {
"@gitzone/tsbuild": "^2.1.61",
"@gitzone/tstest": "^1.0.70",
"@gitzone/tsbuild": "^2.1.63",
"@gitzone/tstest": "^1.0.71",
"@pushrocks/smartfile": "^10.0.0",
"@pushrocks/tapbundle": "^5.0.3",
"tslint": "^6.1.3",
"tslint-config-prettier": "^1.18.0"
},
"dependencies": {
"@pushrocks/smartpromise": "^3.1.7",
"@pushrocks/smartrx": "^2.0.25",
"@types/from2": "^2.3.1",
"@types/through2": "^2.0.36",
"from2": "^2.3.0",
"through2": "^4.0.2"
},
"browserslist": [

6210
test/assets/readabletext.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,51 @@
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
hi+wow
noice

View File

@ -0,0 +1,68 @@
import { expect, tap } from '@pushrocks/tapbundle';
import * as smartfile from '@pushrocks/smartfile';
import * as smartstream from '../ts/index.js';
let testIntake: smartstream.StreamIntake<string>;
tap.test('should handle a read stream', async (tools) => {
const counter = 0;
const testSmartstream = new smartstream.StreamWrapper([
smartfile.fsStream.createReadStream('./test/assets/readabletext.txt'),
smartstream.createDuplexStream<Buffer, Buffer>(
async (chunkStringArg: Buffer, streamTools) => {
// do something with the stream here
const result = chunkStringArg.toString().substr(0, 100);
streamTools.pipeMore('wow =========== \n');
return Buffer.from(result);
},
async (tools) => {
// tools.pipeMore('hey, this is the end')
return Buffer.from('this is the end');
},
{ objectMode: false }
),
smartstream.createDuplexStream<Buffer, string>(async (chunkStringArg) => {
console.log(chunkStringArg.toString());
return null;
}),
smartstream.cleanPipe(),
]);
await testSmartstream.run();
});
tap.test('should create a valid Intake', async (tools) => {
testIntake = new smartstream.StreamIntake<string>();
testIntake
.getReadable()
.pipe(
smartstream.createDuplexStream<string, string>(
async (chunkString) => {
await tools.delayFor(100);
console.log(chunkString);
return chunkString;
},
async () => {
return 'noice';
}
)
)
.pipe(smartfile.fsStream.createWriteStream('./test/assets/writabletext.txt'));
const testFinished = tools.defer();
let counter = 0;
testIntake.pushNextObservable.subscribe(() => {
if (counter < 50) {
counter++;
testIntake.pushData('hi');
testIntake.pushData('+wow');
testIntake.pushData('\n');
} else {
testIntake.signalEnd();
testFinished.resolve();
}
});
await testFinished.promise;
testIntake.signalEnd();
});
tap.start();

View File

@ -1,11 +1,11 @@
import fs from 'fs';
import { expect, tap } from '@pushrocks/tapbundle';
import * as smartstream from '../ts/index.js';
import * as smartstream from '../ts/smartstream.classes.streamwrapper.js';
let testSmartstream: smartstream.Smartstream;
let testSmartstream: smartstream.StreamWrapper;
tap.test('should combine a stream', async () => {
testSmartstream = new smartstream.Smartstream([
testSmartstream = new smartstream.StreamWrapper([
fs.createReadStream('./test/assets/test.md'),
fs.createWriteStream('./test/assets/testCopy.md'),
]);

8
ts/00_commitinfo_data.ts Normal file
View File

@ -0,0 +1,8 @@
/**
* autocreated commitinfo by @pushrocks/commitinfo
*/
export const commitinfo = {
name: '@pushrocks/smartstream',
version: '2.0.2',
description: 'simplifies access to node streams'
}

View File

@ -1,107 +1,3 @@
import * as plugins from './smartstream.plugins.js';
// interfaces
import { Transform } from 'stream';
export interface IErrorFunction {
(err: Error): any;
}
export interface ICustomEventFunction {
(): any;
}
export interface ICustomEventObject {
eventName: string;
eventFunction: ICustomEventFunction;
}
/**
* class Smartstream handles
*/
export class Smartstream {
private streamArray: Array<plugins.stream.Duplex> = [];
private customEventObjectArray: ICustomEventObject[] = [];
private streamStartedDeferred = plugins.smartpromise.defer();
/**
* constructor
*/
constructor(streamArrayArg: any[]) {
this.streamArray = streamArrayArg;
}
/**
* make something with the stream itself
*/
streamStarted(): Promise<any> {
return this.streamStartedDeferred.promise;
}
/**
* attach listener to custom event
*/
onCustomEvent(eventNameArg: string, eventFunctionArg: ICustomEventFunction) {
this.customEventObjectArray.push({
eventName: eventNameArg,
eventFunction: eventFunctionArg,
});
}
/**
* run the stream
* @returns Promise
*/
run(): Promise<void> {
const done = plugins.smartpromise.defer<void>();
// clone Array
const streamExecutionArray: Array<plugins.stream.Duplex> = [];
for (const streamItem of this.streamArray) {
streamExecutionArray.push(streamItem);
}
// combine the stream
let finalStream = null;
let firstIteration: boolean = true;
for (const stream of streamExecutionArray) {
if (firstIteration === true) {
finalStream = stream;
}
stream.on('error', (err) => {
done.reject(err);
});
for (const customEventObject of this.customEventObjectArray) {
stream.on(customEventObject.eventName, customEventObject.eventFunction);
}
if (!firstIteration) {
finalStream = finalStream.pipe(stream);
}
firstIteration = false;
}
this.streamStartedDeferred.resolve();
finalStream.on('end', () => {
done.resolve();
});
finalStream.on('close', () => {
done.resolve();
});
finalStream.on('finish', () => {
done.resolve();
});
return done.promise;
}
}
export let cleanPipe = () => {
return plugins.through2.obj(
(file, enc, cb) => {
cb();
},
(cb) => {
cb();
}
);
};
export * from './smartstream.classes.streamwrapper.js';
export * from './smartstream.classes.streamintake.js';
export * from './smartstream.duplex.js';

View File

@ -0,0 +1,66 @@
import * as plugins from './smartstream.plugins.js';
export class StreamIntake<T> {
private signalEndBoolean = false;
private chunkStore: T[] = [];
public pushNextObservable = new plugins.smartrx.ObservableIntake<any>();
private pushedNextDeferred = plugins.smartpromise.defer();
private readableStream = plugins.from2.obj(async (size, next) => {
// console.log('get next');
// execute without backpressure
while (this.chunkStore.length > 0) {
next(null, this.chunkStore.shift());
}
if (this.signalEndBoolean) {
next(null, null);
}
// lets trigger backpressure handling
this.pushNextObservable.push('please push next');
await this.pushedNextDeferred.promise;
this.pushedNextDeferred = plugins.smartpromise.defer();
// execute with backpressure
while (this.chunkStore.length > 0) {
next(null, this.chunkStore.shift());
}
if (this.signalEndBoolean) {
next(null, null);
}
});
constructor() {
this.pushNextObservable.push('please push next');
}
/**
* returns a new style readble stream
*/
public getReadable() {
const readable = new plugins.stream.Readable({
objectMode: true,
});
return readable.wrap(this.readableStream);
}
/**
* returns an oldstyle readble stream
*/
public getReadableStream() {
return this.readableStream;
}
public pushData(chunkData: T) {
this.chunkStore.push(chunkData);
this.pushedNextDeferred.resolve();
}
public signalEnd() {
this.signalEndBoolean = true;
this.pushedNextDeferred.resolve();
this.pushNextObservable.signalComplete();
}
}

View File

@ -0,0 +1,107 @@
import * as plugins from './smartstream.plugins.js';
// interfaces
import { Transform } from 'stream';
export interface IErrorFunction {
(err: Error): any;
}
export interface ICustomEventFunction {
(): any;
}
export interface ICustomEventObject {
eventName: string;
eventFunction: ICustomEventFunction;
}
/**
* class Smartstream handles
*/
export class StreamWrapper {
private streamArray: Array<plugins.stream.Duplex> = [];
private customEventObjectArray: ICustomEventObject[] = [];
private streamStartedDeferred = plugins.smartpromise.defer();
/**
* constructor
*/
constructor(streamArrayArg: any[]) {
this.streamArray = streamArrayArg;
}
/**
* make something with the stream itself
*/
streamStarted(): Promise<any> {
return this.streamStartedDeferred.promise;
}
/**
* attach listener to custom event
*/
onCustomEvent(eventNameArg: string, eventFunctionArg: ICustomEventFunction) {
this.customEventObjectArray.push({
eventName: eventNameArg,
eventFunction: eventFunctionArg,
});
}
/**
* run the stream
* @returns Promise
*/
run(): Promise<void> {
const done = plugins.smartpromise.defer<void>();
// clone Array
const streamExecutionArray: Array<plugins.stream.Duplex> = [];
for (const streamItem of this.streamArray) {
streamExecutionArray.push(streamItem);
}
// combine the stream
let finalStream = null;
let firstIteration: boolean = true;
for (const stream of streamExecutionArray) {
if (firstIteration === true) {
finalStream = stream;
}
stream.on('error', (err) => {
done.reject(err);
});
for (const customEventObject of this.customEventObjectArray) {
stream.on(customEventObject.eventName, customEventObject.eventFunction);
}
if (!firstIteration) {
finalStream = finalStream.pipe(stream);
}
firstIteration = false;
}
this.streamStartedDeferred.resolve();
finalStream.on('end', () => {
done.resolve();
});
finalStream.on('close', () => {
done.resolve();
});
finalStream.on('finish', () => {
done.resolve();
});
return done.promise;
}
}
export let cleanPipe = () => {
return plugins.through2.obj(
(file, enc, cb) => {
cb();
},
(cb) => {
cb();
}
);
};

83
ts/smartstream.duplex.ts Normal file
View File

@ -0,0 +1,83 @@
import * as plugins from './smartstream.plugins.js';
export interface ITruncateFunc {
(): void;
}
export interface IPipeMoreFunc {
(pipeObject: any): void;
}
export interface IStreamTools {
truncate: ITruncateFunc;
pipeMore: IPipeMoreFunc;
}
export interface IStreamFunction<T, rT> {
(chunkArg: T, toolsArg: IStreamTools): Promise<rT>;
}
export interface IStreamEndFunction<rT> {
(toolsArg: IStreamTools): Promise<rT>;
}
export interface IStreamOptions {
objectMode?: boolean;
readableObjectMode?: boolean;
writableObjectMode?: boolean;
}
export let createDuplexStream = <T, rT>(
funcArg: IStreamFunction<T, rT>,
endFuncArg?: IStreamEndFunction<rT>,
optionsArg: IStreamOptions = {
objectMode: false,
readableObjectMode: true,
writableObjectMode: true,
}
) => {
return plugins.through2(
optionsArg,
function (chunk, enc, cb) {
let truncated = false;
const tools: IStreamTools = {
truncate: () => {
truncated = true;
cb(null, null);
},
pipeMore: (pipeObject) => {
this.push(pipeObject);
},
};
const asyncWrapper = async () => {
const resultChunk: rT = await funcArg(chunk, tools);
if (!truncated) {
cb(null, resultChunk);
}
};
asyncWrapper().catch((err) => {
console.log(err);
});
},
function (cb) {
const tools: IStreamTools = {
truncate: () => {
cb();
},
pipeMore: (pushArg) => {
this.push(pushArg);
},
};
const asyncWrapper = async () => {
if (endFuncArg) {
const result = await endFuncArg(tools);
this.push(result);
}
cb();
};
asyncWrapper().catch((err) => {
console.log(err);
});
}
);
};

View File

@ -1,12 +1,16 @@
// node native
import * as stream from 'stream';
export { stream };
// pushrocks scope
import * as smartpromise from '@pushrocks/smartpromise';
export { smartpromise };
import * as smartrx from '@pushrocks/smartrx';
export { smartpromise, smartrx };
// thirdparty
import * as through2 from 'through2';
import from2 from 'from2';
import through2 from 'through2';
export { through2 };
export { from2, through2 };

9
tsconfig.json Normal file
View File

@ -0,0 +1,9 @@
{
"compilerOptions": {
"experimentalDecorators": true,
"useDefineForClassFields": false,
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "nodenext"
}
}