import * as plugins from './plugins.js';
import * as paths from './paths.js';
import type { OpenData } from './classes.main.opendata.js';

/**
 * Shape of a single company record as it appears in the JSON Lines seed data.
 *
 * NOTE(review): presumably this mirrors one line of the company dump that
 * `JsonlDataProcessor.processDataFromUrl` downloads by default — confirm
 * against the data source. Nothing in this file validates incoming records
 * against this type.
 */
export type SeedEntryType = {
  all_attributes: {
    _registerArt: string;
    _registerNummer: string;
    /**
     * Boolean flags keyed by short upper-case codes.
     * NOTE(review): the meaning of the individual codes is not visible in this
     * file — confirm against the upstream data documentation.
     */
    additional_data: {
      AD: boolean;
      CD: boolean;
      DK: boolean;
      HD: boolean;
      SI: boolean;
      UT: boolean;
      // Restored from the mis-encoded key `VĂ–` (UTF-8 bytes of `VÖ` read with a
      // single-byte code page).
      VÖ: boolean;
    };
    federal_state: string;
    native_company_number: string;
    registered_office: string;
    registrar: string;
  };
  company_number: string;
  current_status: string;
  jurisdiction_code: string;
  name: string;
  officers: {
    name: string;
    other_attributes: {
      city: string;
      firstname: string;
      flag: string;
      lastname: string;
    };
    position: string;
    start_date: string; // ISO 8601 date string
    type: string;
  }[];
  registered_address: string;
  retrieved_at: string; // ISO 8601 date string
};

/**
 * Creates a stateful splitter that turns a sequence of raw byte chunks into
 * complete text lines.
 *
 * Decoding is done with a streaming `TextDecoder`, so a multi-byte UTF-8
 * character that is cut in half by a chunk boundary is reassembled instead of
 * being replaced by U+FFFD.
 */
function createLineSplitter() {
  const decoder = new TextDecoder('utf-8');
  let rest = '';
  return {
    /** Feeds one chunk and returns every line completed by it (may be empty). */
    push: (chunkArg: Uint8Array): string[] => {
      const lines = (rest + decoder.decode(chunkArg, { stream: true })).split('\n');
      // The last element is the (possibly empty) start of the next line.
      rest = lines.pop() ?? '';
      return lines;
    },
    /** Returns whatever text is left after the final chunk and resets the splitter. */
    flush: (): string => {
      const trailing = rest + decoder.decode();
      rest = '';
      return trailing;
    },
  };
}

/**
 * Parses one JSON Lines record.
 *
 * @param lineArg - raw line, with or without surrounding whitespace / `\r`.
 * @returns the parsed value, or `undefined` for blank or malformed lines.
 *   Malformed lines are logged and skipped. The result is NOT validated
 *   against `T`.
 */
function parseJsonLine<T>(lineArg: string): T | undefined {
  const trimmed = lineArg.trim();
  if (!trimmed) return undefined;
  try {
    return JSON.parse(trimmed) as T;
  } catch {
    console.log(lineArg);
    return undefined;
  }
}

/**
 * Downloads an archive containing JSON Lines data and awaits a caller-supplied
 * async callback once for every parsed record.
 *
 * @typeParam T - shape the caller expects each record to have (unvalidated).
 */
export class JsonlDataProcessor<T> {
  /** Callback awaited once for every successfully parsed, truthy record. */
  public forEachFunction: (entryArg: T) => Promise<void>;

  /**
   * @param forEachFunctionArg - async callback invoked per record.
   */
  constructor(forEachFunctionArg: (entryArg: T) => Promise<void>) {
    this.forEachFunction = forEachFunctionArg;
  }

  // TODO: define a mapper as argument instead of hard-coding it
  /**
   * Streams the archive at `dataUrlArg`, splits its content into lines and
   * feeds every parsed record to `forEachFunction` (up to 1000 lines of a chunk
   * are handed to the concurrent processor at once).
   *
   * The returned promise resolves when a contained file stream has been fully
   * processed, including a last record that is not terminated by a newline.
   * It rejects if one of the streams emits an error or the final record's
   * callback throws.
   *
   * @param dataUrlArg - URL of the archive to download.
   */
  public async processDataFromUrl(
    dataUrlArg = 'https://daten.offeneregister.de/de_companies_ocdata.jsonl.bz2'
  ): Promise<void> {
    const done = plugins.smartpromise.defer();
    const fail = (errArg: unknown) => done.reject(errArg);

    const dataExists = await plugins.smartfile.fs.isDirectory(paths.germanBusinessDataDir);
    if (!dataExists) {
      await plugins.smartfile.fs.ensureDir(paths.germanBusinessDataDir);
    }

    const smartarchive = await plugins.smartarchive.SmartArchive.fromArchiveUrl(dataUrlArg);
    const jsonlDataStream = await smartarchive.exportToStreamOfStreamFiles();
    let totalRecordsCounter = 0;

    // Shared per-line handler: parse, count, report progress, run the callback.
    const handleLine = async (lineArg: string): Promise<void> => {
      const entry = parseJsonLine<T>(lineArg);
      // Falsy parse results (e.g. a literal `null` line) are skipped as well.
      if (!entry) return;
      totalRecordsCounter++;
      if (totalRecordsCounter % 10000 === 0) {
        console.log(`${totalRecordsCounter} total records.`);
      }
      await this.forEachFunction(entry);
    };

    // Without listeners a stream error would leave `done` pending forever.
    jsonlDataStream.on('error', fail);
    jsonlDataStream
      .pipe(
        new plugins.smartstream.SmartDuplex({
          objectMode: true,
          writeFunction: async (streamFileArg: plugins.smartfile.StreamFile) => {
            // Line remainder and decoder state belong to one file only.
            const splitter = createLineSplitter();
            const readStream = await streamFileArg.createReadStream();
            readStream.on('error', fail);
            readStream
              .pipe(
                new plugins.smartstream.SmartDuplex({
                  objectMode: true,
                  writeFunction: async (chunkArg: Buffer) => {
                    const lines = splitter.push(chunkArg);
                    console.log(`Got another ${lines.length} records.`);
                    const concurrentProcessor = new plugins.smartarray.ConcurrentProcessor<string>(
                      handleLine,
                      1000
                    );
                    await concurrentProcessor.process(lines);
                  },
                  finalFunction: async () => {
                    try {
                      // A last record without trailing newline is still a record.
                      const trailing = splitter.flush();
                      if (trailing) {
                        await handleLine(trailing);
                      }
                      console.log(`finished processing ${totalRecordsCounter} records.`);
                      done.resolve();
                    } catch (err) {
                      fail(err);
                    }
                  },
                })
              )
              .on('error', fail);
          },
        })
      )
      .on('error', fail);

    await done.promise;
  }
}