feat(tartools): add streaming TAR support (tar-stream), Node.js streaming APIs for TarTools, and browser / web bundle docs
@@ -3,6 +3,6 @@
  */
 export const commitinfo = {
   name: '@push.rocks/smartarchive',
-  version: '5.1.0',
+  version: '5.2.0',
   description: 'A library for working with archive files, providing utilities for compressing and decompressing data.'
 }
@@ -380,36 +380,41 @@ export class SmartArchive {
       plugins.smartstream.createTransformFunction<IAnalyzedResult, void>(
         async (analyzedResultChunk) => {
           if (analyzedResultChunk.fileType?.mime === 'application/x-tar') {
-            // Use modern-tar for TAR extraction
-            const chunks: Buffer[] = [];
-
-            analyzedResultChunk.resultStream.on('data', (chunk: Buffer) => {
-              chunks.push(chunk);
-            });
-
-            analyzedResultChunk.resultStream.on('end', async () => {
-              try {
-                const tarBuffer = Buffer.concat(chunks);
-                const entries = await this.tarTools.extractTar(new Uint8Array(tarBuffer));
-
-                for (const entry of entries) {
-                  if (entry.isDirectory) continue;
-
-                  const streamFile = plugins.smartfile.StreamFile.fromBuffer(
-                    Buffer.from(entry.content)
-                  );
-                  streamFile.relativeFilePath = entry.path;
-                  streamFileIntake.push(streamFile);
-                }
-                safeSignalEnd();
-              } catch (err) {
-                streamFileIntake.emit('error', err);
-              }
-            });
+            // Use tar-stream for streaming TAR extraction
+            // Buffer each entry to ensure tar-stream can proceed to next entry
+            const extract = this.tarTools.getExtractStream();
+
+            extract.on('entry', (header, stream, next) => {
+              if (header.type === 'directory') {
+                stream.resume(); // Drain the stream
+                next();
+                return;
+              }
+
+              // Buffer the entry content to avoid blocking tar-stream
+              const chunks: Buffer[] = [];
+              stream.on('data', (chunk: Buffer) => chunks.push(chunk));
+              stream.on('end', () => {
+                const content = Buffer.concat(chunks);
+                const streamFile = plugins.smartfile.StreamFile.fromBuffer(content);
+                streamFile.relativeFilePath = header.name;
+                streamFileIntake.push(streamFile);
+                next();
+              });
+              stream.on('error', (err: Error) => {
+                streamFileIntake.emit('error', err);
+              });
+            });
+
+            extract.on('finish', () => {
+              safeSignalEnd();
+            });
 
-            analyzedResultChunk.resultStream.on('error', (err: Error) => {
+            extract.on('error', (err: Error) => {
               streamFileIntake.emit('error', err);
             });
+
+            analyzedResultChunk.resultStream.pipe(extract);
           } else if (analyzedResultChunk.fileType?.mime === 'application/zip') {
             analyzedResultChunk.resultStream
               .pipe(analyzedResultChunk.decompressionStream)
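For reference, the buffer-per-entry pattern above follows tar-stream's documented contract: the next `'entry'` event fires only after `next()` is called, so each entry's stream must be fully consumed first. A minimal standalone sketch of that contract, independent of this codebase (the file name is a placeholder):

```typescript
import * as tarStream from 'tar-stream';
import * as fs from 'node:fs';

const extract = tarStream.extract();

extract.on('entry', (header, stream, next) => {
  // Consume the whole entry; tar-stream will not parse the next
  // entry until next() is called.
  const chunks: Buffer[] = [];
  stream.on('data', (chunk: Buffer) => chunks.push(chunk));
  stream.on('end', () => {
    console.log(`${header.name}: ${Buffer.concat(chunks).length} bytes`);
    next(); // Only now does the following 'entry' event fire
  });
});

extract.on('finish', () => console.log('archive fully read'));

fs.createReadStream('archive.tar').pipe(extract);
```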
@@ -1,14 +1,230 @@
 import * as plugins from './plugins.js';
 import type { IArchiveEntry, TCompressionLevel } from '../ts_shared/interfaces.js';
 import { TarTools as SharedTarTools } from '../ts_shared/classes.tartools.js';
-import { GzipTools } from '../ts_shared/classes.gziptools.js';
 
 /**
- * Extended TAR archive utilities with Node.js filesystem support
+ * Options for adding a file to a TAR pack stream
  */
+export interface ITarPackFileOptions {
+  fileName: string;
+  content: string | Buffer | Uint8Array | plugins.stream.Readable;
+  size?: number;
+  mode?: number;
+  mtime?: Date;
+}
+
+/**
+ * Extended TAR archive utilities with Node.js streaming support
+ *
+ * For small archives: Use inherited buffer-based methods (packFiles, extractTar, etc.)
+ * For large archives: Use streaming methods (getPackStream, getExtractStream, etc.)
+ */
 export class TarTools extends SharedTarTools {
+  // ============================================
+  // STREAMING PACK METHODS (for large files)
+  // ============================================
 
   /**
-   * Pack a directory into a TAR buffer (Node.js only)
+   * Get a streaming TAR pack instance
+   * Use this for packing large files without buffering everything in memory
+   *
+   * @example
+   * ```typescript
+   * const pack = tarTools.getPackStream();
+   *
+   * await tarTools.addFileToPack(pack, { fileName: 'large.bin', content: readStream, size: fileSize });
+   * await tarTools.addFileToPack(pack, { fileName: 'small.txt', content: 'Hello World' });
+   *
+   * pack.finalize();
+   * pack.pipe(fs.createWriteStream('output.tar'));
+   * ```
    */
+  public getPackStream(): plugins.tarStream.Pack {
+    return plugins.tarStream.pack();
+  }
+
+  /**
+   * Add a file to a TAR pack stream
+   * Supports strings, buffers, and readable streams
+   *
+   * @param pack - The pack stream from getPackStream()
+   * @param options - File options including name, content, and optional metadata
+   */
+  public async addFileToPack(
+    pack: plugins.tarStream.Pack,
+    options: ITarPackFileOptions
+  ): Promise<void> {
+    const { fileName, content, mode = 0o644, mtime = new Date() } = options;
+
+    if (typeof content === 'string') {
+      // String content - convert to buffer
+      const buffer = Buffer.from(content, 'utf8');
+      const entry = pack.entry({
+        name: fileName,
+        size: buffer.length,
+        mode,
+        mtime,
+      });
+      entry.write(buffer);
+      entry.end();
+    } else if (Buffer.isBuffer(content) || content instanceof Uint8Array) {
+      // Buffer content
+      const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content);
+      const entry = pack.entry({
+        name: fileName,
+        size: buffer.length,
+        mode,
+        mtime,
+      });
+      entry.write(buffer);
+      entry.end();
+    } else if (content && typeof (content as any).pipe === 'function') {
+      // Readable stream - requires size to be provided
+      const size = options.size;
+      if (size === undefined) {
+        throw new Error('Size must be provided when adding a stream to TAR pack');
+      }
+
+      return new Promise<void>((resolve, reject) => {
+        const entry = pack.entry({
+          name: fileName,
+          size,
+          mode,
+          mtime,
+        }, (err) => {
+          if (err) reject(err);
+          else resolve();
+        });
+
+        (content as plugins.stream.Readable).pipe(entry);
+      });
+    } else {
+      throw new Error('Unsupported content type for TAR entry');
+    }
+  }
+
+  // ============================================
+  // STREAMING EXTRACT METHODS (for large files)
+  // ============================================
+
+  /**
+   * Get a streaming TAR extract instance
+   * Use this for extracting large archives without buffering everything in memory
+   *
+   * @example
+   * ```typescript
+   * const extract = tarTools.getExtractStream();
+   *
+   * extract.on('entry', (header, stream, next) => {
+   *   console.log(`Extracting: ${header.name}`);
+   *   stream.pipe(fs.createWriteStream(`./out/${header.name}`));
+   *   stream.on('end', next);
+   * });
+   *
+   * fs.createReadStream('archive.tar').pipe(extract);
+   * ```
+   */
+  public getExtractStream(): plugins.tarStream.Extract {
+    return plugins.tarStream.extract();
+  }
+
+  /**
+   * Extract a TAR stream to a directory with true streaming (no buffering)
+   *
+   * @param sourceStream - The TAR archive stream
+   * @param targetDir - Directory to extract files to
+   */
+  public async extractToDirectory(
+    sourceStream: plugins.stream.Readable,
+    targetDir: string
+  ): Promise<void> {
+    await plugins.fsPromises.mkdir(targetDir, { recursive: true });
+
+    return new Promise<void>((resolve, reject) => {
+      const extract = this.getExtractStream();
+
+      extract.on('entry', async (header, stream, next) => {
+        const filePath = plugins.path.join(targetDir, header.name);
+
+        if (header.type === 'directory') {
+          await plugins.fsPromises.mkdir(filePath, { recursive: true });
+          stream.resume(); // Drain the stream
+          next();
+        } else if (header.type === 'file') {
+          await plugins.fsPromises.mkdir(plugins.path.dirname(filePath), { recursive: true });
+          const writeStream = plugins.fs.createWriteStream(filePath);
+          stream.pipe(writeStream);
+          writeStream.on('finish', next);
+          writeStream.on('error', reject);
+        } else {
+          stream.resume(); // Skip other types
+          next();
+        }
+      });
+
+      extract.on('finish', resolve);
+      extract.on('error', reject);
+
+      sourceStream.pipe(extract);
+    });
+  }
+
+  // ============================================
+  // STREAMING DIRECTORY PACK (for large directories)
+  // ============================================
+
+  /**
+   * Pack a directory into a TAR stream with true streaming (no buffering)
+   * Files are read and written one at a time, never loading everything into memory
+   */
+  public async getDirectoryPackStream(directoryPath: string): Promise<plugins.tarStream.Pack> {
+    const pack = this.getPackStream();
+    const fileTree = await plugins.listFileTree(directoryPath, '**/*');
+
+    // Process files sequentially to avoid memory issues
+    (async () => {
+      for (const filePath of fileTree) {
+        const absolutePath = plugins.path.join(directoryPath, filePath);
+        const stat = await plugins.fsPromises.stat(absolutePath);
+
+        if (stat.isFile()) {
+          const readStream = plugins.fs.createReadStream(absolutePath);
+          await this.addFileToPack(pack, {
+            fileName: filePath,
+            content: readStream,
+            size: stat.size,
+            mode: stat.mode,
+            mtime: stat.mtime,
+          });
+        }
+      }
+      pack.finalize();
+    })().catch((err) => pack.destroy(err));
+
+    return pack;
+  }
+
+  /**
+   * Pack a directory into a TAR.GZ stream with true streaming
+   * Uses Node.js zlib for streaming compression
+   */
+  public async getDirectoryPackStreamGz(
+    directoryPath: string,
+    compressionLevel?: TCompressionLevel
+  ): Promise<plugins.stream.Readable> {
+    const tarStream = await this.getDirectoryPackStream(directoryPath);
+    const { createGzip } = await import('node:zlib');
+    const gzip = createGzip({ level: compressionLevel ?? 6 });
+    return tarStream.pipe(gzip);
+  }
+
+  // ============================================
+  // BUFFER-BASED METHODS (inherited + filesystem)
+  // ============================================
+
+  /**
+   * Pack a directory into a TAR buffer (loads all files into memory)
+   * For large directories, use getDirectoryPackStream() instead
+   */
   public async packDirectory(directoryPath: string): Promise<Uint8Array> {
     const fileTree = await plugins.listFileTree(directoryPath, '**/*');
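The streaming methods added above compose with ordinary Node.js pipes. A minimal usage sketch (the paths and the bare `new TarTools()` construction are illustrative assumptions, not taken from this diff):

```typescript
import * as fs from 'node:fs';
import { createGunzip } from 'node:zlib';
import { TarTools } from './classes.tartools.js';

const tarTools = new TarTools();

// Pack: stream a directory straight to a .tar.gz file on disk
const packed = await tarTools.getDirectoryPackStreamGz('./my-data');
packed.pipe(fs.createWriteStream('./my-data.tar.gz'));

// Extract: gunzip on the fly, then stream entries into a target directory
const source = fs.createReadStream('./my-data.tar.gz').pipe(createGunzip());
await tarTools.extractToDirectory(source, './restored');
```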
@@ -16,36 +232,41 @@ export class TarTools extends SharedTarTools {
 
     for (const filePath of fileTree) {
       const absolutePath = plugins.path.join(directoryPath, filePath);
-      const content = await plugins.fsPromises.readFile(absolutePath);
-      entries.push({
-        archivePath: filePath,
-        content: new Uint8Array(content),
-      });
+      const stat = await plugins.fsPromises.stat(absolutePath);
+
+      if (stat.isFile()) {
+        const content = await plugins.fsPromises.readFile(absolutePath);
+        entries.push({
+          archivePath: filePath,
+          content: new Uint8Array(content),
+        });
+      }
     }
 
     return this.packFiles(entries);
   }
 
   /**
-   * Pack a directory into a TAR.GZ buffer (Node.js only)
+   * Pack a directory into a TAR.GZ buffer (loads all files into memory)
+   * For large directories, use getDirectoryPackStreamGz() instead
    */
   public async packDirectoryToTarGz(
     directoryPath: string,
     compressionLevel?: TCompressionLevel
   ): Promise<Uint8Array> {
     const tarBuffer = await this.packDirectory(directoryPath);
-    const gzipTools = new GzipTools();
-    return gzipTools.compress(tarBuffer, compressionLevel);
+    const { gzipSync } = await import('fflate');
+    return gzipSync(new Uint8Array(tarBuffer), { level: compressionLevel ?? 6 });
   }
 
   /**
-   * Pack a directory into a TAR.GZ stream (Node.js only)
+   * Pack a directory into a TAR.GZ stream
+   * This is now a true streaming implementation
    */
   public async packDirectoryToTarGzStream(
     directoryPath: string,
     compressionLevel?: TCompressionLevel
   ): Promise<plugins.stream.Readable> {
-    const buffer = await this.packDirectoryToTarGz(directoryPath, compressionLevel);
-    return plugins.stream.Readable.from(buffer);
+    return this.getDirectoryPackStreamGz(directoryPath, compressionLevel);
   }
 }
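The buffer-based TAR.GZ path now uses fflate's `gzipSync`, which emits standard gzip output, so it interoperates with node:zlib. A quick round-trip sketch (hypothetical path, assuming a constructed TarTools instance as above):

```typescript
import { gunzipSync } from 'node:zlib';
import { TarTools } from './classes.tartools.js';

const tarTools = new TarTools();

// Compress a directory in memory, then show the output is plain gzip
const tarGz = await tarTools.packDirectoryToTarGz('./my-data', 6);
const tar = gunzipSync(Buffer.from(tarGz)); // raw TAR bytes again
console.log(`tar payload: ${tar.length} bytes`);
```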
@@ -7,5 +7,5 @@ export * from './classes.smartarchive.js';
 // Node.js-specific: Archive analysis with SmartArchive integration
 export * from './classes.archiveanalyzer.js';
 
-// Node.js-specific: Extended TarTools with filesystem support (overrides shared TarTools)
-export { TarTools } from './classes.tartools.js';
+// Node.js-specific: Extended TarTools with streaming support (overrides shared TarTools)
+export { TarTools, type ITarPackFileOptions } from './classes.tartools.js';
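With the type re-exported alongside the class, consumers can type the options bag directly. A consumer-side sketch (assuming this Node.js entry point is what the package exposes under '@push.rocks/smartarchive'; file names are placeholders):

```typescript
import * as fs from 'node:fs';
import { TarTools, type ITarPackFileOptions } from '@push.rocks/smartarchive';

const tarTools = new TarTools();
const pack = tarTools.getPackStream();

// The exported interface types the options accepted by addFileToPack
const options: ITarPackFileOptions = {
  fileName: 'hello.txt',
  content: 'hello from smartarchive',
};

await tarTools.addFileToPack(pack, options);
pack.finalize();
pack.pipe(fs.createWriteStream('./hello.tar'));
```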
@@ -47,3 +47,7 @@ export {
   smartrx,
   smarturl,
 };
+
+// Node.js-specific: tar-stream for true streaming TAR support
+import * as tarStream from 'tar-stream';
+export { tarStream };