import { StringDecoder } from 'node:string_decoder';

import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';

/**
 * Shape of one JSONL record as expected from the OffeneRegister company dump
 * (the default source of `JsonlDataProcessor.processDataFromUrl`).
 *
 * NOTE(review): this shape is not validated at runtime — records are passed
 * through from `JSON.parse` as-is.
 */
export type SeedEntryType = {
  all_attributes: {
    _registerArt: string;
    _registerNummer: string;
    additional_data: {
      AD: boolean;
      CD: boolean;
      DK: boolean;
      HD: boolean;
      SI: boolean;
      UT: boolean;
      VÖ: boolean;
    };
    federal_state: string;
    native_company_number: string;
    registered_office: string;
    registrar: string;
  };
  company_number: string;
  current_status: string;
  jurisdiction_code: string;
  name: string;
  officers: {
    name: string;
    other_attributes: {
      city: string;
      firstname: string;
      flag: string;
      lastname: string;
    };
    position: string;
    start_date: string; // ISO 8601 date string
    type: string;
  }[];
  registered_address: string;
  retrieved_at: string; // ISO 8601 date string
};

/**
 * Streams a remote archive of newline-delimited JSON (JSONL) files and hands
 * every parsed record to a callback.
 *
 * @typeParam T - Type of a single JSONL record (e.g. `SeedEntryType`). The
 *   parsed JSON is NOT validated against `T`.
 */
export class JsonlDataProcessor<T> {
  /** Invoked once for every parsed, truthy record. */
  public forEachFunction: (entryArg: T) => Promise<void>;

  constructor(forEachFunctionArg: (entryArg: T) => Promise<void>) {
    this.forEachFunction = forEachFunctionArg;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Downloads the archive at `dataUrlArg`, extracts every file it contains and
   * calls `forEachFunction` for each JSONL record.
   *
   * Lines of one chunk are handled with up to 1000 concurrent
   * `forEachFunction` calls; the next chunk of the same file is only
   * processed after the previous chunk's lines are done.
   *
   * @param dataUrlArg - URL of the archive; defaults to the OffeneRegister dump.
   * @returns Resolves after every file in the archive has been fully processed
   *   (including each file's final line without a trailing newline).
   * @throws Rejects when one of the involved streams errors or when
   *   `forEachFunction` rejects.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    await this.ensureDataDir();

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    const done = plugins.smartpromise.defer<void>();
    const filePromises: Promise<void>[] = [];
    let totalRecordsCounter = 0;

    // Shared by regular lines and each file's trailing line so both are treated identically.
    const handleLine = async (line: string): Promise<void> => {
      const entry = await this.parseLine(line);
      if (!entry) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) {
        console.log(`${totalRecordsCounter} total records.`);
      }
      await this.forEachFunction(entry);
    };

    const fileDispatcher = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
        // Dispatched without awaiting (like the original). Failures are routed to
        // `done` right away so they never surface as unhandled rejections.
        const filePromise = this.processStreamFile(streamFileArg, handleLine).then(
          () => console.log(`finished processing ${totalRecordsCounter} records.`),
          (err: unknown) => done.reject(err)
        );
        filePromises.push(filePromise);
      },
      finalFunction: async () => {
        // Every file has been dispatched; wait until all of them are finished.
        await Promise.all(filePromises);
        done.resolve();
      },
    });

    // pipe() does not forward errors, so listen on both ends explicitly.
    jsonlDataStream.on('error', (err: unknown) => done.reject(err));
    fileDispatcher.on('error', (err: unknown) => done.reject(err));
    jsonlDataStream.pipe(fileDispatcher);

    await done.promise;
  }

  /**
   * Makes sure `paths.germanBusinessDataDir` exists.
   * NOTE(review): nothing in this class writes into that directory — confirm it is still needed here.
   */
  private async ensureDataDir(): Promise<void> {
    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }
  }

  /**
   * Parses one JSONL line.
   *
   * @returns The parsed value, or `undefined` for blank lines and for lines
   *   that are not valid JSON (those are logged, followed by a 10 s pause).
   */
  private async parseLine(line: string): Promise<T | undefined> {
    // trim() also treats a lone '\r' (CRLF input) as blank instead of a parse error.
    if (!line.trim()) return undefined;
    try {
      return JSON.parse(line);
    } catch (err: unknown) {
      console.log(
        `Could not parse JSONL line: ${err instanceof Error ? err.message : String(err)}`
      );
      console.log(line);
      // NOTE(review): pause kept from the original implementation — presumably to
      // make malformed lines noticeable in the log; confirm it is still wanted.
      await plugins.smartdelay.delayFor(10000);
      return undefined;
    }
  }

  /**
   * Splits one extracted file into lines and feeds each line to `handleLineArg`.
   *
   * @returns Resolves after the file's last line (including a final line
   *   without a terminating newline) has been handled; rejects on stream or
   *   handler errors.
   */
  private async processStreamFile(
    streamFileArg: plugins.smartfile.StreamFile,
    handleLineArg: (line: string) => Promise<void>
  ): Promise<void> {
    const fileDone = plugins.smartpromise.defer<void>();
    // Decodes incrementally so a multi-byte UTF-8 character split across two
    // chunks is not corrupted (calling toString() per chunk would mangle it).
    const decoder = new StringDecoder('utf8');
    // Incomplete last line of the previous chunk. Kept per file so content of
    // different files is never glued together.
    let nextRest = '';
    let failed = false;

    const fail = (err: unknown): void => {
      failed = true;
      fileDone.reject(err);
    };

    const readStream = await streamFileArg.createReadStream();
    const lineSplitter = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (chunkArg: Buffer) => {
        if (failed) return;
        try {
          const lines = (nextRest + decoder.write(chunkArg)).split('\n');
          nextRest = lines.pop() ?? '';
          console.log(`Got another ${lines.length} records.`);
          const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor(
            handleLineArg,
            1000
          );
          await concurrentProcessor.process(lines);
        } catch (err: unknown) {
          fail(err);
        }
      },
      finalFunction: async () => {
        if (failed) return;
        try {
          const trailingLine = nextRest + decoder.end();
          nextRest = '';
          // The last record usually has no terminating newline; process it as well.
          await handleLineArg(trailingLine);
          fileDone.resolve();
        } catch (err: unknown) {
          fail(err);
        }
      },
    });

    readStream.on('error', fail);
    lineSplitter.on('error', fail);
    readStream.pipe(lineSplitter);

    return fileDone.promise;
  }
}