import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';

/** Running totals shared by the helpers of a single import run. */
interface ImportStats {
  /** Number of records that were parsed and handed to `save()`. */
  total: number;
  /** Number of non-empty lines that were skipped (malformed JSON or not a JSON object). */
  skipped: number;
}

/**
 * Streams a JSON Lines (JSONL) archive and stores one business record per line
 * through the `CBusinessRecord` class exposed by the injected `OpenData` instance.
 */
export class JsonlDataProcessor {
  public openDataRef: OpenData;

  constructor(openDataRefArg: OpenData) {
    this.openDataRef = openDataRefArg;
  }

  /**
   * Downloads the archive at `dataUrlArg`, reads every contained file as JSONL
   * and saves a business record for each JSON object found.
   *
   * The returned promise settles only after all contained files have been
   * processed, so callers can `await` the complete import.
   *
   * TODO: define a mapper as argument instead of hard-coding it
   *
   * @param dataUrlArg URL of the archive to import.
   * @returns resolves when every stream file has been fully processed; rejects
   *   when one of the involved streams emits an error or saving a record fails.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    // ensureDir is idempotent, so no separate existence check is needed.
    await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    const stats: ImportStats = { total: 0, skipped: 0 };
    const done = plugins.smartpromise.defer<void>();

    const fileProcessor = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
        try {
          // Awaiting here keeps the files strictly sequential and makes sure the
          // final function below only runs after the last file is finished.
          await this.processStreamFile(streamFileArg, stats);
        } catch (err) {
          done.reject(err);
          throw err;
        }
      },
      finalFunction: async () => {
        done.resolve();
      },
    });

    fileProcessor.on('error', (err) => done.reject(err));
    jsonlDataStream.on('error', (err) => done.reject(err));
    jsonlDataStream.pipe(fileProcessor);

    await done.promise;
  }

  /**
   * Reads one stream file chunk by chunk, splits it into lines and ingests
   * every complete line. Resolves once the file's final line has been handled.
   *
   * @param streamFileArg the file to read.
   * @param statsArg counters that are updated in place.
   */
  private async processStreamFile(
    streamFileArg: plugins.smartfile.StreamFile,
    statsArg: ImportStats
  ): Promise<void> {
    const fileDone = plugins.smartpromise.defer<void>();
    // A streaming decoder buffers incomplete multi-byte UTF-8 sequences, so a
    // character split across two chunks is not corrupted.
    const decoder = new TextDecoder('utf-8');
    // Text after the last newline seen so far; it is an incomplete line until
    // the next chunk (or the end of the file) completes it.
    let remainder = '';

    const readStream = await streamFileArg.createReadStream();

    const lineProcessor = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (chunkArg: Buffer | string) => {
        try {
          const text =
            typeof chunkArg === 'string' ? chunkArg : decoder.decode(chunkArg, { stream: true });
          const lines = (remainder + text).split('\n');
          remainder = lines.pop() ?? '';
          for (const line of lines) {
            await this.ingestLine(line, statsArg);
          }
        } catch (err) {
          fileDone.reject(err);
          throw err;
        }
      },
      finalFunction: async () => {
        try {
          // Flush bytes still buffered in the decoder, then treat whatever is
          // left as the last line (files need not end with a newline).
          remainder += decoder.decode();
          await this.ingestLine(remainder, statsArg);
          remainder = '';
          console.log(`finished processing ${statsArg.total} records.`);
          fileDone.resolve();
        } catch (err) {
          fileDone.reject(err);
          throw err;
        }
      },
    });

    readStream.on('error', (err) => fileDone.reject(err));
    lineProcessor.on('error', (err) => fileDone.reject(err));
    readStream.pipe(lineProcessor);

    await fileDone.promise;
  }

  /**
   * Parses a single JSONL line and saves it as a business record.
   * Blank lines are ignored; lines that are not valid JSON objects are logged
   * (malformed JSON) or silently counted (non-object values) and skipped so a
   * single bad line does not abort the whole import.
   *
   * @param lineArg raw line without the trailing newline.
   * @param statsArg counters that are updated in place.
   */
  private async ingestLine(lineArg: string, statsArg: ImportStats): Promise<void> {
    // trim() also removes a trailing '\r' from CRLF-terminated input.
    const line = lineArg.trim();
    if (!line) return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch (err) {
      statsArg.skipped++;
      const reason = err instanceof Error ? err.message : String(err);
      console.error(`Skipping malformed JSONL line (${reason}): ${line}`);
      return;
    }

    if (typeof parsed !== 'object' || parsed === null) {
      statsArg.skipped++;
      return;
    }
    // Only `name` is mapped for now; the value is passed through unvalidated,
    // exactly as it appears in the source data.
    const entry = parsed as { name: string };

    statsArg.total++;
    if (statsArg.total % 10000 === 0) console.log(`${statsArg.total} total records.`);

    const businessRecord = new this.openDataRef.CBusinessRecord();
    businessRecord.id = await this.openDataRef.CBusinessRecord.getNewId();
    businessRecord.data.name = entry.name;
    await businessRecord.save();
  }

  /**
   * Looks up a business record whose `data.name` matches `nameArg`,
   * case-insensitively.
   *
   * NOTE(review): `nameArg` is passed to the query as a raw regular expression,
   * so regex metacharacters in it are interpreted, not matched literally.
   * Escape the value at the call site if it comes from untrusted input.
   *
   * @param nameArg pattern to match against the record name.
   * @returns whatever `CBusinessRecord.getInstance` yields for the query.
   */
  public async getBusinessRecordByName(nameArg: string) {
    const businessRecord = await this.openDataRef.CBusinessRecord.getInstance({
      data: {
        // The cast keeps the query-operator object assignable to the filter type.
        name: { $regex: `${nameArg}`, $options: "i" } as any,
      },
    });
    return businessRecord;
  }
}