76 lines
2.8 KiB
TypeScript
76 lines
2.8 KiB
TypeScript
|
import * as plugins from './plugins.js';
|
||
|
import * as paths from './paths.js';
|
||
|
import type { OpenData } from './classes.main.opendata.js';
|
||
|
|
||
|
/**
 * Imports German company data from the offeneregister.de JSON Lines dump
 * and stores one business record per line through the OpenData reference.
 */
export class GermanBusinessData {
  /** OpenData instance that supplies the `CBusinessRecord` class used to persist entries. */
  public openDataRef: OpenData;

  /**
   * @param openDataRefArg OpenData instance this importer stores its records through.
   */
  constructor(openDataRefArg: OpenData) {
    this.openDataRef = openDataRefArg;
  }

  /** Starts the importer by triggering an update run. */
  public async start(): Promise<void> {
    await this.update();
  }

  /** Currently a no-op. */
  public async stop(): Promise<void> {}

  /**
   * Opens the archive at the offeneregister.de URL, splits every contained
   * file into lines and stores each successfully parsed line as a business record.
   *
   * NOTE(review): the stream pipeline is started but not awaited, so the
   * returned promise resolves before all records have been processed. This
   * matches the previous behavior; confirm whether callers should wait for
   * the import to finish.
   */
  public async update(): Promise<void> {
    const dataUrl = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2';

    // Make sure the data directory exists. Nothing below writes to it yet;
    // the records are processed straight from the stream.
    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrl);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    let totalRecordsCounter = 0;
    const countStoredRecord = () => {
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) console.log(`${totalRecordsCounter} total records.`);
    };

    jsonlDataStream.pipe(
      new plugins.smartstream.SmartDuplex({
        objectMode: true,
        writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
          // Per-file state: a partial line must never leak into the next file.
          let pendingLineStart = '';
          // A streaming decoder keeps multi-byte UTF-8 characters intact when
          // they are split across two chunks (plain toString() would corrupt them).
          const decoder = new TextDecoder('utf-8');

          const readStream = await streamFileArg.createReadStream();
          readStream.pipe(
            new plugins.smartstream.SmartDuplex({
              objectMode: true,
              writeFunction: async (dataArg: Buffer | string) => {
                const decoded =
                  typeof dataArg === 'string' ? dataArg : decoder.decode(dataArg, { stream: true });
                const lines = (pendingLineStart + decoded).split('\n');
                // The last element is the (possibly empty) start of the next line.
                pendingLineStart = lines.pop() ?? '';
                console.log(`Got another ${lines.length} records.`);
                for (const line of lines) {
                  if (await this.storeRecordFromLine(line)) countStoredRecord();
                }
              },
              finalFunction: async () => {
                // Flush the decoder and store a last line that has no trailing newline.
                const trailingLine = pendingLineStart + decoder.decode();
                pendingLineStart = '';
                if (!trailingLine) return;
                if (await this.storeRecordFromLine(trailingLine)) countStoredRecord();
              },
            })
          );
        },
      })
    );
  }

  /**
   * Parses one JSON line and saves it as a business record.
   *
   * @param lineArg A single line of the JSON Lines input.
   * @returns `true` if a record was saved; `false` for empty lines, lines that
   *          fail to parse and lines that parse to a falsy value.
   */
  private async storeRecordFromLine(lineArg: string): Promise<boolean> {
    if (!lineArg) return false;

    let entry: any;
    try {
      entry = JSON.parse(lineArg);
    } catch {
      // Log the offending line and pause before continuing; the pause is kept
      // from the original implementation (presumably to make failures noticeable).
      console.log(lineArg);
      await plugins.smartdelay.delayFor(10000);
    }
    if (!entry) return false;

    const businessRecord = new this.openDataRef.CBusinessRecord();
    businessRecord.data.name = entry.name;
    await businessRecord.save();
    return true;
  }
}
|