// Source file: opendata/ts/classes.jsonldata.ts
// (111 lines, 3.6 KiB, TypeScript; last changed 2023-11-14 16:15:11 +01:00)
import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';
/**
 * Shape of a single seed record.
 *
 * NOTE(review): presumably one line of the JSONL dump that
 * `JsonlDataProcessor.processDataFromUrl` downloads by default — confirm
 * against the data source before relying on individual fields.
 */
export type SeedEntryType = {
  all_attributes: {
    _registerArt: string;
    _registerNummer: string;
    additional_data: {
      AD: boolean;
      CD: boolean;
      DK: boolean;
      HD: boolean;
      SI: boolean;
      UT: boolean;
      // NOTE(review): this property had lost its key (the line read `: boolean;`,
      // which does not parse). Restored as 'VÖ', the remaining flag in the
      // OffeneRegister `additional_data` object — TODO confirm against upstream.
      'VÖ': boolean;
    };
    federal_state: string;
    native_company_number: string;
    registered_office: string;
    registrar: string;
  };
  company_number: string;
  current_status: string;
  jurisdiction_code: string;
  name: string;
  officers: {
    name: string;
    other_attributes: {
      city: string;
      firstname: string;
      flag: string;
      lastname: string;
    };
    position: string;
    start_date: string; // ISO 8601 date string
    type: string;
  }[];
  registered_address: string;
  retrieved_at: string; // ISO 8601 date string
};
/**
 * Downloads an archive containing JSON Lines data, splits the decompressed
 * byte stream into lines, parses each line as JSON and passes the resulting
 * record to a caller-supplied async callback.
 *
 * @typeParam T - type the caller expects each parsed line to have. The parsed
 *   value is NOT validated against T at runtime.
 */
export class JsonlDataProcessor<T> {
  /** Callback invoked once for every line that parsed to a truthy value. */
  public forEachFunction: (entryArg: T) => Promise<void>;

  /**
   * @param forEachFunctionArg - async handler called for each parsed record.
   */
  constructor(forEachFunctionArg: (entryArg: T) => Promise<void>) {
    this.forEachFunction = forEachFunctionArg;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Fetches the archive at `dataUrlArg`, streams its content and feeds every
   * JSONL record to `forEachFunction`.
   *
   * @param dataUrlArg - URL of the archive to process.
   * @returns a promise that resolves once the stream has been fully consumed,
   *   including a final line that is not terminated by a newline. It rejects
   *   if flushing that final line fails (e.g. the callback throws).
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    const done = plugins.smartpromise.defer();

    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();

    let totalRecordsCounter = 0;
    // Text after the last newline of the previous chunk; it is the beginning
    // of a line that continues in the next chunk.
    let nextRest = '';
    // Streaming decoder: a multi-byte UTF-8 character split across two chunks
    // is buffered instead of being decoded into replacement characters.
    const utf8Decoder = new TextDecoder('utf-8');

    // Parses one line and forwards the record to the user callback.
    const handleLine = async (line: string): Promise<void> => {
      if (!line) return;
      let entry: T | undefined;
      try {
        entry = JSON.parse(line) as T;
      } catch {
        // Malformed line: print it and skip it.
        // NOTE(review): the 10 s pause is kept from the original code; it looks
        // like a debugging aid — confirm whether it is still wanted.
        console.log(line);
        await plugins.smartdelay.delayFor(10000);
      }
      if (!entry) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) {
        console.log(`${totalRecordsCounter} total records.`);
      }
      await this.forEachFunction(entry);
    };

    jsonlDataStream.pipe(
      new plugins.smartstream.SmartDuplex({
        objectMode: true,
        writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
          const readStream = await streamFileArg.createReadStream();
          readStream.pipe(
            new plugins.smartstream.SmartDuplex({
              objectMode: true,
              writeFunction: async (bufferArg: Buffer) => {
                const currentString = nextRest + utf8Decoder.decode(bufferArg, { stream: true });
                const lines = currentString.split('\n');
                // The last element is an incomplete line (or '' when the chunk
                // ended exactly on a newline); carry it over to the next chunk.
                nextRest = lines.pop() ?? '';
                console.log(`Got another ${lines.length} records.`);
                const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
                  handleLine,
                  1000
                );
                await concurrentProcessor.process(lines);
              },
              finalFunction: async () => {
                try {
                  // Flush bytes still buffered in the decoder, then process a
                  // trailing record that was not terminated by a newline.
                  const trailingLine = nextRest + utf8Decoder.decode();
                  nextRest = '';
                  await handleLine(trailingLine);
                  console.log(`finished processing ${totalRecordsCounter} records.`);
                  // Always settle the deferred; previously an input ending in a
                  // newline left it pending forever.
                  done.resolve();
                } catch (err) {
                  done.reject(err);
                }
              },
            })
          );
        },
      })
    );

    await done.promise;
  }
}