fix(core): Fix issues with JSONL data processing and improve error handling in business record validation
This commit is contained in:
@@ -2,16 +2,55 @@ import * as plugins from './plugins.js';
|
||||
import * as paths from './paths.js';
|
||||
import type { OpenData } from './classes.main.opendata.js';
|
||||
|
||||
export class JsonlDataProcessor {
|
||||
public openDataRef: OpenData;
|
||||
constructor(openDataRefArg: OpenData) {
|
||||
this.openDataRef = openDataRefArg;
|
||||
export type SeedEntryType = {
|
||||
all_attributes: {
|
||||
_registerArt: string;
|
||||
_registerNummer: string;
|
||||
additional_data: {
|
||||
AD: boolean;
|
||||
CD: boolean;
|
||||
DK: boolean;
|
||||
HD: boolean;
|
||||
SI: boolean;
|
||||
UT: boolean;
|
||||
VÖ: boolean;
|
||||
};
|
||||
federal_state: string;
|
||||
native_company_number: string;
|
||||
registered_office: string;
|
||||
registrar: string;
|
||||
};
|
||||
company_number: string;
|
||||
current_status: string;
|
||||
jurisdiction_code: string;
|
||||
name: string;
|
||||
officers: {
|
||||
name: string;
|
||||
other_attributes: {
|
||||
city: string;
|
||||
firstname: string;
|
||||
flag: string;
|
||||
lastname: string;
|
||||
};
|
||||
position: string;
|
||||
start_date: string; // ISO 8601 date string
|
||||
type: string;
|
||||
}[];
|
||||
registered_address: string;
|
||||
retrieved_at: string; // ISO 8601 date string
|
||||
};
|
||||
|
||||
export class JsonlDataProcessor<T> {
|
||||
public forEachFunction: (entryArg: T) => Promise<void>;
|
||||
constructor(forEachFunctionArg: typeof this.forEachFunction) {
|
||||
this.forEachFunction = forEachFunctionArg;
|
||||
}
|
||||
|
||||
// TODO: define a mapper as argument instead of hard-coding it
|
||||
public async processDataFromUrl(dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2') {
|
||||
public async processDataFromUrl(
|
||||
dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
|
||||
) {
|
||||
const done = plugins.smartpromise.defer();
|
||||
const promiseArray: Promise<any>[] = [];
|
||||
const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
|
||||
if (!dataExists) {
|
||||
await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
|
||||
@@ -19,10 +58,6 @@ export class JsonlDataProcessor {
|
||||
}
|
||||
|
||||
const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
|
||||
promiseArray
|
||||
.push
|
||||
// smartarchive.exportToFs(paths.germanBusinessDataDir, 'de_companies_ocdata.jsonl')
|
||||
();
|
||||
const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();
|
||||
let totalRecordsCounter = 0;
|
||||
let nextRest: string = '';
|
||||
@@ -39,44 +74,37 @@ export class JsonlDataProcessor {
|
||||
const lines = currentString.split('\n');
|
||||
nextRest = lines.pop();
|
||||
console.log(`Got another ${lines.length} records.`);
|
||||
for (const line of lines) {
|
||||
let entry: any;
|
||||
if (!line) continue;
|
||||
try {
|
||||
entry = JSON.parse(line);
|
||||
console.log(JSON.stringify(entry, null, 2));
|
||||
process.exit(0);
|
||||
} catch (err) {
|
||||
console.log(line);
|
||||
await plugins.smartdelay.delayFor(10000);
|
||||
}
|
||||
if (!entry) continue;
|
||||
totalRecordsCounter++;
|
||||
if (totalRecordsCounter % 10000 === 0) console.log(`${totalRecordsCounter} total records.`);
|
||||
const businessRecord = new this.openDataRef.CBusinessRecord();
|
||||
businessRecord.id = await this.openDataRef.CBusinessRecord.getNewId();
|
||||
businessRecord.data.name = entry.name;
|
||||
await businessRecord.save();
|
||||
}
|
||||
const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
|
||||
async (line) => {
|
||||
let entry: T;
|
||||
if (!line) return;
|
||||
try {
|
||||
entry = JSON.parse(line);
|
||||
} catch (err) {
|
||||
console.log(line);
|
||||
await plugins.smartdelay.delayFor(10000);
|
||||
}
|
||||
if (!entry) return;
|
||||
totalRecordsCounter++;
|
||||
if (totalRecordsCounter % 10000 === 0)
|
||||
console.log(`${totalRecordsCounter} total records.`);
|
||||
await this.forEachFunction(entry);
|
||||
},
|
||||
1000
|
||||
);
|
||||
await concurrentProcessor.process(lines);
|
||||
},
|
||||
finalFunction: async (streamToolsArg) => {
|
||||
console.log(`finished processing ${totalRecordsCounter} records.`);
|
||||
if (!nextRest) return;
|
||||
JSON.parse(nextRest);
|
||||
}
|
||||
done.resolve();
|
||||
},
|
||||
})
|
||||
);
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
public async getBusinessRecordByName(nameArg: string) {
|
||||
const businessRecord = await this.openDataRef.CBusinessRecord.getInstance({
|
||||
data: {
|
||||
name: { $regex: `${nameArg}`, $options: "i" } as any,
|
||||
}
|
||||
});
|
||||
return businessRecord;
|
||||
await done.promise;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user