import * as plugins from '../plugins.js';
import { Bzip2 } from './bzip2.js';
import { bitIterator } from './bititerator.js';

/**
 * Creates a duplex stream that decompresses bzip2 data.
 *
 * Compressed chunks written to the stream are queued and consumed through a
 * bit reader. Each successfully decoded block is pushed downstream as one
 * `Buffer`.
 *
 * Failure reporting:
 * - An exception thrown while decoding a header or block is emitted as an
 *   `'error'` event on the returned stream; decoding then stops for good.
 * - If the input ends while a bzip2 stream is still open (the running CRC has
 *   not been reset to `null`), an `'error'` event carrying
 *   `Error('input stream ended prematurely')` is emitted.
 *
 * @returns {object} A `plugins.smartstream.SmartDuplex` instance running in
 *   object mode.
 */
export function unbzip2Stream() {
  const bzip2Instance = new Bzip2();
  // Compressed chunks that the bit reader has not pulled yet.
  const bufferQueue = [];
  // Total number of compressed bytes received so far.
  let hasBytes = 0;
  // Value returned by `header()`; 0 means "a header must be read next".
  let blockSize = 0;
  // Set once decoding has thrown; nothing is decoded afterwards.
  let broken = false;
  let bitReader = null;
  // Running CRC handed to / returned by `decompress()`; `null` while no
  // bzip2 stream is open.
  let streamCRC = null;

  /**
   * Performs one decoding step: reads a header when none is active,
   * otherwise decodes one block.
   *
   * @returns {Buffer|undefined} The decoded block, or `undefined` when the
   *   step produced no output (header read, or `decompress()` returned `null`).
   */
  function decompressBlock() {
    if (!blockSize) {
      blockSize = bzip2Instance.header(bitReader);
      streamCRC = 0;
      return undefined;
    }

    const bufsize = 100000 * blockSize;
    const buf = new Int32Array(bufsize);
    const chunk = [];
    const collectByte = (b) => {
      chunk.push(b);
    };

    streamCRC = bzip2Instance.decompress(bitReader, collectByte, buf, bufsize, streamCRC);
    if (streamCRC === null) {
      // reset for next bzip2 header
      blockSize = 0;
      return undefined;
    }
    return Buffer.from(chunk);
  }

  /**
   * Runs `decompressBlock()` and converts a thrown exception into an
   * `'error'` event on the stream instead of letting the stream finish
   * silently with truncated output.
   *
   * @returns {Buffer|undefined} Decoded data, if this step produced any.
   */
  function tryDecompressBlock() {
    if (broken) {
      return undefined;
    }
    try {
      return decompressBlock();
    } catch (err) {
      // Flag first so the state is consistent even if emitting throws
      // because no 'error' listener is attached.
      broken = true;
      duplex.emit('error', err);
      return undefined;
    }
  }

  /**
   * Number of buffered bytes required before a decoding step is attempted
   * while more input may still arrive.
   *
   * The `|| 4` fallback only takes effect if the sum is falsy (e.g. `NaN`);
   * it is kept to preserve the original expression exactly.
   *
   * @returns {number}
   */
  function minBufferedBytes() {
    return (25000 + 100000 * blockSize) || 4;
  }

  // The stream is referenced through this binding (not `this`) inside the
  // callbacks, so error emission does not depend on how SmartDuplex binds
  // `this` when it invokes them.
  const duplex = new plugins.smartstream.SmartDuplex({
    objectMode: true,
    name: 'bzip2',
    debug: false,
    highWaterMark: 1,
    async writeFunction(data, streamTools) {
      if (broken) {
        // Decoding has already failed; nothing will ever drain the queue,
        // so do not keep buffering input.
        return;
      }

      bufferQueue.push(data);
      hasBytes += data.length;
      if (bitReader === null) {
        bitReader = bitIterator(() => bufferQueue.shift());
      }

      while (!broken && hasBytes - bitReader.bytesRead + 1 >= minBufferedBytes()) {
        const result = tryDecompressBlock();
        if (!result) {
          continue;
        }
        await streamTools.push(result);
      }
    },
    async finalFunction(streamTools) {
      // No more input will arrive: drain whatever is still buffered.
      while (!broken && bitReader && hasBytes > bitReader.bytesRead) {
        const result = tryDecompressBlock();
        if (!result) {
          continue;
        }
        await streamTools.push(result);
      }

      // A non-null CRC here means a header was read but `decompress()` never
      // returned `null` afterwards, i.e. the input was cut off mid-stream.
      if (!broken && streamCRC !== null) {
        duplex.emit('error', new Error('input stream ended prematurely'));
      }
    },
  });

  return duplex;
}