diff --git a/package.json b/package.json index 0b5723d..9504a03 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ "@push.rocks/smartpromise": "^4.0.3", "@push.rocks/smartrequest": "^2.0.21", "@push.rocks/smartrx": "^3.0.7", - "@push.rocks/smartstream": "^3.0.26", + "@push.rocks/smartstream": "^3.0.27", "@push.rocks/smartunique": "^3.0.6", "@push.rocks/smarturl": "^3.0.7", "@types/tar-stream": "^3.1.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3f8e130..a83a2bb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21,8 +21,8 @@ dependencies: specifier: ^3.0.7 version: 3.0.7 '@push.rocks/smartstream': - specifier: ^3.0.26 - version: 3.0.26 + specifier: ^3.0.27 + version: 3.0.27 '@push.rocks/smartunique': specifier: ^3.0.6 version: 3.0.6 @@ -747,7 +747,7 @@ packages: '@push.rocks/smartpath': 5.0.11 '@push.rocks/smartpromise': 4.0.3 '@push.rocks/smartrequest': 2.0.21 - '@push.rocks/smartstream': 3.0.26 + '@push.rocks/smartstream': 3.0.27 '@types/fs-extra': 11.0.3 '@types/glob': 8.1.0 '@types/js-yaml': 4.0.8 @@ -964,8 +964,8 @@ packages: through2: 4.0.2 dev: true - /@push.rocks/smartstream@3.0.26: - resolution: {integrity: sha512-SJXrKnOMQFFL4a/K/b6z+yR05klWZ2KjuS5HqL/HD0kU5Xi/+bfuKyuNdOV+/BWBRwK03HKH4qnf1kbqPleZXA==} + /@push.rocks/smartstream@3.0.27: + resolution: {integrity: sha512-ZIYYS/dVQab+BP2LIP4u4uUn1oqvRvb/vPUabirhyBJwF8VBYC3ssDp9P+oYcuZ3i+Od0G4g1GQuhtFWNv6hgw==} dependencies: '@push.rocks/lik': 6.0.12 '@push.rocks/smartpromise': 4.0.3 diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index f2b27cb..cd329f0 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartarchive', - version: '4.0.14', + version: '4.0.15', description: 'work with archives' } diff --git a/ts/bzip2/index.ts b/ts/bzip2/index.ts index 72bfc3c..156db90 100644 --- a/ts/bzip2/index.ts +++ b/ts/bzip2/index.ts @@ -49,13 +49,17 @@ export function unbzip2Stream() { } catch (e) { console.error(e); broken = true; - return false; } }; - + let counter = 0; return new plugins.smartstream.SmartDuplex({ + objectMode: true, + // writableObjectMode: true, + name: 'bzip2', + debug: true, + highWaterMark: 1, writeFunction: async function (data, streamTools) { - //console.error('received', data.length,'bytes in', typeof data); + console.log(`got chunk ${counter++}`) bufferQueue.push(data); hasBytes += data.length; if (bitReader === null) { @@ -66,6 +70,10 @@ export function unbzip2Stream() { while (!broken && hasBytes - bitReader.bytesRead + 1 >= (25000 + 100000 * blockSize || 4)) { //console.error('decompressing with', hasBytes - bitReader.bytesRead + 1, 'bytes in buffer'); const result = await decompressAndPush(); + if (!result) { + continue; + } + // console.log(result.toString()); await streamTools.push(result); } }, @@ -73,11 +81,10 @@ export function unbzip2Stream() { //console.error(x,'last compressing with', hasBytes, 'bytes in buffer'); while (!broken && bitReader && hasBytes > bitReader.bytesRead) { const result = await decompressAndPush(); - streamTools.push(result); + await streamTools.push(result); } if (!broken) { if (streamCRC !== null) this.emit('error', new Error('input stream ended prematurely')); - this.queue(null); } }, }); diff --git a/ts/classes.archiveanalyzer.ts b/ts/classes.archiveanalyzer.ts index 158f678..61a565b 100644 --- a/ts/classes.archiveanalyzer.ts +++ b/ts/classes.archiveanalyzer.ts @@ -42,13 +42,13 @@ export class ArchiveAnalyzer { return this.smartArchiveRef.tarTools.getDecompressionStream(); // replace with your own tar decompression stream default: // Handle unsupported formats or no decompression needed - return new plugins.smartstream.PassThrough(); + return plugins.smartstream.createPassThrough(); } } public getAnalyzedStream() { let firstRun = true; - const resultStream = new plugins.smartstream.PassThrough(); + const resultStream = plugins.smartstream.createPassThrough(); const analyzerstream = new plugins.smartstream.SmartDuplex({ readableObjectMode: true, writeFunction: async (chunkArg: Buffer, streamtools) => { @@ -63,8 +63,7 @@ export class ArchiveAnalyzer { resultStream, decompressionStream, }; - streamtools.push(result); - streamtools.push(null); + await streamtools.push(result); return null; } },