import * as plugins from '../plugins.js'; import { Bzip2Error, BZIP2_ERROR_CODES } from '../errors.js'; import type { IBitReader } from '../interfaces.js'; import { Bzip2 } from './bzip2.js'; import { bitIterator } from './bititerator.js'; /** * Creates a streaming BZIP2 decompression transform */ export function unbzip2Stream(): plugins.smartstream.SmartDuplex { const bzip2Instance = new Bzip2(); const bufferQueue: Buffer[] = []; let hasBytes = 0; let blockSize = 0; let broken = false; let bitReader: IBitReader | null = null; let streamCRC: number | null = null; function decompressBlock(): Buffer | undefined { if (!blockSize) { blockSize = bzip2Instance.header(bitReader!); streamCRC = 0; return undefined; } const bufsize = 100000 * blockSize; const buf = new Int32Array(bufsize); const chunk: number[] = []; const outputFunc = (b: number): void => { chunk.push(b); }; streamCRC = bzip2Instance.decompress(bitReader!, outputFunc, buf, bufsize, streamCRC); if (streamCRC === null) { // Reset for next bzip2 header blockSize = 0; return undefined; } return Buffer.from(chunk); } let outlength = 0; const decompressAndPush = async (): Promise => { if (broken) return undefined; try { const resultChunk = decompressBlock(); if (resultChunk) { outlength += resultChunk.length; } return resultChunk; } catch (e) { broken = true; if (e instanceof Error) { throw new Bzip2Error(`Decompression failed: ${e.message}`, BZIP2_ERROR_CODES.INVALID_BLOCK_DATA); } throw e; } }; return new plugins.smartstream.SmartDuplex({ objectMode: true, name: 'bzip2', highWaterMark: 1, writeFunction: async function (data, streamTools) { bufferQueue.push(data); hasBytes += data.length; if (bitReader === null) { bitReader = bitIterator(function () { return bufferQueue.shift()!; }); } const threshold = 25000 + 100000 * blockSize || 4; while (!broken && hasBytes - bitReader.bytesRead + 1 >= threshold) { const result = await decompressAndPush(); if (!result) { continue; } await streamTools.push(result); } return null; }, finalFunction: async function (streamTools) { while (!broken && bitReader && hasBytes > bitReader.bytesRead) { const result = await decompressAndPush(); if (!result) { continue; } await streamTools.push(result); } if (!broken && streamCRC !== null) { this.emit('error', new Bzip2Error('Input stream ended prematurely', BZIP2_ERROR_CODES.PREMATURE_END)); } return null; }, }); }