videre
This commit is contained in:
@@ -0,0 +1,126 @@
|
||||
import argparse
|
||||
|
||||
from udpak_semistruktur.config import valider_yaml
|
||||
from udpak_semistruktur.logger import opsaet_logging, hent_logger
|
||||
from udpak_semistruktur import ddl
|
||||
|
||||
from udpak_semistruktur.extract.reader import læs_filer
|
||||
from udpak_semistruktur.extract.extractor import generer_datafil
|
||||
from udpak_semistruktur.transform.clean import rens, tag_strip, fjern_linjeskift, upper_lower, filename
|
||||
from udpak_semistruktur.transform.reshape import flatten, join, where, id_felt, sammensat_noegle
|
||||
from udpak_semistruktur.transform.convert import konverter
|
||||
from udpak_semistruktur.transform.hash import beregn_hash
|
||||
from udpak_semistruktur.load.file_writer import generer_filer_med_overskrifter, skriv_fil_med_retry
|
||||
from udpak_semistruktur.load.db_writer import get_ase_connection_windows, insert_rows_ase
|
||||
from udpak_semistruktur.db import læs_json_fil
|
||||
from udpak_semistruktur.utils import generer_filnavn
|
||||
|
||||
logger = hent_logger(__name__)
|
||||
|
||||
def _byg_argument_parser() -> argparse.ArgumentParser:
|
||||
"""Bygger og returnerer CLI argument-parseren."""
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Udtræk og transformation af semistrukturerede data (JSON/XML)."
|
||||
)
|
||||
parser.add_argument("--config", required=True, help="Sti til YAML-konfigurationsfil")
|
||||
|
||||
# Tilføj DDL-flags via ddl-modulet
|
||||
ddl.add_cli_args(parser)
|
||||
return parser
|
||||
|
||||
def _kør_udtræk(config: dict, global_config: dict) -> None:
|
||||
"""Kører den normale udtræks- og transformationspipeline."""
|
||||
|
||||
input_fil = global_config.get("input_fil")
|
||||
input_fil_liste = global_config.get("input_fil_liste")
|
||||
|
||||
# Opret DB-forbindelse hvis der er tabel-output
|
||||
har_tabel_output = any(
|
||||
cfg.get("type") == "tabel"
|
||||
for cfg in config.get("output_filer", [])
|
||||
)
|
||||
conn = None
|
||||
if har_tabel_output:
|
||||
bruger, password, env, host, port = læs_json_fil(global_config)
|
||||
conn = get_ase_connection_windows(
|
||||
bruger, password, host, port,
|
||||
global_config["database"]
|
||||
)
|
||||
|
||||
for record in læs_filer(global_config, input_fil, input_fil_liste):
|
||||
for cfg in config.get("output_filer", []):
|
||||
# 1) Udtræk
|
||||
tmp_data = generer_datafil(record, cfg, global_config)
|
||||
|
||||
# 2) Transform-pipeline
|
||||
tmp_data = join(tmp_data, cfg)
|
||||
tmp_data = flatten(tmp_data, cfg)
|
||||
tmp_data = rens(tmp_data, cfg, global_config)
|
||||
tmp_data = tag_strip(tmp_data, cfg, global_config)
|
||||
tmp_data = fjern_linjeskift(tmp_data, cfg, global_config)
|
||||
tmp_data = where(tmp_data, cfg, global_config)
|
||||
tmp_data = konverter(tmp_data, cfg, global_config)
|
||||
tmp_data = upper_lower(tmp_data, cfg, global_config)
|
||||
tmp_data = id_felt(tmp_data, cfg)
|
||||
tmp_data = sammensat_noegle(tmp_data, cfg, global_config)
|
||||
tmp_data = beregn_hash(tmp_data, cfg, global_config)
|
||||
tmp_data = filename(tmp_data, cfg, global_config)
|
||||
|
||||
# 3) Load – afhænger af cfg["type"]
|
||||
if cfg.get("type") == "fil":
|
||||
fil_navn = generer_filnavn(cfg["fil_navn"], global_config)
|
||||
output_sti = global_config["output_path"] + fil_navn
|
||||
|
||||
overskrifter = cfg.get("overskrifter", True)
|
||||
generer_filer_med_overskrifter(overskrifter, output_sti, cfg["kolonner"], global_config)
|
||||
|
||||
separator = global_config["separator"]
|
||||
encoding = global_config["encoding"]
|
||||
|
||||
# Bemærk: skriv() lukker over loop-variabler – kaldes straks af skriv_fil_med_retry
|
||||
def skriv():
|
||||
with open(output_sti, "a", encoding=encoding) as f:
|
||||
for række in tmp_data["rækker"]:
|
||||
linje = separator.join(
|
||||
str(række.get(k["navn"], "")) for k in cfg["kolonner"]
|
||||
)
|
||||
f.write(linje + "\n")
|
||||
|
||||
skriv_fil_med_retry(skriv, output_sti)
|
||||
logger.info(f"Fil: {output_sti} skrevet ({len(tmp_data['rækker'])} rækker)")
|
||||
|
||||
elif cfg.get("type") == "tabel":
|
||||
kolonner = [k["navn"] for k in cfg["kolonner"]]
|
||||
indsatte, fejlede = insert_rows_ase(conn, cfg["tabel_navn"], kolonner, tmp_data["rækker"])
|
||||
logger.info(f"DB: {indsatte} rækker indsat i {cfg['tabel_navn']}")
|
||||
if fejlede:
|
||||
logger.warning(f"DB: {len(fejlede)} rækker fejlede i {cfg['tabel_navn']}")
|
||||
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
logger.debug("DB-forbindelse lukket.")
|
||||
|
||||
def main():
|
||||
"""Hovedfunktion der eksekveres ved kørsel af scriptet."""
|
||||
parser = _byg_argument_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
# Valider og indlæs YAML-konfiguration
|
||||
config = valider_yaml(args.config)
|
||||
global_config = config["config"]
|
||||
|
||||
# Opsæt logging baseret på argumenter
|
||||
opsaet_logging(
|
||||
log_fil=global_config["logfil"],
|
||||
niveau=global_config.get("log_niveau", "info"),
|
||||
)
|
||||
|
||||
# Eksekver DDL-flowet
|
||||
if ddl.is_enabled(args):
|
||||
ddl.run_ddl_mode(args, config, global_config)
|
||||
else:
|
||||
_kør_udtræk(config, global_config)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user