feat(laws,opendata): add local law storage and migrate OpenData persistence to smartdb-backed local storage
This commit is contained in:
@@ -53,14 +53,15 @@ export class JsonlDataProcessor<T> {
|
||||
dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
|
||||
) {
|
||||
const done = plugins.smartpromise.defer();
|
||||
const dataExists = await plugins.smartfile.fs.isDirectory(this.germanBusinessDataDir);
|
||||
const dataExists = await plugins.smartfs.directory(this.germanBusinessDataDir).exists();
|
||||
if (!dataExists) {
|
||||
await plugins.smartfile.fs.ensureDir(this.germanBusinessDataDir);
|
||||
await plugins.smartfs.directory(this.germanBusinessDataDir).create();
|
||||
} else {
|
||||
}
|
||||
|
||||
const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
|
||||
const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();
|
||||
const jsonlDataStream = await plugins.smartarchive.SmartArchive.create()
|
||||
.url(dataUrlArg)
|
||||
.toStreamFiles();
|
||||
let totalRecordsCounter = 0;
|
||||
let nextRest: string = '';
|
||||
jsonlDataStream.pipe(
|
||||
@@ -74,11 +75,11 @@ export class JsonlDataProcessor<T> {
|
||||
writeFunction: async (chunkArg: Buffer, streamToolsArg) => {
|
||||
const currentString = nextRest + chunkArg.toString();
|
||||
const lines = currentString.split('\n');
|
||||
nextRest = lines.pop();
|
||||
nextRest = lines.pop() ?? '';
|
||||
console.log(`Got another ${lines.length} records.`);
|
||||
const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
|
||||
async (line) => {
|
||||
let entry: T;
|
||||
let entry: T | undefined;
|
||||
if (!line) return;
|
||||
try {
|
||||
entry = JSON.parse(line);
|
||||
|
||||
Reference in New Issue
Block a user