114 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			114 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
import * as plugins from './plugins.js';
 | 
						|
import type { OpenData } from './classes.main.opendata.js';
 | 
						|
 | 
						|
/**
 * Shape of a single record (one JSON line) of the seed data.
 *
 * NOTE(review): presumably this mirrors the records of the default data set
 * consumed by `JsonlDataProcessor.processDataFromUrl` — confirm against the
 * upstream schema. Nothing in this file validates incoming data against it.
 */
export type SeedEntryType = {
  /** Register-specific attributes attached to the company record. */
  all_attributes: {
    /** NOTE(review): German for "register type" — confirm allowed values. */
    _registerArt: string;
    /** NOTE(review): German for "register number" — confirm format. */
    _registerNummer: string;
    /**
     * Boolean flags keyed by two-letter codes.
     * NOTE(review): the meaning of the codes is not visible here — document once confirmed.
     */
    additional_data: {
      AD: boolean;
      CD: boolean;
      DK: boolean;
      HD: boolean;
      SI: boolean;
      UT: boolean;
      VÖ: boolean;
    };
    federal_state: string;
    native_company_number: string;
    registered_office: string;
    registrar: string;
  };
  company_number: string;
  current_status: string;
  jurisdiction_code: string;
  name: string;
  /** Officers listed for the company. */
  officers: {
    name: string;
    other_attributes: {
      city: string;
      firstname: string;
      flag: string;
      lastname: string;
    };
    position: string;
    /** ISO 8601 date string. */
    start_date: string;
    type: string;
  }[];
  registered_address: string;
  /** ISO 8601 date string. */
  retrieved_at: string;
};
 | 
						|
 | 
						|
/**
 * Streams a JSON Lines archive and hands every parsed record to a
 * caller-supplied async callback.
 *
 * @typeParam T - Shape the caller expects each JSON line to have. Lines are
 *   parsed with `JSON.parse` only; they are not validated against `T`.
 */
export class JsonlDataProcessor<T> {
  /** Directory that `processDataFromUrl` makes sure exists before streaming. */
  private germanBusinessDataDir: string;

  /** Callback that is invoked and awaited once per successfully parsed record. */
  public forEachFunction: (entryArg: T) => Promise<void>;

  /**
   * @param germanBusinessDataDirArg - Directory to ensure before processing.
   * @param forEachFunctionArg - Async callback receiving each parsed record.
   */
  constructor(
    germanBusinessDataDirArg: string,
    forEachFunctionArg: (entryArg: T) => Promise<void>
  ) {
    this.germanBusinessDataDir = germanBusinessDataDirArg;
    this.forEachFunction = forEachFunctionArg;
  }

  /**
   * Parses one JSONL line.
   *
   * Best effort: an empty line, a line that is not valid JSON, or a line that
   * parses to a falsy value (e.g. `null`) yields `undefined` so the caller can
   * skip it. Malformed lines are logged.
   */
  private parseLine(lineArg: string): T | undefined {
    if (!lineArg) return undefined;
    let entry: T;
    try {
      entry = JSON.parse(lineArg) as T;
    } catch (err) {
      console.log(`Skipping line that is not valid JSON: ${lineArg}`);
      return undefined;
    }
    if (!entry) return undefined;
    return entry;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Downloads the archive at `dataUrlArg`, splits its content into lines and
   * calls `forEachFunction` for every line that parses as JSON.
   *
   * @param dataUrlArg - URL of the archive to stream.
   * @returns A promise that resolves once a file's line stream has been fully
   *   processed (including a trailing line without a final newline) and
   *   rejects when a stream errors or the callback throws.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    const done = plugins.smartpromise.defer();
    // Any stream failure must settle the promise; otherwise the caller hangs.
    const failWith = (errorArg: unknown) => done.reject(errorArg);

    const dataExists = await plugins.smartfile.fs.isDirectory(this.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(this.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();
    let totalRecordsCounter = 0;

    // Parses a single line, keeps the running count and forwards the record.
    const handleLine = async (lineArg: string): Promise<void> => {
      const entry = this.parseLine(lineArg);
      if (entry === undefined) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) {
        console.log(`${totalRecordsCounter} total records.`);
      }
      await this.forEachFunction(entry);
    };

    const fileHandler = new plugins.smartstream.SmartDuplex({
      objectMode: true,
      writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
        // Per-file state: a streaming decoder keeps multi-byte UTF-8 sequences
        // intact when they are split across chunks, and `nextRest` carries the
        // incomplete last line over to the next chunk.
        const decoder = new TextDecoder('utf-8');
        let nextRest = '';

        const readStream = await streamFileArg.createReadStream();
        const lineHandler = new plugins.smartstream.SmartDuplex({
          objectMode: true,
          writeFunction: async (chunkArg: Buffer) => {
            const currentString = nextRest + decoder.decode(chunkArg, { stream: true });
            const lines = currentString.split('\n');
            // The last element is either '' (chunk ended on a newline) or a partial line.
            nextRest = lines.pop() ?? '';
            console.log(`Got another ${lines.length} records.`);
            const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
              handleLine,
              1000
            );
            await concurrentProcessor.process(lines);
          },
          finalFunction: async () => {
            try {
              // Flush bytes still buffered in the decoder and process the
              // final line, which has no terminating newline.
              const trailingLine = nextRest + decoder.decode();
              nextRest = '';
              await handleLine(trailingLine);
              console.log(`finished processing ${totalRecordsCounter} records.`);
              // NOTE(review): resolves when this file's stream ends; an archive
              // with several files would resolve after the first one finishes.
              done.resolve();
            } catch (err) {
              done.reject(err);
            }
          },
        });

        readStream.on('error', failWith);
        lineHandler.on('error', failWith);
        readStream.pipe(lineHandler);
      },
    });

    jsonlDataStream.on('error', failWith);
    fileHandler.on('error', failWith);
    jsonlDataStream.pipe(fileHandler);

    await done.promise;
  }
}
 |