diff --git a/package.json b/package.json index 0b410b1..fdbad06 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ "@push.rocks/smartpromise": "^4.0.3", "@push.rocks/smartrequest": "^2.0.20", "@push.rocks/smartrx": "^3.0.7", - "@push.rocks/smartstream": "^3.0.7", + "@push.rocks/smartstream": "^3.0.11", "@push.rocks/smartunique": "^3.0.6", "@push.rocks/smarturl": "^3.0.7", "@types/tar-stream": "^3.1.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4dcd6e6..aecb422 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -21,8 +21,8 @@ dependencies: specifier: ^3.0.7 version: 3.0.7 '@push.rocks/smartstream': - specifier: ^3.0.7 - version: 3.0.7 + specifier: ^3.0.11 + version: 3.0.11 '@push.rocks/smartunique': specifier: ^3.0.6 version: 3.0.6 @@ -717,7 +717,7 @@ packages: '@push.rocks/smartpath': 5.0.11 '@push.rocks/smartpromise': 4.0.3 '@push.rocks/smartrequest': 2.0.20 - '@push.rocks/smartstream': 3.0.7 + '@push.rocks/smartstream': 3.0.11 '@types/fs-extra': 11.0.3 '@types/glob': 8.1.0 '@types/js-yaml': 4.0.8 @@ -934,8 +934,8 @@ packages: through2: 4.0.2 dev: true - /@push.rocks/smartstream@3.0.7: - resolution: {integrity: sha512-F4HsYlMJusa7uf18aIXGuuAdlPxKaIcr7UDMLg4QUCtGK114SVt6E+72bXtN6yPyZ40+x8BVUWUkkTOdw22BeA==} + /@push.rocks/smartstream@3.0.11: + resolution: {integrity: sha512-MrJGCXcUYliAZlE/ozGzCj6Udtg/2f4OfJCd/7We8tK2kS+YWl+TSvubz8KFbUqcl5dqHQro0txrmVcthd9gEQ==} dependencies: '@push.rocks/smartpromise': 4.0.3 '@push.rocks/smartrx': 3.0.7 diff --git a/test/test.ts b/test/test.ts index 52335b4..29d7c1e 100644 --- a/test/test.ts +++ b/test/test.ts @@ -35,7 +35,16 @@ tap.test('should extract existing files on disk', async () => { const testSmartarchive = await smartarchive.SmartArchive.fromArchiveUrl( 'https://verdaccio.lossless.digital/@pushrocks%2fwebsetup/-/websetup-2.0.14.tgz' ); - const streamfileStream = await testSmartarchive.exportToFs(testPaths.nogitDir); + await testSmartarchive.exportToFs(testPaths.nogitDir); }); +tap.test('should extract a b2zip', async () => { + const dataUrl = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'; + const testArchive = await smartarchive.SmartArchive.fromArchiveUrl(dataUrl); + await testArchive.exportToFs( + plugins.path.join(testPaths.nogitDir, 'de_companies_ocdata.jsonl'), + 'data.jsonl', + ); +}) + tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 14c22e2..7b7089e 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartarchive', - version: '4.0.2', + version: '4.0.3', description: 'work with archives' } diff --git a/ts/classes.archiveanalyzer.ts b/ts/classes.archiveanalyzer.ts index 71cba63..31e88ca 100644 --- a/ts/classes.archiveanalyzer.ts +++ b/ts/classes.archiveanalyzer.ts @@ -53,13 +53,13 @@ export class ArchiveAnalyzer { readableObjectMode: true, writeAndTransformFunction: async (chunkArg: Buffer, streamtools) => { const fileType = await plugins.fileType.fileTypeFromBuffer(chunkArg); - const decompressionStream = this.getDecompressionStream(fileType.mime as any); + const decompressionStream = this.getDecompressionStream(fileType?.mime as any); resultStream.push(chunkArg); if (firstRun) { firstRun = false; const result: IAnalyzedResult = { fileType, - isArchive: await this.mimeTypeIsArchive(fileType.mime), + isArchive: await this.mimeTypeIsArchive(fileType?.mime), resultStream, decompressionStream, }; @@ -68,6 +68,10 @@ export class ArchiveAnalyzer { return null; } }, + finalFunction: async (tools) => { + resultStream.push(null); + return null; + } }); return analyzerstream; } diff --git a/ts/classes.smartarchive.ts b/ts/classes.smartarchive.ts index f702112..adb6caf 100644 --- a/ts/classes.smartarchive.ts +++ b/ts/classes.smartarchive.ts @@ -74,7 +74,7 @@ export class SmartArchive { // return archiveStream; } - public async exportToFs(targetDir: string): Promise { + public async exportToFs(targetDir: string, fileNameArg?: string): Promise { const done = plugins.smartpromise.defer(); const streamFileStream = await this.exportToStreamOfStreamFiles(); streamFileStream.pipe(new plugins.smartstream.SmartDuplex({ @@ -83,14 +83,15 @@ export class SmartArchive { console.log(chunkArg.relativeFilePath); const streamFile = chunkArg; const readStream = await streamFile.createReadStream(); - const writePath = plugins.path.join(targetDir + streamFile.relativeFilePath); - const dir = plugins.path.parse(writePath).dir; - await plugins.smartfile.fs.ensureDir(plugins.path.dirname(dir)); + await plugins.smartfile.fs.ensureDir(targetDir); + const writePath = plugins.path.join(targetDir, (streamFile.relativeFilePath || fileNameArg)); + await plugins.smartfile.fs.ensureDir(plugins.path.dirname(writePath)); const writeStream = plugins.smartfile.fsStream.createWriteStream(writePath); - readStream.pipe(writeStream).end(() => { - done.resolve(); - }); + readStream.pipe(writeStream); }, + finalFunction: async () => { + done.resolve(); + } })); return done.promise; } @@ -106,7 +107,7 @@ export class SmartArchive { const createUnpackStream = () => plugins.smartstream.createTransformFunction( async (analyzedResultChunk) => { - if (analyzedResultChunk.fileType.mime === 'application/x-tar') { + if (analyzedResultChunk.fileType?.mime === 'application/x-tar') { const tarStream = analyzedResultChunk.decompressionStream as plugins.tarStream.Extract; tarStream.on( 'entry', @@ -116,11 +117,13 @@ export class SmartArchive { stream.on('end', function () { next(); // ready for next entry }); + stream.resume(); // just auto drain the stream } ); tarStream.on('finish', function () { + console.log('finished'); streamFileIntake.signalEnd(); - }) + }); analyzedResultChunk.resultStream.pipe(analyzedResultChunk.decompressionStream); } else if (analyzedResultChunk.isArchive && analyzedResultChunk.decompressionStream) { analyzedResultChunk.resultStream @@ -130,11 +133,11 @@ export class SmartArchive { } else { const streamFile = plugins.smartfile.StreamFile.fromStream( analyzedResultChunk.resultStream, - analyzedResultChunk.fileType.ext + analyzedResultChunk.fileType?.ext ); streamFileIntake.push(streamFile); } - } + }, ); archiveStream.pipe(createAnalyzedStream()).pipe(createUnpackStream());