import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';

/**
 * Imports German company records from the offeneregister.de JSONL dump and
 * stores each record's name as a `CBusinessRecord` of the owning `OpenData`
 * instance.
 */
export class GermanBusinessData {
  public openDataRef: OpenData;

  /**
   * @param openDataRefArg the OpenData instance whose `CBusinessRecord` class is
   *   used to persist imported records.
   */
  constructor(openDataRefArg: OpenData) {
    this.openDataRef = openDataRefArg;
  }

  /** Starts the service by triggering one import run via {@link update}. */
  public async start(): Promise<void> {
    await this.update();
  }

  /** Currently a no-op. */
  public async stop(): Promise<void> {}

  /**
   * Ensures the local data directory exists, opens the remote archive and wires
   * up the streaming import pipeline.
   *
   * NOTE: the returned promise resolves once the pipeline has been set up, not
   * when the import has finished; records keep being stored in the background
   * while the stream is consumed.
   */
  public async update(): Promise<void> {
    const dataUrl = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2';

    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrl);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    // Counts stored records across all stream files; only used for progress logging.
    let totalRecordsCounter = 0;

    jsonlDataStream.pipe(
      new plugins.smartstream.SmartDuplex({
        objectMode: true,
        writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
          const readStream = await streamFileArg.createReadStream();

          // Per-file state, so a leftover fragment of one file can never be
          // glued onto the first chunk of another file.
          // The streaming decoder buffers incomplete multi-byte UTF-8 sequences
          // that are split across chunk boundaries instead of corrupting them.
          const decoder = new TextDecoder('utf-8');
          let unfinishedLine = '';

          const importLines = async (linesArg: string[]): Promise<void> => {
            for (const line of linesArg) {
              const stored = await this.storeJsonLine(line);
              if (!stored) continue;
              totalRecordsCounter++;
              if (totalRecordsCounter % 10000 === 0) {
                console.log(`${totalRecordsCounter} total records.`);
              }
            }
          };

          readStream.pipe(
            new plugins.smartstream.SmartDuplex({
              objectMode: true,
              writeFunction: async (chunkArg: Buffer) => {
                const currentString = unfinishedLine + decoder.decode(chunkArg, { stream: true });
                const lines = currentString.split('\n');
                // The last element is the (possibly empty) start of the next line.
                unfinishedLine = lines.pop() ?? '';
                console.log(`Got another ${lines.length} records.`);
                await importLines(lines);
              },
              finalFunction: async () => {
                // Flush the decoder and import a final record that is not
                // terminated by a newline instead of discarding it.
                const lastLine = unfinishedLine + decoder.decode();
                unfinishedLine = '';
                await importLines([lastLine]);
              },
            })
          );
        },
      })
    );
  }

  /**
   * Parses one JSONL line and stores its `name` as a new business record.
   *
   * Empty lines and lines that parse to a falsy value are skipped. A line that
   * is not valid JSON is logged and skipped after a 10 second pause.
   *
   * @param lineArg a single line of the JSONL dump, without the trailing newline.
   * @returns `true` if a record was saved, `false` if the line was skipped.
   */
  private async storeJsonLine(lineArg: string): Promise<boolean> {
    if (!lineArg) return false;

    // The shape of the parsed entry is not validated here; only `name` is read.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let entry: any;
    try {
      entry = JSON.parse(lineArg);
    } catch (err) {
      console.log(lineArg);
      // Pause after a malformed line before continuing with the next one.
      await plugins.smartdelay.delayFor(10000);
    }
    if (!entry) return false;

    const businessRecord = new this.openDataRef.CBusinessRecord();
    businessRecord.data.name = entry.name;
    await businessRecord.save();
    return true;
  }
}