// Listing metadata of the original file: 111 lines, 3.6 KiB, TypeScript
import * as plugins from './plugins.js';
|
|
import * as paths from './paths.js';
|
|
import type { OpenData } from './classes.main.opendata.js';
|
|
|
|
/**
 * Shape of a single record of the seed data set.
 *
 * NOTE(review): presumably one line of the offeneregister.de JSONL dump that
 * `JsonlDataProcessor.processDataFromUrl` downloads by default — confirm against the data.
 * Nothing in this file validates parsed records against this type.
 */
export type SeedEntryType = {
  /** Register-specific attributes of the company. */
  all_attributes: {
    _registerArt: string;
    _registerNummer: string;
    /**
     * Boolean flags keyed by two-letter codes.
     * NOTE(review): the meaning of the individual codes is not defined in this file.
     */
    additional_data: {
      AD: boolean;
      CD: boolean;
      DK: boolean;
      HD: boolean;
      SI: boolean;
      UT: boolean;
      VÖ: boolean;
    };
    federal_state: string;
    native_company_number: string;
    registered_office: string;
    registrar: string;
  };
  company_number: string;
  current_status: string;
  jurisdiction_code: string;
  name: string;
  /** Officers listed for the company. */
  officers: {
    name: string;
    other_attributes: {
      city: string;
      firstname: string;
      flag: string;
      lastname: string;
    };
    position: string;
    start_date: string; // ISO 8601 date string
    type: string;
  }[];
  registered_address: string;
  retrieved_at: string; // ISO 8601 date string
};
|
|
|
|
/**
 * Streams a (compressed) JSONL archive and hands every parsed record to a callback.
 *
 * @typeParam T - Type each parsed line is treated as. Lines are parsed with `JSON.parse`
 *   and are NOT validated against `T`.
 */
export class JsonlDataProcessor<T> {
  /** Callback awaited once for every successfully parsed, truthy record. */
  public forEachFunction: (entryArg: T) => Promise<void>;

  /**
   * @param forEachFunctionArg - Callback stored as {@link forEachFunction}.
   */
  constructor(forEachFunctionArg: typeof this.forEachFunction) {
    this.forEachFunction = forEachFunctionArg;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Loads the archive at `dataUrlArg`, splits its content into lines and calls
   * {@link forEachFunction} for every line that parses to a truthy JSON value.
   *
   * Blank lines and lines that are not valid JSON are skipped (invalid ones are logged).
   * A final line without a trailing newline is processed like any other line.
   *
   * @param dataUrlArg - URL of the archive to process.
   * @returns A promise that resolves once a stream file has been read to its end, and
   *   rejects if processing a chunk or the trailing line fails.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    const done = plugins.smartpromise.defer();

    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();
    let totalRecordsCounter = 0;

    // Parses one line and forwards the resulting record to the callback.
    const processLine = async (lineArg: string): Promise<void> => {
      if (!lineArg.trim()) return;
      let entry: T;
      try {
        entry = JSON.parse(lineArg);
      } catch (err) {
        // A single malformed line must not stall or abort the whole import.
        console.log(`Skipping line that is not valid JSON: ${lineArg}`, err);
        return;
      }
      if (!entry) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) {
        console.log(`${totalRecordsCounter} total records.`);
      }
      await this.forEachFunction(entry);
    };

    jsonlDataStream.pipe(
      new plugins.smartstream.SmartDuplex({
        objectMode: true,
        writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
          // Per-file state: the unterminated tail of the previous chunk, and a streaming
          // decoder that keeps multi-byte UTF-8 sequences split across chunks intact.
          let nextRest = '';
          const utf8Decoder = new TextDecoder('utf-8');

          const readStream = await streamFileArg.createReadStream();
          readStream.pipe(
            new plugins.smartstream.SmartDuplex({
              objectMode: true,
              writeFunction: async (bufferArg: Buffer) => {
                try {
                  const currentString = nextRest + utf8Decoder.decode(bufferArg, { stream: true });
                  const lines = currentString.split('\n');
                  // The last element is an incomplete line (or '' if the chunk ended on '\n').
                  nextRest = lines.pop() ?? '';
                  console.log(`Got another ${lines.length} records.`);
                  const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
                    processLine,
                    1000
                  );
                  await concurrentProcessor.process(lines);
                } catch (err) {
                  // Surface the failure to the caller instead of leaving it waiting forever.
                  done.reject(err);
                  throw err;
                }
              },
              finalFunction: async () => {
                try {
                  // Flush bytes still buffered in the decoder and handle a last line
                  // that has no trailing newline.
                  const lastLine = nextRest + utf8Decoder.decode();
                  nextRest = '';
                  await processLine(lastLine);
                  console.log(`finished processing ${totalRecordsCounter} records.`);
                  // NOTE(review): resolves when the first stream file ends; presumably the
                  // archive contains exactly one file — confirm.
                  done.resolve();
                } catch (err) {
                  done.reject(err);
                }
              },
            })
          );
        },
      })
    );

    await done.promise;
  }
}
|