Files
smartarchive/ts/bzip2/index.ts

106 lines
2.8 KiB
TypeScript

import * as plugins from '../plugins.js';
import { Bzip2Error, BZIP2_ERROR_CODES } from '../errors.js';
import type { IBitReader } from '../interfaces.js';
import { Bzip2 } from './bzip2.js';
import { bitIterator } from './bititerator.js';
/**
 * Creates a streaming BZIP2 decompression transform.
 *
 * Compressed chunks written to the stream are queued and consumed through a
 * bit reader. Each pass either parses a stream header (when no block size is
 * known yet) or decompresses one block and pushes its bytes downstream.
 * When the decompressor reports the end of a bzip2 stream, the state is reset
 * so that a following header can be parsed from the remaining input.
 *
 * @returns A `SmartDuplex` in object mode that accepts compressed `Buffer`
 *   chunks and pushes decompressed `Buffer` chunks.
 * @throws {Bzip2Error} With code `INVALID_BLOCK_DATA` (surfacing as a rejection
 *   of the write/final handler) when header parsing or block decompression
 *   throws an `Error`. After such a failure the stream stops decoding.
 */
export function unbzip2Stream(): plugins.smartstream.SmartDuplex<Buffer, Buffer> {
  const bzip2Instance = new Bzip2();
  // Compressed chunks that the bit reader has not pulled yet.
  const bufferQueue: Buffer[] = [];
  // Total number of compressed bytes received so far.
  let hasBytes = 0;
  // Block size value returned by the header parser; 0 means "header not read yet".
  let blockSize = 0;
  // Set once decoding has failed; all further decoding is skipped.
  let broken = false;
  let bitReader: IBitReader | null = null;
  // Running CRC handed to and returned by the decompressor; null while no
  // bzip2 stream is open (before the first header and after a stream's end).
  let streamCRC: number | null = null;

  /**
   * Number of unread input bytes that must be buffered before the next decode
   * step is attempted during writes.
   *
   * This depends on the CURRENT `blockSize`, so it has to be re-evaluated
   * after every decode step: it is 25000 while waiting for a header and grows
   * by 100000 per block-size unit once a header has been parsed.
   * NOTE(review): presumably an upper bound on the compressed size of one
   * block plus slack — confirm against the decoder.
   */
  const requiredBytes = (): number => 25000 + 100000 * blockSize;

  /**
   * Performs one decode step on `reader`.
   *
   * @returns The decompressed bytes of one block, or `undefined` when the step
   *   only parsed a header or reached the end of the current bzip2 stream.
   */
  function decompressBlock(reader: IBitReader): Buffer | undefined {
    if (!blockSize) {
      blockSize = bzip2Instance.header(reader);
      streamCRC = 0;
      return undefined;
    }

    const bufsize = 100000 * blockSize;
    const buf = new Int32Array(bufsize);
    const chunk: number[] = [];
    const outputFunc = (b: number): void => {
      chunk.push(b);
    };

    streamCRC = bzip2Instance.decompress(reader, outputFunc, buf, bufsize, streamCRC);
    if (streamCRC === null) {
      // Reset for next bzip2 header
      blockSize = 0;
      return undefined;
    }
    return Buffer.from(chunk);
  }

  /**
   * Runs one decode step unless the stream is already broken, marking the
   * stream as broken and wrapping `Error`s in a `Bzip2Error` on failure.
   */
  const decompressAndPush = async (reader: IBitReader): Promise<Buffer | undefined> => {
    if (broken) return undefined;
    try {
      return decompressBlock(reader);
    } catch (e: unknown) {
      broken = true;
      if (e instanceof Error) {
        throw new Bzip2Error(`Decompression failed: ${e.message}`, BZIP2_ERROR_CODES.INVALID_BLOCK_DATA);
      }
      throw e;
    }
  };

  return new plugins.smartstream.SmartDuplex<Buffer, Buffer>({
    objectMode: true,
    name: 'bzip2',
    highWaterMark: 1,
    writeFunction: async function (data, streamTools) {
      bufferQueue.push(data);
      hasBytes += data.length;
      if (bitReader === null) {
        // The reader pulls queued chunks lazily, one at a time.
        bitReader = bitIterator(function () {
          return bufferQueue.shift()!;
        });
      }
      const reader = bitReader;

      // The threshold is evaluated on every iteration because parsing a header
      // (or finishing a stream) inside the loop changes `blockSize`. Using a
      // value computed before the loop would let a block decode start with
      // only the header-sized amount of input buffered.
      while (!broken && hasBytes - reader.bytesRead + 1 >= requiredBytes()) {
        const result = await decompressAndPush(reader);
        if (!result) {
          continue;
        }
        await streamTools.push(result);
      }
      return null;
    },
    finalFunction: async function (streamTools) {
      // No more input will arrive: decode whatever is left, regardless of the
      // buffering threshold used during writes.
      while (!broken && bitReader && hasBytes > bitReader.bytesRead) {
        const result = await decompressAndPush(bitReader);
        if (!result) {
          continue;
        }
        await streamTools.push(result);
      }
      // A non-null CRC means a stream was opened but its end was never seen.
      if (!broken && streamCRC !== null) {
        this.emit('error', new Bzip2Error('Input stream ended prematurely', BZIP2_ERROR_CODES.PREMATURE_END));
      }
      return null;
    },
  });
}