Files
udpak_semistruktur/udpak_semistruktur.py
2026-04-04 00:57:17 +02:00

137 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
import argparse
from udpak_semistruktur.config import valider_yaml
from udpak_semistruktur.logger import opsaet_logging, hent_logger
from udpak_semistruktur import ddl
from udpak_semistruktur.extract.reader import læs_filer
from udpak_semistruktur.extract.extractor import generer_datafil
from udpak_semistruktur.transform.clean import rens, tag_strip, fjern_linjeskift, upper_lower, filename
from udpak_semistruktur.transform.reshape import flatten, join, where, id_felt, sammensat_noegle
from udpak_semistruktur.transform.convert import konverter
from udpak_semistruktur.transform.hash import beregn_hash
from udpak_semistruktur.load.file_writer import generer_filer_med_overskrifter, skriv_fil_med_retry
from udpak_semistruktur.load.db_writer import get_ase_connection_windows, insert_rows_ase
from udpak_semistruktur.db import læs_json_fil
from udpak_semistruktur.utils import generer_filnavn, load_env_file
logger = hent_logger(__name__)
def _byg_argument_parser() -> argparse.ArgumentParser:
"""Bygger og returnerer CLI argument-parseren."""
parser = argparse.ArgumentParser(
description="Udtræk og transformation af semistrukturerede data (JSON/XML)."
)
parser.add_argument("--config", required=True, help="Sti til YAML-konfigurationsfil")
# Tilføj DDL-flags via ddl-modulet
ddl.add_cli_args(parser)
return parser
def _kør_udtræk(config: dict, global_config: dict) -> None:
"""Kører den normale udtræks- og transformationspipeline."""
print(f"DEBUG: Antal output_filer = {len(config.get('output_filer', []))}")
input_fil = global_config.get("input_fil")
input_fil_liste = global_config.get("input_fil_liste")
# Opret DB-forbindelse hvis der er tabel-output
har_tabel_output = any(
cfg.get("type") == "tabel"
for cfg in config.get("output_filer", [])
)
conn = None
if har_tabel_output:
bruger, password, env, host, port = læs_json_fil(global_config)
conn = get_ase_connection_windows(
bruger, password, host, port,
global_config["database"]
)
for record in læs_filer(global_config, input_fil, input_fil_liste):
for cfg in config.get("output_filer", []):
# 1) Udtræk
tmp_data = generer_datafil(record, cfg, global_config)
# 2) Transform-pipeline
tmp_data = join(tmp_data, cfg)
tmp_data = flatten(tmp_data, cfg)
tmp_data = rens(tmp_data, cfg, global_config)
tmp_data = tag_strip(tmp_data, cfg, global_config)
tmp_data = fjern_linjeskift(tmp_data, cfg, global_config)
tmp_data = where(tmp_data, cfg, global_config)
tmp_data = konverter(tmp_data, cfg, global_config)
tmp_data = upper_lower(tmp_data, cfg, global_config)
tmp_data = id_felt(tmp_data, cfg)
tmp_data = sammensat_noegle(tmp_data, cfg, global_config)
tmp_data = beregn_hash(tmp_data, cfg, global_config)
tmp_data = filename(tmp_data, cfg, global_config)
# 3) Load afhænger af cfg["type"]
if cfg.get("type") == "fil":
fil_navn = generer_filnavn(cfg["fil_navn"], global_config)
output_sti = global_config["output_path"] + fil_navn
overskrifter = cfg.get("overskrifter", True)
generer_filer_med_overskrifter(overskrifter, output_sti, cfg["kolonner"], global_config)
separator = global_config["separator"]
encoding = global_config["encoding"]
# Bemærk: skriv() lukker over loop-variabler kaldes straks af skriv_fil_med_retry
def skriv():
with open(output_sti, "a", encoding=encoding) as f:
for række in tmp_data["rækker"]:
linje = separator.join(
str(række.get(k["navn"], "")) for k in cfg["kolonner"]
)
f.write(linje + "\n")
skriv_fil_med_retry(skriv, output_sti)
logger.info(f"Fil: {output_sti} skrevet ({len(tmp_data['rækker'])} rækker)")
elif cfg.get("type") == "tabel":
kolonner = [k["navn"] for k in cfg["kolonner"]]
indsatte, fejlede = insert_rows_ase(conn, cfg["tabel_navn"], kolonner, tmp_data["rækker"])
logger.info(f"DB: {indsatte} rækker indsat i {cfg['tabel_navn']}")
if fejlede:
logger.warning(f"DB: {len(fejlede)} rækker fejlede i {cfg['tabel_navn']}")
if conn is not None:
conn.close()
logger.debug("DB-forbindelse lukket.")
def main():
"""Hovedfunktion der eksekveres ved kørsel af scriptet."""
# Indlæs lokal .env under udvikling springes over i kompileret .exe
if not getattr(sys, "frozen", False):
load_env_file("testenv.env")
parser = _byg_argument_parser()
args = parser.parse_args()
# Valider og indlæs YAML-konfiguration
config = valider_yaml(args.config)
global_config = config["config"]
# Opsæt logging baseret på argumenter
opsaet_logging(
log_fil=global_config["logfil"],
niveau=global_config.get("log_niveau", "info"),
log_output=global_config.get("log_output", "begge"),
)
# Eksekver DDL-flowet
print(f"MAIN DEBUG: output_filer count = {len(config.get('output_filer', []))}", flush=True)
for i, cfg in enumerate(config.get("output_filer", [])):
print(f"MAIN DEBUG [{i}]: rod={cfg.get('rod')}, fil={cfg.get('fil_navn')}, type={cfg.get('type')}", flush=True)
if ddl.is_enabled(args):
ddl.run_ddl_mode(args, config, global_config)
else:
_kør_udtræk(config, global_config)
if __name__ == "__main__":
main()