fix(core): update

This commit is contained in:
2019-02-19 01:10:56 +01:00
parent b036bfcb92
commit 65a97c9ee0
15 changed files with 1895 additions and 702 deletions

View File

@ -1,105 +1,107 @@
import * as plugins from './smartstream.plugins'
import * as plugins from './smartstream.plugins';
// interfaces
import { Transform } from 'stream'
import { Transform } from 'stream';
export interface IErrorFunction {
(err): any
(err): any;
}
export interface ICustomEventFunction {
(): any
(): any;
}
export interface ICustomEventObject {
eventName: string
eventFunction: ICustomEventFunction
eventName: string;
eventFunction: ICustomEventFunction;
}
/**
* class Smartstream handles
* class Smartstream handles
*/
export class Smartstream {
private streamArray = []
private customEventObjectArray: ICustomEventObject[] = []
private streamStartedDeferred = plugins.q.defer()
private streamArray = [];
private customEventObjectArray: ICustomEventObject[] = [];
private streamStartedDeferred = plugins.smartpromise.defer();
/**
* constructor
*/
constructor(streamArrayArg: any[]) {
this.streamArray = streamArrayArg
this.streamArray = streamArrayArg;
}
/**
* make something with the stream itself
*/
streamStarted (): Promise<any> {
return this.streamStartedDeferred.promise
streamStarted(): Promise<any> {
return this.streamStartedDeferred.promise;
}
/**
* attach listener to custom event
*/
onCustomEvent (eventNameArg: string, eventFunctionArg: ICustomEventFunction) {
onCustomEvent(eventNameArg: string, eventFunctionArg: ICustomEventFunction) {
this.customEventObjectArray.push({
eventName: eventNameArg,
eventFunction: eventFunctionArg
})
});
}
/**
* run the stream
* @returns Promise
*/
run (): Promise<void> {
let done = plugins.q.defer<void>()
run(): Promise<void> {
const done = plugins.smartpromise.defer<void>();
// clone Array
let streamExecutionArray = []
for (let streamItem of this.streamArray) { streamExecutionArray.push(streamItem) }
// combine the stream
let finalStream = null
let firstIteration: boolean = true
for (let stream of streamExecutionArray) {
if (firstIteration === true) {
finalStream = stream
}
stream.on('error', (err) => {
done.reject(err)
})
for (let customEventObject of this.customEventObjectArray) {
stream.on(customEventObject.eventName, customEventObject.eventFunction)
}
if (!firstIteration) {
finalStream = finalStream.pipe(stream)
}
firstIteration = false
const streamExecutionArray = [];
for (const streamItem of this.streamArray) {
streamExecutionArray.push(streamItem);
}
this.streamStartedDeferred.resolve()
// combine the stream
let finalStream = null;
let firstIteration: boolean = true;
for (const stream of streamExecutionArray) {
if (firstIteration === true) {
finalStream = stream;
}
stream.on('error', err => {
done.reject(err);
});
for (const customEventObject of this.customEventObjectArray) {
stream.on(customEventObject.eventName, customEventObject.eventFunction);
}
if (!firstIteration) {
finalStream = finalStream.pipe(stream);
}
firstIteration = false;
}
finalStream.on('end', function () {
done.resolve()
})
finalStream.on('close', function () {
done.resolve()
})
finalStream.on('finish', function () {
done.resolve()
})
return done.promise
this.streamStartedDeferred.resolve();
finalStream.on('end', () => {
done.resolve();
});
finalStream.on('close', () => {
done.resolve();
});
finalStream.on('finish', () => {
done.resolve();
});
return done.promise;
}
}
export let cleanPipe = () => {
return plugins.through2.obj(
(file, enc, cb) => {
cb()
cb();
},
(cb) => {
cb()
cb => {
cb();
}
)
}
);
};

View File

@ -1,3 +1,7 @@
import 'typings-global'
export import q = require('smartq')
export import through2 = require('through2')
import * as smartpromise from '@pushrocks/smartpromise';
import * as through2 from 'through2';
export {
smartpromise,
through2
};