Files
smartregistry/ts/core/helpers.stream.ts

64 lines
1.8 KiB
TypeScript

import * as crypto from 'crypto';
/**
* Convert Buffer, Uint8Array, string, or JSON object to a ReadableStream<Uint8Array>.
*/
export function toReadableStream(data: Buffer | Uint8Array | string | object): ReadableStream<Uint8Array> {
const buf = Buffer.isBuffer(data)
? data
: data instanceof Uint8Array
? Buffer.from(data)
: typeof data === 'string'
? Buffer.from(data, 'utf-8')
: Buffer.from(JSON.stringify(data), 'utf-8');
return new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array(buf));
controller.close();
},
});
}
/**
* Consume a ReadableStream into a Buffer.
*/
export async function streamToBuffer(stream: ReadableStream<Uint8Array>): Promise<Buffer> {
const reader = stream.getReader();
const chunks: Uint8Array[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
return Buffer.concat(chunks);
}
/**
* Consume a ReadableStream into a parsed JSON object.
*/
export async function streamToJson<T = any>(stream: ReadableStream<Uint8Array>): Promise<T> {
const buf = await streamToBuffer(stream);
return JSON.parse(buf.toString('utf-8'));
}
/**
* Create a TransformStream that incrementally hashes data passing through.
* Data flows through unchanged; the digest is available after the stream completes.
*/
export function createHashTransform(algorithm: string = 'sha256'): {
transform: TransformStream<Uint8Array, Uint8Array>;
getDigest: () => string;
} {
const hash = crypto.createHash(algorithm);
const transform = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
hash.update(chunk);
controller.enqueue(chunk);
},
});
return {
transform,
getDigest: () => hash.digest('hex'),
};
}