opendata/ts/classes.jsonldata.ts

83 lines
3.2 KiB
TypeScript
Raw Normal View History

2023-11-14 16:15:11 +01:00
import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';
/**
 * Imports business records from a remote JSONL archive into the
 * `CBusinessRecord` class exposed by an `OpenData` instance, and looks
 * stored records up by name.
 */
export class JsonlDataProcessor {
  public openDataRef: OpenData;

  constructor(openDataRefArg: OpenData) {
    this.openDataRef = openDataRefArg;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Downloads the archive at `dataUrlArg`, reads every file it contains as
   * newline-delimited JSON and saves one `CBusinessRecord` per parsed line.
   *
   * The returned promise settles only after every file in the archive has
   * been fully consumed; it rejects if one of the involved streams emits an
   * error or a record fails to save.
   *
   * @param dataUrlArg URL of the archive to import.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    const done = plugins.smartpromise.defer();

    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    let totalRecordsCounter = 0;
    const importLine = async (lineArg: string): Promise<void> => {
      const stored = await this.storeJsonlLine(lineArg);
      if (!stored) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) console.log(`${totalRecordsCounter} total records.`);
    };

    const fileSink = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
        try {
          // Finish one file before accepting the next so records are stored in order.
          await this.importStreamFile(streamFileArg, importLine);
        } catch (err) {
          // Reject explicitly: do not rely on how the duplex surfaces a throwing writeFunction.
          done.reject(err);
          throw err;
        }
      },
      finalFunction: async () => {
        done.resolve();
      },
    });

    // `.pipe()` does not forward errors, so every stream is observed separately.
    jsonlDataStream.on('error', (err: unknown) => done.reject(err));
    fileSink.on('error', (err: unknown) => done.reject(err));
    jsonlDataStream.pipe(fileSink);

    await done.promise;
    console.log(`finished processing ${totalRecordsCounter} records.`);
  }

  /**
   * Returns the result of `CBusinessRecord.getInstance` for a case-insensitive
   * substring match on `data.name`.
   *
   * `nameArg` is treated as literal text: regular-expression metacharacters are
   * escaped, so names such as "ACME (Berlin) Co." match themselves instead of
   * being interpreted as a pattern.
   *
   * @param nameArg Name, or part of a name, to search for.
   */
  public async getBusinessRecordByName(nameArg: string) {
    const escapedName = nameArg.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    const businessRecord = await this.openDataRef.CBusinessRecord.getInstance({
      data: {
        // The filter type of the project is not visible here, hence the cast.
        name: { $regex: escapedName, $options: 'i' } as any,
      },
    });
    return businessRecord;
  }

  /**
   * Reads one stream file of the archive, splits it into lines and hands each
   * line to `importLineArg`. Resolves once the file has been read to its end.
   */
  private async importStreamFile(
    streamFileArg: plugins.smartfile.StreamFile,
    importLineArg: (lineArg: string) => Promise<void>
  ): Promise<void> {
    const fileDone = plugins.smartpromise.defer();
    // A streaming decoder keeps multi-byte UTF-8 sequences intact when they
    // are split across two chunks.
    const decoder = new TextDecoder('utf-8');
    // Unterminated tail of the previous chunk; kept per file so that data of
    // one file can never leak into the next.
    let nextRest = '';

    const readStream = await streamFileArg.createReadStream();
    const lineSink = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (chunkArg: Buffer) => {
        const currentString = nextRest + decoder.decode(chunkArg, { stream: true });
        const lines = currentString.split('\n');
        // The last element is either '' or an incomplete line.
        nextRest = lines.pop() ?? '';
        console.log(`Got another ${lines.length} records.`);
        for (const line of lines) {
          await importLineArg(line);
        }
      },
      finalFunction: async () => {
        // A file without trailing newline still carries its last record here.
        const lastLine = nextRest + decoder.decode();
        nextRest = '';
        await importLineArg(lastLine);
        fileDone.resolve();
      },
    });

    readStream.on('error', (err: unknown) => fileDone.reject(err));
    lineSink.on('error', (err: unknown) => fileDone.reject(err));
    readStream.pipe(lineSink);

    await fileDone.promise;
  }

  /**
   * Parses a single JSONL line and saves it as a new `CBusinessRecord`.
   *
   * @returns `true` if a record was saved, `false` if the line was blank,
   *          not valid JSON, or parsed to a falsy value.
   */
  private async storeJsonlLine(lineArg: string): Promise<boolean> {
    const line = lineArg.trim();
    if (!line) return false;

    // The shape of the incoming entries is not declared anywhere in this file.
    let entry: any;
    try {
      entry = JSON.parse(line);
    } catch (err) {
      // One malformed line must not abort or stall the whole import.
      console.error(`Skipping malformed JSONL line: ${line}`, err);
      return false;
    }
    if (!entry) return false;

    const businessRecord = new this.openDataRef.CBusinessRecord();
    businessRecord.id = await this.openDataRef.CBusinessRecord.getNewId();
    businessRecord.data.name = entry.name;
    await businessRecord.save();
    return true;
  }
}